|  | # pysqlite2/test/regression.py: pysqlite regression tests | 
|  | # | 
|  | # Copyright (C) 2006-2010 Gerhard Häring <gh@ghaering.de> | 
|  | # | 
|  | # This file is part of pysqlite. | 
|  | # | 
|  | # This software is provided 'as-is', without any express or implied | 
|  | # warranty.  In no event will the authors be held liable for any damages | 
|  | # arising from the use of this software. | 
|  | # | 
|  | # Permission is granted to anyone to use this software for any purpose, | 
|  | # including commercial applications, and to alter it and redistribute it | 
|  | # freely, subject to the following restrictions: | 
|  | # | 
|  | # 1. The origin of this software must not be misrepresented; you must not | 
|  | #    claim that you wrote the original software. If you use this software | 
|  | #    in a product, an acknowledgment in the product documentation would be | 
|  | #    appreciated but is not required. | 
|  | # 2. Altered source versions must be plainly marked as such, and must not be | 
|  | #    misrepresented as being the original software. | 
|  | # 3. This notice may not be removed or altered from any source distribution. | 
|  |  | 
|  | import datetime | 
|  | import unittest | 
|  | import sqlite3 as sqlite | 
|  | import weakref | 
|  | import functools | 
|  |  | 
|  | from test import support | 
|  | from unittest.mock import patch | 
|  | from test.test_sqlite3.test_dbapi import memory_database, cx_limit | 
|  |  | 
|  |  | 
|  | class RegressionTests(unittest.TestCase): | 
|  | def setUp(self): | 
|  | self.con = sqlite.connect(":memory:") | 
|  |  | 
|  | def tearDown(self): | 
|  | self.con.close() | 
|  |  | 
|  | def test_pragma_user_version(self): | 
|  | # This used to crash pysqlite because this pragma command returns NULL for the column name | 
|  | cur = self.con.cursor() | 
|  | cur.execute("pragma user_version") | 
|  |  | 
|  | def test_pragma_schema_version(self): | 
|  | # This still crashed pysqlite <= 2.2.1 | 
|  | con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_COLNAMES) | 
|  | try: | 
|  | cur = self.con.cursor() | 
|  | cur.execute("pragma schema_version") | 
|  | finally: | 
|  | cur.close() | 
|  | con.close() | 
|  |  | 
|  | def test_statement_reset(self): | 
|  | # pysqlite 2.1.0 to 2.2.0 have the problem that not all statements are | 
|  | # reset before a rollback, but only those that are still in the | 
|  | # statement cache. The others are not accessible from the connection object. | 
|  | con = sqlite.connect(":memory:", cached_statements=5) | 
|  | cursors = [con.cursor() for x in range(5)] | 
|  | cursors[0].execute("create table test(x)") | 
|  | for i in range(10): | 
|  | cursors[0].executemany("insert into test(x) values (?)", [(x,) for x in range(10)]) | 
|  |  | 
|  | for i in range(5): | 
|  | cursors[i].execute(" " * i + "select x from test") | 
|  |  | 
|  | con.rollback() | 
|  |  | 
|  | def test_column_name_with_spaces(self): | 
|  | cur = self.con.cursor() | 
|  | cur.execute('select 1 as "foo bar [datetime]"') | 
|  | self.assertEqual(cur.description[0][0], "foo bar [datetime]") | 
|  |  | 
|  | cur.execute('select 1 as "foo baz"') | 
|  | self.assertEqual(cur.description[0][0], "foo baz") | 
|  |  | 
|  | def test_statement_finalization_on_close_db(self): | 
|  | # pysqlite versions <= 2.3.3 only finalized statements in the statement | 
|  | # cache when closing the database. statements that were still | 
|  | # referenced in cursors weren't closed and could provoke " | 
|  | # "OperationalError: Unable to close due to unfinalised statements". | 
|  | con = sqlite.connect(":memory:") | 
|  | cursors = [] | 
|  | # default statement cache size is 100 | 
|  | for i in range(105): | 
|  | cur = con.cursor() | 
|  | cursors.append(cur) | 
|  | cur.execute("select 1 x union select " + str(i)) | 
|  | con.close() | 
|  |  | 
|  | def test_on_conflict_rollback(self): | 
|  | con = sqlite.connect(":memory:") | 
|  | con.execute("create table foo(x, unique(x) on conflict rollback)") | 
|  | con.execute("insert into foo(x) values (1)") | 
|  | try: | 
|  | con.execute("insert into foo(x) values (1)") | 
|  | except sqlite.DatabaseError: | 
|  | pass | 
|  | con.execute("insert into foo(x) values (2)") | 
|  | try: | 
|  | con.commit() | 
|  | except sqlite.OperationalError: | 
|  | self.fail("pysqlite knew nothing about the implicit ROLLBACK") | 
|  |  | 
|  | def test_workaround_for_buggy_sqlite_transfer_bindings(self): | 
|  | """ | 
|  | pysqlite would crash with older SQLite versions unless | 
|  | a workaround is implemented. | 
|  | """ | 
|  | self.con.execute("create table foo(bar)") | 
|  | self.con.execute("drop table foo") | 
|  | self.con.execute("create table foo(bar)") | 
|  |  | 
|  | def test_empty_statement(self): | 
|  | """ | 
|  | pysqlite used to segfault with SQLite versions 3.5.x. These return NULL | 
|  | for "no-operation" statements | 
|  | """ | 
|  | self.con.execute("") | 
|  |  | 
|  | def test_type_map_usage(self): | 
|  | """ | 
|  | pysqlite until 2.4.1 did not rebuild the row_cast_map when recompiling | 
|  | a statement. This test exhibits the problem. | 
|  | """ | 
|  | SELECT = "select * from foo" | 
|  | con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) | 
|  | cur = con.cursor() | 
|  | cur.execute("create table foo(bar timestamp)") | 
|  | cur.execute("insert into foo(bar) values (?)", (datetime.datetime.now(),)) | 
|  | cur.execute(SELECT) | 
|  | cur.execute("drop table foo") | 
|  | cur.execute("create table foo(bar integer)") | 
|  | cur.execute("insert into foo(bar) values (5)") | 
|  | cur.execute(SELECT) | 
|  |  | 
|  | def test_bind_mutating_list(self): | 
|  | # Issue41662: Crash when mutate a list of parameters during iteration. | 
|  | class X: | 
|  | def __conform__(self, protocol): | 
|  | parameters.clear() | 
|  | return "..." | 
|  | parameters = [X(), 0] | 
|  | con = sqlite.connect(":memory:",detect_types=sqlite.PARSE_DECLTYPES) | 
|  | con.execute("create table foo(bar X, baz integer)") | 
|  | # Should not crash | 
|  | with self.assertRaises(IndexError): | 
|  | con.execute("insert into foo(bar, baz) values (?, ?)", parameters) | 
|  |  | 
|  | def test_error_msg_decode_error(self): | 
|  | # When porting the module to Python 3.0, the error message about | 
|  | # decoding errors disappeared. This verifies they're back again. | 
|  | with self.assertRaises(sqlite.OperationalError) as cm: | 
|  | self.con.execute("select 'xxx' || ? || 'yyy' colname", | 
|  | (bytes(bytearray([250])),)).fetchone() | 
|  | msg = "Could not decode to UTF-8 column 'colname' with text 'xxx" | 
|  | self.assertIn(msg, str(cm.exception)) | 
|  |  | 
|  | def test_register_adapter(self): | 
|  | """ | 
|  | See issue 3312. | 
|  | """ | 
|  | self.assertRaises(TypeError, sqlite.register_adapter, {}, None) | 
|  |  | 
|  | def test_set_isolation_level(self): | 
|  | # See issue 27881. | 
|  | class CustomStr(str): | 
|  | def upper(self): | 
|  | return None | 
|  | def __del__(self): | 
|  | con.isolation_level = "" | 
|  |  | 
|  | con = sqlite.connect(":memory:") | 
|  | con.isolation_level = None | 
|  | for level in "", "DEFERRED", "IMMEDIATE", "EXCLUSIVE": | 
|  | with self.subTest(level=level): | 
|  | con.isolation_level = level | 
|  | con.isolation_level = level.lower() | 
|  | con.isolation_level = level.capitalize() | 
|  | con.isolation_level = CustomStr(level) | 
|  |  | 
|  | # setting isolation_level failure should not alter previous state | 
|  | con.isolation_level = None | 
|  | con.isolation_level = "DEFERRED" | 
|  | pairs = [ | 
|  | (1, TypeError), (b'', TypeError), ("abc", ValueError), | 
|  | ("IMMEDIATE\0EXCLUSIVE", ValueError), ("\xe9", ValueError), | 
|  | ] | 
|  | for value, exc in pairs: | 
|  | with self.subTest(level=value): | 
|  | with self.assertRaises(exc): | 
|  | con.isolation_level = value | 
|  | self.assertEqual(con.isolation_level, "DEFERRED") | 
|  |  | 
|  | def test_cursor_constructor_call_check(self): | 
|  | """ | 
|  | Verifies that cursor methods check whether base class __init__ was | 
|  | called. | 
|  | """ | 
|  | class Cursor(sqlite.Cursor): | 
|  | def __init__(self, con): | 
|  | pass | 
|  |  | 
|  | con = sqlite.connect(":memory:") | 
|  | cur = Cursor(con) | 
|  | with self.assertRaises(sqlite.ProgrammingError): | 
|  | cur.execute("select 4+5").fetchall() | 
|  | with self.assertRaisesRegex(sqlite.ProgrammingError, | 
|  | r'^Base Cursor\.__init__ not called\.$'): | 
|  | cur.close() | 
|  |  | 
|  | def test_str_subclass(self): | 
|  | """ | 
|  | The Python 3.0 port of the module didn't cope with values of subclasses of str. | 
|  | """ | 
|  | class MyStr(str): pass | 
|  | self.con.execute("select ?", (MyStr("abc"),)) | 
|  |  | 
|  | def test_connection_constructor_call_check(self): | 
|  | """ | 
|  | Verifies that connection methods check whether base class __init__ was | 
|  | called. | 
|  | """ | 
|  | class Connection(sqlite.Connection): | 
|  | def __init__(self, name): | 
|  | pass | 
|  |  | 
|  | con = Connection(":memory:") | 
|  | with self.assertRaises(sqlite.ProgrammingError): | 
|  | cur = con.cursor() | 
|  |  | 
|  | def test_auto_commit(self): | 
|  | """ | 
|  | Verifies that creating a connection in autocommit mode works. | 
|  | 2.5.3 introduced a regression so that these could no longer | 
|  | be created. | 
|  | """ | 
|  | con = sqlite.connect(":memory:", isolation_level=None) | 
|  |  | 
|  | def test_pragma_autocommit(self): | 
|  | """ | 
|  | Verifies that running a PRAGMA statement that does an autocommit does | 
|  | work. This did not work in 2.5.3/2.5.4. | 
|  | """ | 
|  | cur = self.con.cursor() | 
|  | cur.execute("create table foo(bar)") | 
|  | cur.execute("insert into foo(bar) values (5)") | 
|  |  | 
|  | cur.execute("pragma page_size") | 
|  | row = cur.fetchone() | 
|  |  | 
|  | def test_connection_call(self): | 
|  | """ | 
|  | Call a connection with a non-string SQL request: check error handling | 
|  | of the statement constructor. | 
|  | """ | 
|  | self.assertRaises(TypeError, self.con, b"select 1") | 
|  |  | 
|  | def test_collation(self): | 
|  | def collation_cb(a, b): | 
|  | return 1 | 
|  | self.assertRaises(UnicodeEncodeError, self.con.create_collation, | 
|  | # Lone surrogate cannot be encoded to the default encoding (utf8) | 
|  | "\uDC80", collation_cb) | 
|  |  | 
|  | def test_recursive_cursor_use(self): | 
|  | """ | 
|  | http://bugs.python.org/issue10811 | 
|  |  | 
|  | Recursively using a cursor, such as when reusing it from a generator led to segfaults. | 
|  | Now we catch recursive cursor usage and raise a ProgrammingError. | 
|  | """ | 
|  | con = sqlite.connect(":memory:") | 
|  |  | 
|  | cur = con.cursor() | 
|  | cur.execute("create table a (bar)") | 
|  | cur.execute("create table b (baz)") | 
|  |  | 
|  | def foo(): | 
|  | cur.execute("insert into a (bar) values (?)", (1,)) | 
|  | yield 1 | 
|  |  | 
|  | with self.assertRaises(sqlite.ProgrammingError): | 
|  | cur.executemany("insert into b (baz) values (?)", | 
|  | ((i,) for i in foo())) | 
|  |  | 
|  | def test_convert_timestamp_microsecond_padding(self): | 
|  | """ | 
|  | http://bugs.python.org/issue14720 | 
|  |  | 
|  | The microsecond parsing of convert_timestamp() should pad with zeros, | 
|  | since the microsecond string "456" actually represents "456000". | 
|  | """ | 
|  |  | 
|  | con = sqlite.connect(":memory:", detect_types=sqlite.PARSE_DECLTYPES) | 
|  | cur = con.cursor() | 
|  | cur.execute("CREATE TABLE t (x TIMESTAMP)") | 
|  |  | 
|  | # Microseconds should be 456000 | 
|  | cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.456')") | 
|  |  | 
|  | # Microseconds should be truncated to 123456 | 
|  | cur.execute("INSERT INTO t (x) VALUES ('2012-04-04 15:06:00.123456789')") | 
|  |  | 
|  | cur.execute("SELECT * FROM t") | 
|  | values = [x[0] for x in cur.fetchall()] | 
|  |  | 
|  | self.assertEqual(values, [ | 
|  | datetime.datetime(2012, 4, 4, 15, 6, 0, 456000), | 
|  | datetime.datetime(2012, 4, 4, 15, 6, 0, 123456), | 
|  | ]) | 
|  |  | 
|  | def test_invalid_isolation_level_type(self): | 
|  | # isolation level is a string, not an integer | 
|  | self.assertRaises(TypeError, | 
|  | sqlite.connect, ":memory:", isolation_level=123) | 
|  |  | 
|  |  | 
|  | def test_null_character(self): | 
|  | # Issue #21147 | 
|  | cur = self.con.cursor() | 
|  | queries = ["\0select 1", "select 1\0"] | 
|  | for query in queries: | 
|  | with self.subTest(query=query): | 
|  | self.assertRaisesRegex(sqlite.ProgrammingError, "null char", | 
|  | self.con.execute, query) | 
|  | with self.subTest(query=query): | 
|  | self.assertRaisesRegex(sqlite.ProgrammingError, "null char", | 
|  | cur.execute, query) | 
|  |  | 
|  | def test_surrogates(self): | 
|  | con = sqlite.connect(":memory:") | 
|  | self.assertRaises(UnicodeEncodeError, con, "select '\ud8ff'") | 
|  | self.assertRaises(UnicodeEncodeError, con, "select '\udcff'") | 
|  | cur = con.cursor() | 
|  | self.assertRaises(UnicodeEncodeError, cur.execute, "select '\ud8ff'") | 
|  | self.assertRaises(UnicodeEncodeError, cur.execute, "select '\udcff'") | 
|  |  | 
|  | def test_large_sql(self): | 
|  | msg = "query string is too large" | 
|  | with memory_database() as cx, cx_limit(cx) as lim: | 
|  | cu = cx.cursor() | 
|  |  | 
|  | cx("select 1".ljust(lim)) | 
|  | # use a different SQL statement; don't reuse from the LRU cache | 
|  | cu.execute("select 2".ljust(lim)) | 
|  |  | 
|  | sql = "select 3".ljust(lim+1) | 
|  | self.assertRaisesRegex(sqlite.DataError, msg, cx, sql) | 
|  | self.assertRaisesRegex(sqlite.DataError, msg, cu.execute, sql) | 
|  |  | 
|  | def test_commit_cursor_reset(self): | 
|  | """ | 
|  | Connection.commit() did reset cursors, which made sqlite3 | 
|  | to return rows multiple times when fetched from cursors | 
|  | after commit. See issues 10513 and 23129 for details. | 
|  | """ | 
|  | con = sqlite.connect(":memory:") | 
|  | con.executescript(""" | 
|  | create table t(c); | 
|  | create table t2(c); | 
|  | insert into t values(0); | 
|  | insert into t values(1); | 
|  | insert into t values(2); | 
|  | """) | 
|  |  | 
|  | self.assertEqual(con.isolation_level, "") | 
|  |  | 
|  | counter = 0 | 
|  | for i, row in enumerate(con.execute("select c from t")): | 
|  | with self.subTest(i=i, row=row): | 
|  | con.execute("insert into t2(c) values (?)", (i,)) | 
|  | con.commit() | 
|  | if counter == 0: | 
|  | self.assertEqual(row[0], 0) | 
|  | elif counter == 1: | 
|  | self.assertEqual(row[0], 1) | 
|  | elif counter == 2: | 
|  | self.assertEqual(row[0], 2) | 
|  | counter += 1 | 
|  | self.assertEqual(counter, 3, "should have returned exactly three rows") | 
|  |  | 
|  | def test_bpo31770(self): | 
|  | """ | 
|  | The interpreter shouldn't crash in case Cursor.__init__() is called | 
|  | more than once. | 
|  | """ | 
|  | def callback(*args): | 
|  | pass | 
|  | con = sqlite.connect(":memory:") | 
|  | cur = sqlite.Cursor(con) | 
|  | ref = weakref.ref(cur, callback) | 
|  | cur.__init__(con) | 
|  | del cur | 
|  | # The interpreter shouldn't crash when ref is collected. | 
|  | del ref | 
|  | support.gc_collect() | 
|  |  | 
|  | def test_del_isolation_level_segfault(self): | 
|  | with self.assertRaises(AttributeError): | 
|  | del self.con.isolation_level | 
|  |  | 
|  | def test_bpo37347(self): | 
|  | class Printer: | 
|  | def log(self, *args): | 
|  | return sqlite.SQLITE_OK | 
|  |  | 
|  | for method in [self.con.set_trace_callback, | 
|  | functools.partial(self.con.set_progress_handler, n=1), | 
|  | self.con.set_authorizer]: | 
|  | printer_instance = Printer() | 
|  | method(printer_instance.log) | 
|  | method(printer_instance.log) | 
|  | self.con.execute("select 1")  # trigger seg fault | 
|  | method(None) | 
|  |  | 
|  | def test_return_empty_bytestring(self): | 
|  | cur = self.con.execute("select X''") | 
|  | val = cur.fetchone()[0] | 
|  | self.assertEqual(val, b'') | 
|  |  | 
|  | def test_table_lock_cursor_replace_stmt(self): | 
|  | with memory_database() as con: | 
|  | cur = con.cursor() | 
|  | cur.execute("create table t(t)") | 
|  | cur.executemany("insert into t values(?)", | 
|  | ((v,) for v in range(5))) | 
|  | con.commit() | 
|  | cur.execute("select t from t") | 
|  | cur.execute("drop table t") | 
|  | con.commit() | 
|  |  | 
|  | def test_table_lock_cursor_dealloc(self): | 
|  | with memory_database() as con: | 
|  | con.execute("create table t(t)") | 
|  | con.executemany("insert into t values(?)", | 
|  | ((v,) for v in range(5))) | 
|  | con.commit() | 
|  | cur = con.execute("select t from t") | 
|  | del cur | 
|  | con.execute("drop table t") | 
|  | con.commit() | 
|  |  | 
|  | def test_table_lock_cursor_non_readonly_select(self): | 
|  | with memory_database() as con: | 
|  | con.execute("create table t(t)") | 
|  | con.executemany("insert into t values(?)", | 
|  | ((v,) for v in range(5))) | 
|  | con.commit() | 
|  | def dup(v): | 
|  | con.execute("insert into t values(?)", (v,)) | 
|  | return | 
|  | con.create_function("dup", 1, dup) | 
|  | cur = con.execute("select dup(t) from t") | 
|  | del cur | 
|  | con.execute("drop table t") | 
|  | con.commit() | 
|  |  | 
|  | def test_executescript_step_through_select(self): | 
|  | with memory_database() as con: | 
|  | values = [(v,) for v in range(5)] | 
|  | with con: | 
|  | con.execute("create table t(t)") | 
|  | con.executemany("insert into t values(?)", values) | 
|  | steps = [] | 
|  | con.create_function("step", 1, lambda x: steps.append((x,))) | 
|  | con.executescript("select step(t) from t") | 
|  | self.assertEqual(steps, values) | 
|  |  | 
|  |  | 
|  | class RecursiveUseOfCursors(unittest.TestCase): | 
|  | # GH-80254: sqlite3 should not segfault for recursive use of cursors. | 
|  | msg = "Recursive use of cursors not allowed" | 
|  |  | 
|  | def setUp(self): | 
|  | self.con = sqlite.connect(":memory:", | 
|  | detect_types=sqlite.PARSE_COLNAMES) | 
|  | self.cur = self.con.cursor() | 
|  | self.cur.execute("create table test(x foo)") | 
|  | self.cur.executemany("insert into test(x) values (?)", | 
|  | [("foo",), ("bar",)]) | 
|  |  | 
|  | def tearDown(self): | 
|  | self.cur.close() | 
|  | self.con.close() | 
|  |  | 
|  | def test_recursive_cursor_init(self): | 
|  | conv = lambda x: self.cur.__init__(self.con) | 
|  | with patch.dict(sqlite.converters, {"INIT": conv}): | 
|  | self.cur.execute(f'select x as "x [INIT]", x from test') | 
|  | self.assertRaisesRegex(sqlite.ProgrammingError, self.msg, | 
|  | self.cur.fetchall) | 
|  |  | 
|  | def test_recursive_cursor_close(self): | 
|  | conv = lambda x: self.cur.close() | 
|  | with patch.dict(sqlite.converters, {"CLOSE": conv}): | 
|  | self.cur.execute(f'select x as "x [CLOSE]", x from test') | 
|  | self.assertRaisesRegex(sqlite.ProgrammingError, self.msg, | 
|  | self.cur.fetchall) | 
|  |  | 
|  | def test_recursive_cursor_iter(self): | 
|  | conv = lambda x, l=[]: self.cur.fetchone() if l else l.append(None) | 
|  | with patch.dict(sqlite.converters, {"ITER": conv}): | 
|  | self.cur.execute(f'select x as "x [ITER]", x from test') | 
|  | self.assertRaisesRegex(sqlite.ProgrammingError, self.msg, | 
|  | self.cur.fetchall) | 
|  |  | 
|  |  | 
|  | if __name__ == "__main__": | 
|  | unittest.main() |