# Patch section: new file Lib/dbm/__init__.py (index 000000000..f65da521a)
"""Generic interface to all dbm clones.

Use

        import dbm
        d = dbm.open(file, 'w', 0o666)

The returned object is a dbm.gnu, dbm.ndbm or dbm.dumb object, dependent on the
type of database being opened (determined by the whichdb function) in the case
of an existing dbm. If the dbm does not exist and the create or new flag ('c'
or 'n') was specified, the dbm type will be determined by the availability of
the modules (tested in the above order).

It has the following interface (key and data are strings):

        d[key] = data   # store data at key (may override data at
                        # existing key)
        data = d[key]   # retrieve data at key (raise KeyError if no
                        # such key)
        del d[key]      # delete data stored at key (raises KeyError
                        # if no such key)
        flag = key in d # true if the key exists
        list = d.keys() # return a list of all existing keys (slow!)

Future versions may change the order in which implementations are
tested for existence, and add interfaces to other dbm-like
implementations.
"""

__all__ = ['open', 'whichdb', 'error']

import io
import os
import struct
import sys


class error(Exception):
    pass

# Candidate backends, in order of preference.
_names = ['dbm.gnu', 'dbm.ndbm', 'dbm.dumb']
# First backend that imported successfully; filled in lazily by open().
_defaultmod = None
# Maps backend name -> imported module, for every backend that imported.
_modules = {}

# Rebound to a tuple so that "except dbm.error" also catches OSError;
# error[0] is the module's own exception class.
error = (error, OSError)

try:
    from dbm import ndbm
except ImportError:
    ndbm = None


def open(file, flag='r', mode=0o666):
    """Open or create database at path given by *file*.

    Optional argument *flag* can be 'r' (default) for read-only access, 'w'
    for read-write access of an existing database, 'c' for read-write access
    to a new or existing database, and 'n' for read-write access to a new
    database.

    Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
    only if it doesn't exist; and 'n' always creates a new database.

    Raises ImportError if none of the backends in _names can be imported,
    and error[0] if the database does not exist (and is not being created),
    its type cannot be determined, or the matching backend is unavailable.
    """
    global _defaultmod
    if _defaultmod is None:
        for name in _names:
            try:
                mod = __import__(name, fromlist=['open'])
            except ImportError:
                continue
            if not _defaultmod:
                _defaultmod = mod
            _modules[name] = mod
        if not _defaultmod:
            raise ImportError("no dbm clone found; tried %s" % _names)

    # guess the type of an existing database, if not creating a new one
    result = whichdb(file) if 'n' not in flag else None
    if result is None:
        # db doesn't exist or 'n' flag was specified to create a new db
        if 'c' in flag or 'n' in flag:
            # file doesn't exist and the new flag was used so use default type
            mod = _defaultmod
        else:
            raise error[0]("db file doesn't exist; "
                           "use 'c' or 'n' flag to create a new db")
    elif result == "":
        # db type cannot be determined
        raise error[0]("db type could not be determined")
    elif result not in _modules:
        raise error[0]("db type is {0}, but the module is not "
                       "available".format(result))
    else:
        mod = _modules[result]
    return mod.open(file, flag, mode)


