forked from Rust-related/RustPython
* add supporting for PyAtomic<PyObject> * create sqlite module * add dependency sqlite3-sys * add module constants * import sqlite3 from cpython * adjust lib * add module structure * impl Connection.cursor * add module exceptions * impl lstrip_sql * impl statement new * wip cursor.execute * wip cursor * wip error to exception * add SqliteRaw and SqliteStatementRaw * impl statement parameters binding * wip cursor.execute * add test_sqlite * impl closeable connection * impl closeable cursor * impl cursor.executemany * impl cursor.executescript * impl cursor.fetch* * impl connection.backup * stage 1 * add support connection.backup with progress * fix backup deadlock * support changable isolation_level * impl converter * impl adapter * impl text_factory and blob * impl create_function * impl create_function 2 * fix empty statement * impl blob support * impl create_aggregate * impl create_aggregate 2 * refactor create_* * impl enable_callback_traceback * impl create_collation * refactor create_* with CallbackData * fix text and blob use SQLITE_TRANSIENT * fix str to SQLITE_TEXT * impl thread check * impl Connection Factory * impl busy timeout * shift sqlite3-sys -> libsqlite3-sys * refactor CallbackData * impl create_window_function * refactor callback functions * add module attr converters * fix nullable isolation_level * add module attr adapters * fix nullable adapt proto * impl set_authorizer * impl trace_callback * impl set_progress_handler * impl cancellable sqlite function* * impl attributes for Connection * fix some failed tests * impl Row * impl Blob methods * impl Blob subscript & ass_subscript * pass tests * rebase * no sqlite for wasm * use ThreadId instead u64 * no libsqlite3-sys for wasm * fix into_cstring for all platform * fixup * rebase * fix windows into_bytes * disable sqlite for android * fixup
125 lines
4.5 KiB
Python
Vendored
125 lines
4.5 KiB
Python
Vendored
# Author: Paul Kippes <kippesp@gmail.com>
|
|
|
|
import unittest
|
|
import sqlite3 as sqlite
|
|
from .test_dbapi import memory_database
|
|
|
|
|
|
# TODO: RUSTPYTHON
@unittest.expectedFailure
class DumpTests(unittest.TestCase):
    """Tests for ``Connection.iterdump()``.

    Each test runs against a fresh ``:memory:`` database: ``self.cx`` is the
    connection and ``self.cu`` a cursor on it.  The connection is closed in
    ``tearDown``.
    """

    def setUp(self):
        self.cx = sqlite.connect(":memory:")
        self.cu = self.cx.cursor()

    def tearDown(self):
        self.cx.close()

    def test_table_dump(self):
        # Statements are executed in this order and are expected to come back
        # from iterdump() verbatim, wrapped in BEGIN TRANSACTION/COMMIT.
        expected_sqls = [
            """CREATE TABLE "index"("index" blob);""",
            """INSERT INTO "index" VALUES(X'01');""",
            """CREATE TABLE "quoted""table"("quoted""field" text);""",
            """INSERT INTO "quoted""table" VALUES('quoted''value');""",
            (
                "CREATE TABLE t1(id integer primary key, s1 text, "
                "t1_i1 integer not null, i2 integer, unique (s1), "
                "constraint t1_idx1 unique (i2));"
            ),
            "INSERT INTO \"t1\" VALUES(1,'foo',10,20);",
            "INSERT INTO \"t1\" VALUES(2,'foo2',30,30);",
            (
                "CREATE TABLE t2(id integer, t2_i1 integer, "
                "t2_i2 integer, primary key (id),"
                "foreign key(t2_i1) references t1(t1_i1));"
            ),
            (
                "CREATE TRIGGER trigger_1 update of t1_i1 on t1 "
                "begin "
                "update t2 set t2_i1 = new.t1_i1 where t2_i1 = old.t1_i1; "
                "end;"
            ),
            (
                "CREATE VIEW v1 as select * from t1 left join t2 "
                "using (id);"
            ),
        ]
        for sql in expected_sqls:
            self.cu.execute(sql)

        actual_sqls = list(self.cx.iterdump())
        expected_sqls = ['BEGIN TRANSACTION;'] + expected_sqls + ['COMMIT;']
        # Compare the complete lists so that missing *and* surplus statements
        # in the dump are both reported as assertion failures.
        self.assertEqual(expected_sqls, actual_sqls)

    def test_dump_autoincrement(self):
        expected = [
            'CREATE TABLE "t1" (id integer primary key autoincrement);',
            'INSERT INTO "t1" VALUES(NULL);',
            'CREATE TABLE "t2" (id integer primary key autoincrement);',
        ]
        self.cu.executescript("".join(expected))

        # the NULL value should now automatically be set to 1
        expected[1] = expected[1].replace("NULL", "1")
        expected.insert(0, "BEGIN TRANSACTION;")
        # The dump is expected to end by resetting "sqlite_sequence" and
        # restoring the counter for t1 (the only table that received a row).
        expected.extend([
            'DELETE FROM "sqlite_sequence";',
            'INSERT INTO "sqlite_sequence" VALUES(\'t1\',1);',
            'COMMIT;',
        ])

        actual = list(self.cx.iterdump())
        self.assertEqual(expected, actual)

    def test_dump_autoincrement_create_new_db(self):
        # Populate two AUTOINCREMENT tables with 9 and 4 rows respectively.
        self.cu.execute("BEGIN TRANSACTION")
        self.cu.execute("CREATE TABLE t1 (id integer primary key autoincrement)")
        self.cu.execute("CREATE TABLE t2 (id integer primary key autoincrement)")
        self.cu.executemany("INSERT INTO t1 VALUES(?)", ((None,) for _ in range(9)))
        self.cu.executemany("INSERT INTO t2 VALUES(?)", ((None,) for _ in range(4)))
        self.cx.commit()

        # Replay the dump into a second database and check that the
        # per-table sequence counters match the source database.
        with memory_database() as cx2:
            query = "".join(self.cx.iterdump())
            cx2.executescript(query)
            cu2 = cx2.cursor()

            dataset = (
                ("t1", 9),
                ("t2", 4),
            )
            for table, seq in dataset:
                with self.subTest(table=table, seq=seq):
                    res = cu2.execute("""
                        SELECT "seq" FROM "sqlite_sequence" WHERE "name" == ?
                    """, (table,))
                    rows = res.fetchall()
                    self.assertEqual(rows[0][0], seq)

    def test_unorderable_row(self):
        # iterdump() should be able to cope with unorderable row types (issue #15545)
        class UnorderableRow:
            # Minimal row type: supports indexing only, no comparison methods.
            def __init__(self, cursor, row):
                self.row = row

            def __getitem__(self, index):
                return self.row[index]

        self.cx.row_factory = UnorderableRow
        create_alpha = """CREATE TABLE "alpha" ("one");"""
        create_beta = """CREATE TABLE "beta" ("two");"""
        expected = [
            "BEGIN TRANSACTION;",
            create_alpha,
            create_beta,
            "COMMIT;",
        ]
        # Create the tables in the opposite order to the one expected in the
        # dump ("alpha" is expected before "beta").
        self.cu.execute(create_beta)
        self.cu.execute(create_alpha)
        got = list(self.cx.iterdump())
        self.assertEqual(expected, got)
|
|
|
|
|
|
if __name__ == "__main__":
    # Executed directly as a script: hand control to unittest's runner.
    unittest.main()
|