forked from Rust-related/RustPython
* add supporting for PyAtomic<PyObject> * create sqlite module * add dependency sqlite3-sys * add module constants * import sqlite3 from cpython * adjust lib * add module structure * impl Connection.cursor * add module exceptions * impl lstrip_sql * impl statement new * wip cursor.execute * wip cursor * wip error to exception * add SqliteRaw and SqliteStatementRaw * impl statement parameters binding * wip cursor.execute * add test_sqlite * impl closeable connection * impl closeable cursor * impl cursor.executemany * impl cursor.executescript * impl cursor.fetch* * impl connection.backup * stage 1 * add support connection.backup with progress * fix backup deadlock * support changable isolation_level * impl converter * impl adapter * impl text_factory and blob * impl create_function * impl create_function 2 * fix empty statement * impl blob support * impl create_aggregate * impl create_aggregate 2 * refactor create_* * impl enable_callback_traceback * impl create_collation * refactor create_* with CallbackData * fix text and blob use SQLITE_TRANSIENT * fix str to SQLITE_TEXT * impl thread check * impl Connection Factory * impl busy timeout * shift sqlite3-sys -> libsqlite3-sys * refactor CallbackData * impl create_window_function * refactor callback functions * add module attr converters * fix nullable isolation_level * add module attr adapters * fix nullable adapt proto * impl set_authorizer * impl trace_callback * impl set_progress_handler * impl cancellable sqlite function* * impl attributes for Connection * fix some failed tests * impl Row * impl Blob methods * impl Blob subscript & ass_subscript * pass tests * rebase * no sqlite for wasm * use ThreadId instead u64 * no libsqlite3-sys for wasm * fix into_cstring for all platform * fixup * rebase * fix windows into_bytes * disable sqlite for android * fixup
167 lines
5.7 KiB
Python
Vendored
167 lines
5.7 KiB
Python
Vendored
import sqlite3 as sqlite
|
|
import unittest
|
|
|
|
|
|
class BackupTests(unittest.TestCase):
|
|
def setUp(self):
|
|
cx = self.cx = sqlite.connect(":memory:")
|
|
cx.execute('CREATE TABLE foo (key INTEGER)')
|
|
cx.executemany('INSERT INTO foo (key) VALUES (?)', [(3,), (4,)])
|
|
cx.commit()
|
|
|
|
def tearDown(self):
|
|
self.cx.close()
|
|
|
|
def verify_backup(self, bckcx):
|
|
result = bckcx.execute("SELECT key FROM foo ORDER BY key").fetchall()
|
|
self.assertEqual(result[0][0], 3)
|
|
self.assertEqual(result[1][0], 4)
|
|
|
|
def test_bad_target(self):
|
|
with self.assertRaises(TypeError):
|
|
self.cx.backup(None)
|
|
with self.assertRaises(TypeError):
|
|
self.cx.backup()
|
|
|
|
def test_bad_target_filename(self):
|
|
with self.assertRaises(TypeError):
|
|
self.cx.backup('some_file_name.db')
|
|
|
|
def test_bad_target_same_connection(self):
|
|
with self.assertRaises(ValueError):
|
|
self.cx.backup(self.cx)
|
|
|
|
def test_bad_target_closed_connection(self):
|
|
bck = sqlite.connect(':memory:')
|
|
bck.close()
|
|
with self.assertRaises(sqlite.ProgrammingError):
|
|
self.cx.backup(bck)
|
|
|
|
def test_bad_source_closed_connection(self):
|
|
bck = sqlite.connect(':memory:')
|
|
source = sqlite.connect(":memory:")
|
|
source.close()
|
|
with self.assertRaises(sqlite.ProgrammingError):
|
|
source.backup(bck)
|
|
|
|
def test_bad_target_in_transaction(self):
|
|
bck = sqlite.connect(':memory:')
|
|
bck.execute('CREATE TABLE bar (key INTEGER)')
|
|
bck.executemany('INSERT INTO bar (key) VALUES (?)', [(3,), (4,)])
|
|
with self.assertRaises(sqlite.OperationalError) as cm:
|
|
self.cx.backup(bck)
|
|
if sqlite.sqlite_version_info < (3, 8, 8):
|
|
self.assertEqual(str(cm.exception), 'target is in transaction')
|
|
|
|
def test_keyword_only_args(self):
|
|
with self.assertRaises(TypeError):
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, 1)
|
|
|
|
def test_simple(self):
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck)
|
|
self.verify_backup(bck)
|
|
|
|
def test_progress(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
journal.append(status)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=1, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
self.assertEqual(len(journal), 2)
|
|
self.assertEqual(journal[0], sqlite.SQLITE_OK)
|
|
self.assertEqual(journal[1], sqlite.SQLITE_DONE)
|
|
|
|
def test_progress_all_pages_at_once_1(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
journal.append(remaining)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
self.assertEqual(len(journal), 1)
|
|
self.assertEqual(journal[0], 0)
|
|
|
|
def test_progress_all_pages_at_once_2(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
journal.append(remaining)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=-1, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
self.assertEqual(len(journal), 1)
|
|
self.assertEqual(journal[0], 0)
|
|
|
|
# TODO: RUSTPYTHON
|
|
@unittest.expectedFailure
|
|
def test_non_callable_progress(self):
|
|
with self.assertRaises(TypeError) as cm:
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=1, progress='bar')
|
|
self.assertEqual(str(cm.exception), 'progress argument must be a callable')
|
|
|
|
def test_modifying_progress(self):
|
|
journal = []
|
|
|
|
def progress(status, remaining, total):
|
|
if not journal:
|
|
self.cx.execute('INSERT INTO foo (key) VALUES (?)', (remaining+1000,))
|
|
self.cx.commit()
|
|
journal.append(remaining)
|
|
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, pages=1, progress=progress)
|
|
self.verify_backup(bck)
|
|
|
|
result = bck.execute("SELECT key FROM foo"
|
|
" WHERE key >= 1000"
|
|
" ORDER BY key").fetchall()
|
|
self.assertEqual(result[0][0], 1001)
|
|
|
|
self.assertEqual(len(journal), 3)
|
|
self.assertEqual(journal[0], 1)
|
|
self.assertEqual(journal[1], 1)
|
|
self.assertEqual(journal[2], 0)
|
|
|
|
def test_failing_progress(self):
|
|
def progress(status, remaining, total):
|
|
raise SystemError('nearly out of space')
|
|
|
|
with self.assertRaises(SystemError) as err:
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, progress=progress)
|
|
self.assertEqual(str(err.exception), 'nearly out of space')
|
|
|
|
def test_database_source_name(self):
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='main')
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='temp')
|
|
with self.assertRaises(sqlite.OperationalError) as cm:
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='non-existing')
|
|
self.assertIn("unknown database", str(cm.exception))
|
|
|
|
self.cx.execute("ATTACH DATABASE ':memory:' AS attached_db")
|
|
self.cx.execute('CREATE TABLE attached_db.foo (key INTEGER)')
|
|
self.cx.executemany('INSERT INTO attached_db.foo (key) VALUES (?)', [(3,), (4,)])
|
|
self.cx.commit()
|
|
with sqlite.connect(':memory:') as bck:
|
|
self.cx.backup(bck, name='attached_db')
|
|
self.verify_backup(bck)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|