def whichdb(filename):
    """Guess which db package to use to open a db file.

    Return values:

    - None if the database file can't be read;
    - empty string if the file can be read but can't be recognized
    - the name of the dbm submodule (e.g. "dbm.ndbm" or "dbm.gnu") if
      recognized.

    Importing the given module may still fail, and opening the
    database using that module may still fail.
    """

    # Check for ndbm first -- this has a .pag and a .dir file
    try:
        for ext in (".pag", ".dir"):
            with io.open(filename + ext, "rb"):
                pass
        return "dbm.ndbm"
    except OSError:
        # some dbm emulations based on Berkeley DB generate a .db file
        # some do not, but they should be caught by the bsd checks
        try:
            with io.open(filename + ".db", "rb"):
                pass
            # guarantee we can actually open the file using dbm
            # kind of overkill, but since we are dealing with emulations
            # it seems like a prudent step
            if ndbm is not None:
                d = ndbm.open(filename)
                d.close()
                return "dbm.ndbm"
        except OSError:
            pass

    # Check for dumbdbm next -- this has a .dir and a .dat file
    try:
        # First check for presence of files
        os.stat(filename + ".dat")
        size = os.stat(filename + ".dir").st_size
        # dumbdbm files with no keys are empty
        if size == 0:
            return "dbm.dumb"
        with io.open(filename + ".dir", "rb") as f:
            # A dumb index line starts with the repr() of the key, i.e. a
            # quote character.
            if f.read(1) in (b"'", b'"'):
                return "dbm.dumb"
    except OSError:
        pass

    # See if the file exists, return None if not
    try:
        f = io.open(filename, "rb")
    except OSError:
        return None

    with f:
        # Read the start of the file -- the magic number
        s = f.read(4)

    # Return "" if not at least 4 bytes
    if len(s) != 4:
        return ""

    # Convert to 4-byte int in native byte order; this cannot fail because
    # exactly four bytes were read.
    (magic,) = struct.unpack("=l", s)

    # Check for GNU dbm
    if magic in (0x13579ace, 0x13579acd, 0x13579acf):
        return "dbm.gnu"

    # Unknown
    return ""


if __name__ == "__main__":
    for filename in sys.argv[1:]:
        print(whichdb(filename) or "UNKNOWN", filename)

# diff --git a/Lib/dbm/dumb.py b/Lib/dbm/dumb.py new file mode 100644 index
# 000000000..864ad371e --- /dev/null +++ b/Lib/dbm/dumb.py
# ---------------------------------------------------------------------------
# Patch section: new file Lib/dbm/dumb.py
# ---------------------------------------------------------------------------
"""A dumb and slow but simple dbm clone.

For database spam, spam.dir contains the index (a text file),
spam.bak *may* contain a backup of the index (also a text file),
while spam.dat contains the data (a binary file).

XXX TO DO:

- seems to contain a bug when updating...

- reclaim free space (currently, space once occupied by deleted or expanded
items is never reused)

- support concurrent access (currently, if two processes take turns making
updates, they can mess up the index)

- support efficient access to large databases (currently, the whole index
is read when the database is opened, and some updates rewrite the whole index)

"""

import ast as _ast
import io as _io
import os as _os
import collections.abc

__all__ = ["error", "open"]

_BLOCKSIZE = 512

error = OSError

