| """Test script for the dbm.open function based on testdumbdbm.py""" |
| |
| import unittest |
| import dbm |
| import os |
| from test.support import import_helper |
| from test.support import os_helper |
| |
| try: |
| from dbm import ndbm |
| except ImportError: |
| ndbm = None |
| |
| dirname = os_helper.TESTFN |
| _fname = os.path.join(dirname, os_helper.TESTFN) |
| |
| # |
| # Iterates over every database module supported by dbm currently available. |
| # |
| def dbm_iterator(): |
| for name in dbm._names: |
| try: |
| mod = __import__(name, fromlist=['open']) |
| except ImportError: |
| continue |
| dbm._modules[name] = mod |
| yield mod |
| |
| # |
| # Clean up all scratch databases we might have created during testing |
| # |
| def cleaunup_test_dir(): |
| os_helper.rmtree(dirname) |
| |
| def setup_test_dir(): |
| cleaunup_test_dir() |
| os.mkdir(dirname) |
| |
| |
| class AnyDBMTestCase: |
| _dict = {'a': b'Python:', |
| 'b': b'Programming', |
| 'c': b'the', |
| 'd': b'way', |
| 'f': b'Guido', |
| 'g': b'intended', |
| } |
| |
| def init_db(self): |
| f = dbm.open(_fname, 'n') |
| for k in self._dict: |
| f[k.encode("ascii")] = self._dict[k] |
| f.close() |
| |
| def keys_helper(self, f): |
| keys = sorted(k.decode("ascii") for k in f.keys()) |
| dkeys = sorted(self._dict.keys()) |
| self.assertEqual(keys, dkeys) |
| return keys |
| |
| def test_error(self): |
| self.assertTrue(issubclass(self.module.error, OSError)) |
| |
| def test_anydbm_not_existing(self): |
| self.assertRaises(dbm.error, dbm.open, _fname) |
| |
| def test_anydbm_creation(self): |
| f = dbm.open(_fname, 'c') |
| self.assertEqual(list(f.keys()), []) |
| for key in self._dict: |
| f[key.encode("ascii")] = self._dict[key] |
| self.read_helper(f) |
| f.close() |
| |
| def test_anydbm_creation_n_file_exists_with_invalid_contents(self): |
| # create an empty file |
| os_helper.create_empty_file(_fname) |
| with dbm.open(_fname, 'n') as f: |
| self.assertEqual(len(f), 0) |
| |
| def test_anydbm_modification(self): |
| self.init_db() |
| f = dbm.open(_fname, 'c') |
| self._dict['g'] = f[b'g'] = b"indented" |
| self.read_helper(f) |
| # setdefault() works as in the dict interface |
| self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo') |
| self.assertEqual(f[b'xxx'], b'foo') |
| f.close() |
| |
| def test_anydbm_read(self): |
| self.init_db() |
| f = dbm.open(_fname, 'r') |
| self.read_helper(f) |
| # get() works as in the dict interface |
| self.assertEqual(f.get(b'a'), self._dict['a']) |
| self.assertEqual(f.get(b'xxx', b'foo'), b'foo') |
| self.assertIsNone(f.get(b'xxx')) |
| with self.assertRaises(KeyError): |
| f[b'xxx'] |
| f.close() |
| |
| def test_anydbm_keys(self): |
| self.init_db() |
| f = dbm.open(_fname, 'r') |
| keys = self.keys_helper(f) |
| f.close() |
| |
| def test_empty_value(self): |
| if getattr(dbm._defaultmod, 'library', None) == 'Berkeley DB': |
| self.skipTest("Berkeley DB doesn't distinguish the empty value " |
| "from the absent one") |
| f = dbm.open(_fname, 'c') |
| self.assertEqual(f.keys(), []) |
| f[b'empty'] = b'' |
| self.assertEqual(f.keys(), [b'empty']) |
| self.assertIn(b'empty', f) |
| self.assertEqual(f[b'empty'], b'') |
| self.assertEqual(f.get(b'empty'), b'') |
| self.assertEqual(f.setdefault(b'empty'), b'') |
| f.close() |
| |
| def test_anydbm_access(self): |
| self.init_db() |
| f = dbm.open(_fname, 'r') |
| key = "a".encode("ascii") |
| self.assertIn(key, f) |
| assert(f[key] == b"Python:") |
| f.close() |
| |
| def read_helper(self, f): |
| keys = self.keys_helper(f) |
| for key in self._dict: |
| self.assertEqual(self._dict[key], f[key.encode("ascii")]) |
| |
| def test_keys(self): |
| with dbm.open(_fname, 'c') as d: |
| self.assertEqual(d.keys(), []) |
| a = [(b'a', b'b'), (b'12345678910', b'019237410982340912840198242')] |
| for k, v in a: |
| d[k] = v |
| self.assertEqual(sorted(d.keys()), sorted(k for (k, v) in a)) |
| for k, v in a: |
| self.assertIn(k, d) |
| self.assertEqual(d[k], v) |
| self.assertNotIn(b'xxx', d) |
| self.assertRaises(KeyError, lambda: d[b'xxx']) |
| |
| def setUp(self): |
| self.addCleanup(setattr, dbm, '_defaultmod', dbm._defaultmod) |
| dbm._defaultmod = self.module |
| self.addCleanup(cleaunup_test_dir) |
| setup_test_dir() |
| |
| |
| class WhichDBTestCase(unittest.TestCase): |
| def test_whichdb(self): |
| self.addCleanup(setattr, dbm, '_defaultmod', dbm._defaultmod) |
| for module in dbm_iterator(): |
| # Check whether whichdb correctly guesses module name |
| # for databases opened with "module" module. |
| name = module.__name__ |
| setup_test_dir() |
| dbm._defaultmod = module |
| # Try with empty files first |
| with module.open(_fname, 'c'): pass |
| self.assertEqual(name, self.dbm.whichdb(_fname)) |
| # Now add a key |
| with module.open(_fname, 'w') as f: |
| f[b"1"] = b"1" |
| # and test that we can find it |
| self.assertIn(b"1", f) |
| # and read it |
| self.assertEqual(f[b"1"], b"1") |
| self.assertEqual(name, self.dbm.whichdb(_fname)) |
| |
| @unittest.skipUnless(ndbm, reason='Test requires ndbm') |
| def test_whichdb_ndbm(self): |
| # Issue 17198: check that ndbm which is referenced in whichdb is defined |
| with open(_fname + '.db', 'wb'): pass |
| self.assertIsNone(self.dbm.whichdb(_fname)) |
| |
| def setUp(self): |
| self.addCleanup(cleaunup_test_dir) |
| setup_test_dir() |
| self.dbm = import_helper.import_fresh_module('dbm') |
| |
| |
| for mod in dbm_iterator(): |
| assert mod.__name__.startswith('dbm.') |
| suffix = mod.__name__[4:] |
| testname = f'TestCase_{suffix}' |
| globals()[testname] = type(testname, |
| (AnyDBMTestCase, unittest.TestCase), |
| {'module': mod}) |
| |
| |
| if __name__ == "__main__": |
| unittest.main() |