class _Database(collections.abc.MutableMapping):
    """Mapping of bytes keys to bytes values stored in .dir/.dat files."""

    # The on-disk directory and data files can remain in mutually
    # inconsistent states for an arbitrarily long time (see comments
    # at the end of __setitem__).  This is only repaired when _commit()
    # gets called.  One place _commit() gets called is from __del__(),
    # and if that occurs at program shutdown time, module globals may
    # already have gotten rebound to None.  Since it's crucial that
    # _commit() finish successfully, we can't ignore shutdown races
    # here, and _commit() must not reference any globals.
    _os = _os       # for _commit()
    _io = _io       # for _commit()

    def __init__(self, filebasename, mode, flag='c'):
        """Open the database stored under *filebasename*.

        *mode* is the permission mode applied to files this object creates;
        *flag* is one of 'r', 'w', 'c' or 'n' (only 'r' makes the object
        read-only; 'c' and 'n' allow missing files to be created).
        """
        self._mode = mode
        self._readonly = (flag == 'r')

        # The directory file is a text file.  Each line looks like
        #    "%r, (%d, %d)\n" % (key, pos, siz)
        # where key is the string key, pos is the offset into the dat
        # file of the associated value's first byte, and siz is the number
        # of bytes in the associated value.
        self._dirfile = filebasename + '.dir'

        # The data file is a binary file pointed into by the directory
        # file, and holds the values associated with keys.  Each value
        # begins at a _BLOCKSIZE-aligned byte offset, and is a raw
        # binary 8-bit string value.
        self._datfile = filebasename + '.dat'
        self._bakfile = filebasename + '.bak'

        # The index is an in-memory dict, mirroring the directory file.
        self._index = None  # maps keys to (pos, siz) pairs

        # Handle the creation
        self._create(flag)
        self._update(flag)

    def _create(self, flag):
        """Ensure the data file exists, creating it for flags 'c' and 'n'.

        With flag 'n' any existing data, backup and directory files are
        removed first.  For other flags a missing data file re-raises the
        OSError from the failed open.
        """
        if flag == 'n':
            for filename in (self._datfile, self._bakfile, self._dirfile):
                try:
                    _os.remove(filename)
                except OSError:
                    pass
        # Mod by Jack: create data file if needed
        try:
            f = _io.open(self._datfile, 'r', encoding="Latin-1")
        except OSError:
            if flag not in ('c', 'n'):
                raise
            with _io.open(self._datfile, 'w', encoding="Latin-1") as f:
                self._chmod(self._datfile)
        else:
            f.close()

    # Read directory file into the in-memory index dict.
    def _update(self, flag):
        """Load the directory file into self._index.

        A missing directory file is an error unless flag is 'c' or 'n', in
        which case the index starts empty and is marked as modified so that
        the next _commit() writes it out.
        """
        self._modified = False
        self._index = {}
        try:
            f = _io.open(self._dirfile, 'r', encoding="Latin-1")
        except OSError:
            if flag not in ('c', 'n'):
                raise
            self._modified = True
        else:
            with f:
                for line in f:
                    line = line.rstrip()
                    # literal_eval only accepts Python literals, so a
                    # tampered index cannot execute code.
                    key, pos_and_siz_pair = _ast.literal_eval(line)
                    key = key.encode('Latin-1')
                    self._index[key] = pos_and_siz_pair

    # Write the index dict to the directory file.  The original directory
    # file (if any) is renamed with a .bak extension first.  If a .bak
    # file currently exists, it's deleted.
    def _commit(self):
        """Rewrite the directory file from the in-memory index, if modified."""
        # CAUTION:  It's vital that _commit() succeed, and _commit() can
        # be called from __del__().  Therefore we must never reference a
        # global in this routine.
        if self._index is None or not self._modified:
            return  # nothing to do

        try:
            self._os.unlink(self._bakfile)
        except OSError:
            pass

        try:
            self._os.rename(self._dirfile, self._bakfile)
        except OSError:
            pass

        with self._io.open(self._dirfile, 'w', encoding="Latin-1") as f:
            self._chmod(self._dirfile)
            for key, pos_and_siz_pair in self._index.items():
                # Use Latin-1 since it has no qualms with any value in any
                # position; UTF-8, though, does care sometimes.
                entry = "%r, %r\n" % (key.decode('Latin-1'), pos_and_siz_pair)
                f.write(entry)

    sync = _commit

    def _verify_open(self):
        """Raise error if the database has already been closed."""
        if self._index is None:
            raise error('DBM object has already been closed')

    def __getitem__(self, key):
        """Return the bytes stored under *key* (str keys are UTF-8 encoded)."""
        if isinstance(key, str):
            key = key.encode('utf-8')
        self._verify_open()
        pos, siz = self._index[key]     # may raise KeyError
        with _io.open(self._datfile, 'rb') as f:
            f.seek(pos)
            dat = f.read(siz)
        return dat

    # Append val to the data file, starting at a _BLOCKSIZE-aligned
    # offset.  The data file is first padded with NUL bytes (if needed)
    # to get to an aligned offset.  Return pair
    #     (starting offset of val, len(val))
    def _addval(self, val):
        with _io.open(self._datfile, 'rb+') as f:
            f.seek(0, 2)
            pos = int(f.tell())
            npos = ((pos + _BLOCKSIZE - 1) // _BLOCKSIZE) * _BLOCKSIZE
            f.write(b'\0'*(npos-pos))
            pos = npos
            f.write(val)
        return (pos, len(val))

    # Write val to the data file, starting at offset pos.  The caller
    # is responsible for ensuring that there's enough room starting at
    # pos to hold val, without overwriting some other value.  Return
    # pair (pos, len(val)).
    def _setval(self, pos, val):
        with _io.open(self._datfile, 'rb+') as f:
            f.seek(pos)
            f.write(val)
        return (pos, len(val))

    # key is a new key whose associated value starts in the data file
    # at offset pos and with length siz.  Add an index record to
    # the in-memory index dict, and append one to the directory file.
    def _addkey(self, key, pos_and_siz_pair):
        self._index[key] = pos_and_siz_pair
        with _io.open(self._dirfile, 'a', encoding="Latin-1") as f:
            self._chmod(self._dirfile)
            f.write("%r, %r\n" % (key.decode("Latin-1"), pos_and_siz_pair))

    def __setitem__(self, key, val):
        """Store *val* under *key*; str arguments are UTF-8 encoded.

        Raises error when the database is read-only or closed, and
        TypeError when key or value is neither str nor bytes-like.
        """
        if self._readonly:
            raise error('The database is opened for reading only')
        if isinstance(key, str):
            key = key.encode('utf-8')
        elif not isinstance(key, (bytes, bytearray)):
            raise TypeError("keys must be bytes or strings")
        if isinstance(val, str):
            val = val.encode('utf-8')
        elif not isinstance(val, (bytes, bytearray)):
            raise TypeError("values must be bytes or strings")
        self._verify_open()
        self._modified = True
        if key not in self._index:
            self._addkey(key, self._addval(val))
        else:
            # See whether the new value is small enough to fit in the
            # (padded) space currently occupied by the old value.
            pos, siz = self._index[key]
            oldblocks = (siz + _BLOCKSIZE - 1) // _BLOCKSIZE
            newblocks = (len(val) + _BLOCKSIZE - 1) // _BLOCKSIZE
            if newblocks <= oldblocks:
                self._index[key] = self._setval(pos, val)
            else:
                # The new value doesn't fit in the (padded) space used
                # by the old value.  The blocks used by the old value are
                # forever lost.
                self._index[key] = self._addval(val)

            # Note that _index may be out of synch with the directory
            # file now:  _setval() and _addval() don't update the directory
            # file.  This also means that the on-disk directory and data
            # files are in a mutually inconsistent state, and they'll
            # remain that way until _commit() is called.  Note that this
            # is a disaster (for the database) if the program crashes
            # (so that _commit() never gets called).

    def __delitem__(self, key):
        """Remove *key* from the index and rewrite the directory file.

        Raises error when the database is read-only or closed, and KeyError
        when the key is absent.
        """
        if self._readonly:
            raise error('The database is opened for reading only')
        if isinstance(key, str):
            key = key.encode('utf-8')
        self._verify_open()
        # The blocks used by the associated value are lost.
        del self._index[key]    # may raise KeyError
        # Only flag the index as dirty once the deletion really happened, so
        # a failed lookup does not force a pointless directory rewrite.
        self._modified = True
        # XXX It's unclear why we do a _commit() here (the code always
        # XXX has, so I'm not changing it).  __setitem__ doesn't try to
        # XXX keep the directory file in synch.  Why should we?  Or
        # XXX why shouldn't __setitem__?
        self._commit()

    def keys(self):
        """Return a list of all keys; raises error if closed."""
        try:
            return list(self._index)
        except TypeError:
            raise error('DBM object has already been closed') from None

    def items(self):
        """Return a list of (key, value) pairs; raises error if closed."""
        self._verify_open()
        return [(key, self[key]) for key in self._index.keys()]

    def __contains__(self, key):
        if isinstance(key, str):
            key = key.encode('utf-8')
        try:
            return key in self._index
        except TypeError:
            # "key in None" raises TypeError once the database is closed;
            # any other TypeError (e.g. unhashable key) is passed through.
            if self._index is None:
                raise error('DBM object has already been closed') from None
            else:
                raise

    def iterkeys(self):
        try:
            return iter(self._index)
        except TypeError:
            raise error('DBM object has already been closed') from None
    __iter__ = iterkeys

    def __len__(self):
        try:
            return len(self._index)
        except TypeError:
            raise error('DBM object has already been closed') from None

    def close(self):
        """Commit pending index changes and mark the object as closed."""
        try:
            self._commit()
        finally:
            self._index = self._datfile = self._dirfile = self._bakfile = None

    __del__ = close

    def _chmod(self, file):
        self._os.chmod(file, self._mode)

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()


def open(file, flag='c', mode=0o666):
    """Open the database file, filename, and return corresponding object.

    The flag argument selects how the database is opened:

    - 'r': open an existing database read-only;
    - 'w': open an existing database for reading and writing;
    - 'c': open for reading and writing, creating the database if it does
      not exist (the default);
    - 'n': always create a new, empty database, open for reading and
      writing.

    Any other flag value raises ValueError.  'r' and 'w' raise OSError when
    the database files do not exist.

    The optional mode argument is the UNIX mode of the file, used only when
    the database has to be created.  It defaults to octal code 0o666 (and
    will be modified by the prevailing umask).

    """
    # Validate first so that an invalid flag never touches the process umask.
    if flag not in ('r', 'w', 'c', 'n'):
        raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n'")

    # Modify mode depending on the umask
    try:
        um = _os.umask(0)
        _os.umask(um)
    except AttributeError:
        pass
    else:
        # Turn off any bits that are set in the umask
        mode = mode & (~um)
    return _Database(file, mode, flag=flag)


# ---------------------------------------------------------------------------
# Patch section: new file Lib/test/test_dbm.py (index 000000000..571da973a)
# ---------------------------------------------------------------------------
"""Test script for the dbm.open function based on testdumbdbm.py"""

import unittest
import glob
import test.support

# Skip tests if dbm module doesn't exist.
dbm = test.support.import_module('dbm')

try:
    from dbm import ndbm
except ImportError:
    ndbm = None

_fname = test.support.TESTFN

#
# Iterates over every database module supported by dbm currently available,
# setting dbm to use each in turn, and yielding that module
#
def dbm_iterator():
    for name in dbm._names:
        try:
            mod = __import__(name, fromlist=['open'])
        except ImportError:
            continue
        dbm._modules[name] = mod
        yield mod

#
# Clean up all scratch databases we might have created during testing
#
def delete_files():
    # we don't know the precise name the underlying database uses
    # so we use glob to locate all names
    for f in glob.glob(glob.escape(_fname) + "*"):
        test.support.unlink(f)


class AnyDBMTestCase:
    """Backend-independent tests; load_tests() binds `module` per backend."""

    _dict = {'a': b'Python:',
             'b': b'Programming',
             'c': b'the',
             'd': b'way',
             'f': b'Guido',
             'g': b'intended',
             }

    def init_db(self):
        f = dbm.open(_fname, 'n')
        for k in self._dict:
            f[k.encode("ascii")] = self._dict[k]
        f.close()

    def keys_helper(self, f):
        keys = sorted(k.decode("ascii") for k in f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    def test_error(self):
        self.assertTrue(issubclass(self.module.error, OSError))

    def test_anydbm_not_existing(self):
        self.assertRaises(dbm.error, dbm.open, _fname)

    def test_anydbm_creation(self):
        f = dbm.open(_fname, 'c')
        self.assertEqual(list(f.keys()), [])
        for key in self._dict:
            f[key.encode("ascii")] = self._dict[key]
        self.read_helper(f)
        f.close()

    def test_anydbm_creation_n_file_exists_with_invalid_contents(self):
        # create an empty file
        test.support.create_empty_file(_fname)
        with dbm.open(_fname, 'n') as f:
            self.assertEqual(len(f), 0)

    def test_anydbm_modification(self):
        self.init_db()
        f = dbm.open(_fname, 'c')
        self._dict['g'] = f[b'g'] = b"indented"
        self.read_helper(f)
        # setdefault() works as in the dict interface
        self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
        self.assertEqual(f[b'xxx'], b'foo')
        f.close()

    def test_anydbm_read(self):
        self.init_db()
        f = dbm.open(_fname, 'r')
        self.read_helper(f)
        # get() works as in the dict interface
        self.assertEqual(f.get(b'a'), self._dict['a'])
        self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
        self.assertIsNone(f.get(b'xxx'))
        with self.assertRaises(KeyError):
            f[b'xxx']
        f.close()

    def test_anydbm_keys(self):
        self.init_db()
        f = dbm.open(_fname, 'r')
        self.keys_helper(f)
        f.close()

    def test_empty_value(self):
        if getattr(dbm._defaultmod, 'library', None) == 'Berkeley DB':
            self.skipTest("Berkeley DB doesn't distinguish the empty value "
                          "from the absent one")
        f = dbm.open(_fname, 'c')
        self.assertEqual(f.keys(), [])
        f[b'empty'] = b''
        self.assertEqual(f.keys(), [b'empty'])
        self.assertIn(b'empty', f)
        self.assertEqual(f[b'empty'], b'')
        self.assertEqual(f.get(b'empty'), b'')
        self.assertEqual(f.setdefault(b'empty'), b'')
        f.close()

    def test_anydbm_access(self):
        self.init_db()
        f = dbm.open(_fname, 'r')
        key = "a".encode("ascii")
        self.assertIn(key, f)
        # assertEqual instead of a bare assert: still checked under -O and
        # reports both values on failure.
        self.assertEqual(f[key], b"Python:")
        f.close()

    def read_helper(self, f):
        self.keys_helper(f)
        for key in self._dict:
            self.assertEqual(self._dict[key], f[key.encode("ascii")])

    def tearDown(self):
        delete_files()

    def setUp(self):
        # Work on a per-test copy so test_anydbm_modification cannot leak
        # its change into the class-level fixture shared by other tests.
        self._dict = dict(self._dict)
        dbm._defaultmod = self.module
        delete_files()


class WhichDBTestCase(unittest.TestCase):
    def test_whichdb(self):
        for module in dbm_iterator():
            # Check whether whichdb correctly guesses module name
            # for databases opened with "module" module.
            # Try with empty files first
            name = module.__name__
            if name == 'dbm.dumb':
                continue   # whichdb can't support dbm.dumb
            delete_files()
            f = module.open(_fname, 'c')
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))
            # Now add a key
            f = module.open(_fname, 'w')
            f[b"1"] = b"1"
            # and test that we can find it
            self.assertIn(b"1", f)
            # and read it
            self.assertEqual(f[b"1"], b"1")
            f.close()
            self.assertEqual(name, self.dbm.whichdb(_fname))

    @unittest.skipUnless(ndbm, reason='Test requires ndbm')
    def test_whichdb_ndbm(self):
        # Issue 17198: check that ndbm which is referenced in whichdb is defined
        db_file = '{}_ndbm.db'.format(_fname)
        with open(db_file, 'w'):
            self.addCleanup(test.support.unlink, db_file)
        self.assertIsNone(self.dbm.whichdb(db_file[:-3]))

    def tearDown(self):
        delete_files()

    def setUp(self):
        delete_files()
        self.filename = test.support.TESTFN
        self.d = dbm.open(self.filename, 'c')
        self.d.close()
        self.dbm = test.support.import_fresh_module('dbm')

    def test_keys(self):
        self.d = dbm.open(self.filename, 'c')
        self.assertEqual(self.d.keys(), [])
        a = [(b'a', b'b'), (b'12345678910', b'019237410982340912840198242')]
        for k, v in a:
            self.d[k] = v
        self.assertEqual(sorted(self.d.keys()), sorted(k for (k, v) in a))
        for k, v in a:
            self.assertIn(k, self.d)
            self.assertEqual(self.d[k], v)
        self.assertNotIn(b'xxx', self.d)
        self.assertRaises(KeyError, lambda: self.d[b'xxx'])
        self.d.close()


def load_tests(loader, tests, pattern):
    """Add one AnyDBMTestCase subclass per importable dbm backend."""
    classes = []
    for mod in dbm_iterator():
        classes.append(type("TestCase-" + mod.__name__,
                            (AnyDBMTestCase, unittest.TestCase),
                            {'module': mod}))
    # Use the loader handed to us rather than the deprecated makeSuite().
    suites = [loader.loadTestsFromTestCase(c) for c in classes]

    tests.addTests(suites)
    return tests

if __name__ == "__main__":
    unittest.main()


# ---------------------------------------------------------------------------
# Patch section: new file Lib/test/test_dbm_dumb.py (index 000000000..0a6077820)
# ---------------------------------------------------------------------------
"""Test script for the dumbdbm module
   Original by Roger E. Masse
"""

import contextlib
import io
import operator
import os
import stat
import unittest
import dbm.dumb as dumbdbm
from test import support
from functools import partial

_fname = support.TESTFN

def _delete_files():
    for ext in [".dir", ".dat", ".bak"]:
        try:
            os.unlink(_fname + ext)
        except OSError:
            pass

class DumbDBMTestCase(unittest.TestCase):
    _dict = {b'0': b'',
             b'a': b'Python:',
             b'b': b'Programming',
             b'c': b'the',
             b'd': b'way',
             b'f': b'Guido',
             b'g': b'intended',
             '\u00fc'.encode('utf-8') : b'!',
             }

    def test_dumbdbm_creation(self):
        with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
            self.assertEqual(list(f.keys()), [])
            for key in self._dict:
                f[key] = self._dict[key]
            self.read_helper(f)

    @unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
    def test_dumbdbm_creation_mode(self):
        # Set the umask outside the try block so that old_umask is always
        # bound when the finally clause restores it.
        old_umask = os.umask(0o002)
        try:
            f = dumbdbm.open(_fname, 'c', 0o637)
            f.close()
        finally:
            os.umask(old_umask)

        expected_mode = 0o635
        if os.name != 'posix':
            # Windows only supports setting the read-only attribute.
            # This shouldn't fail, but doesn't work like Unix either.
            expected_mode = 0o666

        st = os.stat(_fname + '.dat')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
        st = os.stat(_fname + '.dir')
        self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)

    def test_close_twice(self):
        f = dumbdbm.open(_fname)
        f[b'a'] = b'b'
        self.assertEqual(f[b'a'], b'b')
        f.close()
        f.close()

    def test_dumbdbm_modification(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
            self._dict[b'g'] = f[b'g'] = b"indented"
            self.read_helper(f)
            # setdefault() works as in the dict interface
            self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
            self.assertEqual(f[b'xxx'], b'foo')

    def test_dumbdbm_read(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.read_helper(f)
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                f[b'g'] = b'x'
            with self.assertRaisesRegex(dumbdbm.error,
                                    'The database is opened for reading only'):
                del f[b'a']
            # get() works as in the dict interface
            self.assertEqual(f.get(b'a'), self._dict[b'a'])
            self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
            self.assertIsNone(f.get(b'xxx'))
            with self.assertRaises(KeyError):
                f[b'xxx']

    def test_dumbdbm_keys(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.keys_helper(f)

    def test_write_contains(self):
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            self.assertIn(b'1', f)

    def test_write_write_read(self):
        # test for bug #482460
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'1'] = b'hello2'
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello2')

    def test_str_read(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])

    def test_str_write_contains(self):
        self.init_db()
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f['\u00fc'] = b'!'
            f['1'] = 'a'
        with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
            self.assertIn('\u00fc', f)
            self.assertEqual(f['\u00fc'.encode('utf-8')],
                             self._dict['\u00fc'.encode('utf-8')])
            self.assertEqual(f[b'1'], b'a')

    def test_line_endings(self):
        # test for bug #1172763: dumbdbm would die if the line endings
        # weren't what was expected.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            f[b'1'] = b'hello'
            f[b'2'] = b'hello2'

        # Mangle the file by changing the line separator to Windows or Unix
        with io.open(_fname + '.dir', 'rb') as file:
            data = file.read()
        if os.linesep == '\n':
            data = data.replace(b'\n', b'\r\n')
        else:
            data = data.replace(b'\r\n', b'\n')
        with io.open(_fname + '.dir', 'wb') as file:
            file.write(data)

        # Close the handle explicitly instead of relying on __del__.
        with contextlib.closing(dumbdbm.open(_fname)) as f:
            self.assertEqual(f[b'1'], b'hello')
            self.assertEqual(f[b'2'], b'hello2')


    def read_helper(self, f):
        self.keys_helper(f)
        for key in self._dict:
            self.assertEqual(self._dict[key], f[key])

    def init_db(self):
        with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
            for k in self._dict:
                f[k] = self._dict[k]

    def keys_helper(self, f):
        keys = sorted(f.keys())
        dkeys = sorted(self._dict.keys())
        self.assertEqual(keys, dkeys)
        return keys

    # Perform randomized operations.  This doesn't make assumptions about
    # what *might* fail.
    def test_random(self):
        import random
        d = {}  # mirror the database
        for _round in range(5):
            with contextlib.closing(dumbdbm.open(_fname)) as f:
                for _step in range(100):
                    k = random.choice('abcdefghijklm')
                    if random.random() < 0.2:
                        if k in d:
                            del d[k]
                            del f[k]
                    else:
                        v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
                        d[k] = v
                        f[k] = v
                        self.assertEqual(f[k], v)

            with contextlib.closing(dumbdbm.open(_fname)) as f:
                expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
                got = sorted(f.items())
                self.assertEqual(expected, got)

    def test_context_manager(self):
        with dumbdbm.open(_fname, 'c') as db:
            db["dumbdbm context manager"] = "context manager"

        with dumbdbm.open(_fname, 'r') as db:
            self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])

        with self.assertRaises(dumbdbm.error):
            db.keys()

    def test_check_closed(self):
        f = dumbdbm.open(_fname, 'c')
        f.close()

        for meth in (partial(operator.delitem, f),
                     partial(operator.setitem, f, 'b'),
                     partial(operator.getitem, f),
                     partial(operator.contains, f)):
            with self.assertRaises(dumbdbm.error) as cm:
                meth('test')
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

        for meth in (operator.methodcaller('keys'),
                     operator.methodcaller('iterkeys'),
                     operator.methodcaller('items'),
                     len):
            with self.assertRaises(dumbdbm.error) as cm:
                meth(f)
            self.assertEqual(str(cm.exception),
                             "DBM object has already been closed")

    def test_create_new(self):
        with dumbdbm.open(_fname, 'n') as f:
            for k in self._dict:
                f[k] = self._dict[k]

        with dumbdbm.open(_fname, 'n') as f:
            self.assertEqual(f.keys(), [])

    def test_eval(self):
        # The index must be parsed as literals only, never executed.
        with open(_fname + '.dir', 'w') as stream:
            stream.write("str(print('Hacked!')), 0\n")
        with support.captured_stdout() as stdout:
            with self.assertRaises(ValueError):
                with dumbdbm.open(_fname) as f:
                    pass
            self.assertEqual(stdout.getvalue(), '')

    def test_missing_data(self):
        for value in ('r', 'w'):
            _delete_files()
            with self.assertRaises(FileNotFoundError):
                dumbdbm.open(_fname, value)
            self.assertFalse(os.path.exists(_fname + '.dir'))
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_missing_index(self):
        with dumbdbm.open(_fname, 'n') as f:
            pass
        os.unlink(_fname + '.dir')
        for value in ('r', 'w'):
            with self.assertRaises(FileNotFoundError):
                dumbdbm.open(_fname, value)
            self.assertFalse(os.path.exists(_fname + '.dir'))
            self.assertFalse(os.path.exists(_fname + '.bak'))

    def test_invalid_flag(self):
        for flag in ('x', 'rf', None):
            with self.assertRaisesRegex(ValueError,
                                        "Flag must be one of "
                                        "'r', 'w', 'c', or 'n'"):
                dumbdbm.open(_fname, flag)

    def test_readonly_files(self):
        with support.temp_dir() as dir:
            fname = os.path.join(dir, 'db')
            with dumbdbm.open(fname, 'n') as f:
                self.assertEqual(list(f.keys()), [])
                for key in self._dict:
                    f[key] = self._dict[key]
            os.chmod(fname + ".dir", stat.S_IRUSR)
            os.chmod(fname + ".dat", stat.S_IRUSR)
            os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
            with dumbdbm.open(fname, 'r') as f:
                self.assertEqual(sorted(f.keys()), sorted(self._dict))
                f.close()  # don't write

    @unittest.skipUnless(support.TESTFN_NONASCII,
                         'requires OS support of non-ASCII encodings')
    def test_nonascii_filename(self):
        filename = support.TESTFN_NONASCII
        for suffix in ['.dir', '.dat', '.bak']:
            self.addCleanup(support.unlink, filename + suffix)
        with dumbdbm.open(filename, 'c') as db:
            db[b'key'] = b'value'
        self.assertTrue(os.path.exists(filename + '.dat'))
        self.assertTrue(os.path.exists(filename + '.dir'))
        with dumbdbm.open(filename, 'r') as db:
            self.assertEqual(list(db.keys()), [b'key'])
            self.assertIn(b'key', db)
            self.assertEqual(db[b'key'], b'value')

    def tearDown(self):
        _delete_files()

    def setUp(self):
        # Work on a per-test copy so test_dumbdbm_modification cannot leak
        # its change into the class-level fixture shared by other tests.
        self._dict = dict(self._dict)
        _delete_files()


if __name__ == "__main__":
    unittest.main()