Merge pull request #2691 from verhovsky/copy-dbm-python-only

Add dbm from CPython 3.8
This commit is contained in:
Noah
2021-06-07 19:43:27 -05:00
committed by GitHub
4 changed files with 1020 additions and 0 deletions

189
Lib/dbm/__init__.py Normal file
View File

@@ -0,0 +1,189 @@
"""Generic interface to all dbm clones.
Use
import dbm
d = dbm.open(file, 'w', 0o666)
The returned object is a dbm.gnu, dbm.ndbm or dbm.dumb object, dependent on the
type of database being opened (determined by the whichdb function) in the case
of an existing dbm. If the dbm does not exist and the create or new flag ('c'
or 'n') was specified, the dbm type will be determined by the availability of
the modules (tested in the above order).
It has the following interface (key and data are strings):
d[key] = data # store data at key (may override data at
# existing key)
data = d[key] # retrieve data at key (raise KeyError if no
# such key)
del d[key] # delete data stored at key (raises KeyError
# if no such key)
flag = key in d # true if the key exists
list = d.keys() # return a list of all existing keys (slow!)
Future versions may change the order in which implementations are
tested for existence, and add interfaces to other dbm-like
implementations.
"""
__all__ = ['open', 'whichdb', 'error']
import io
import os
import struct
import sys
class error(Exception):
pass
_names = ['dbm.gnu', 'dbm.ndbm', 'dbm.dumb']
_defaultmod = None
_modules = {}
error = (error, OSError)
try:
from dbm import ndbm
except ImportError:
ndbm = None
def open(file, flag='r', mode=0o666):
"""Open or create database at path given by *file*.
Optional argument *flag* can be 'r' (default) for read-only access, 'w'
for read-write access of an existing database, 'c' for read-write access
to a new or existing database, and 'n' for read-write access to a new
database.
Note: 'r' and 'w' fail if the database doesn't exist; 'c' creates it
only if it doesn't exist; and 'n' always creates a new database.
"""
global _defaultmod
if _defaultmod is None:
for name in _names:
try:
mod = __import__(name, fromlist=['open'])
except ImportError:
continue
if not _defaultmod:
_defaultmod = mod
_modules[name] = mod
if not _defaultmod:
raise ImportError("no dbm clone found; tried %s" % _names)
# guess the type of an existing database, if not creating a new one
result = whichdb(file) if 'n' not in flag else None
if result is None:
# db doesn't exist or 'n' flag was specified to create a new db
if 'c' in flag or 'n' in flag:
# file doesn't exist and the new flag was used so use default type
mod = _defaultmod
else:
raise error[0]("db file doesn't exist; "
"use 'c' or 'n' flag to create a new db")
elif result == "":
# db type cannot be determined
raise error[0]("db type could not be determined")
elif result not in _modules:
raise error[0]("db type is {0}, but the module is not "
"available".format(result))
else:
mod = _modules[result]
return mod.open(file, flag, mode)
def whichdb(filename):
"""Guess which db package to use to open a db file.
Return values:
- None if the database file can't be read;
- empty string if the file can be read but can't be recognized
- the name of the dbm submodule (e.g. "ndbm" or "gnu") if recognized.
Importing the given module may still fail, and opening the
database using that module may still fail.
"""
# Check for ndbm first -- this has a .pag and a .dir file
try:
f = io.open(filename + ".pag", "rb")
f.close()
f = io.open(filename + ".dir", "rb")
f.close()
return "dbm.ndbm"
except OSError:
# some dbm emulations based on Berkeley DB generate a .db file
# some do not, but they should be caught by the bsd checks
try:
f = io.open(filename + ".db", "rb")
f.close()
# guarantee we can actually open the file using dbm
# kind of overkill, but since we are dealing with emulations
# it seems like a prudent step
if ndbm is not None:
d = ndbm.open(filename)
d.close()
return "dbm.ndbm"
except OSError:
pass
# Check for dumbdbm next -- this has a .dir and a .dat file
try:
# First check for presence of files
os.stat(filename + ".dat")
size = os.stat(filename + ".dir").st_size
# dumbdbm files with no keys are empty
if size == 0:
return "dbm.dumb"
f = io.open(filename + ".dir", "rb")
try:
if f.read(1) in (b"'", b'"'):
return "dbm.dumb"
finally:
f.close()
except OSError:
pass
# See if the file exists, return None if not
try:
f = io.open(filename, "rb")
except OSError:
return None
with f:
# Read the start of the file -- the magic number
s16 = f.read(16)
s = s16[0:4]
# Return "" if not at least 4 bytes
if len(s) != 4:
return ""
# Convert to 4-byte int in native byte order -- return "" if impossible
try:
(magic,) = struct.unpack("=l", s)
except struct.error:
return ""
# Check for GNU dbm
if magic in (0x13579ace, 0x13579acd, 0x13579acf):
return "dbm.gnu"
# Later versions of Berkeley db hash file have a 12-byte pad in
# front of the file type
try:
(magic,) = struct.unpack("=l", s16[-4:])
except struct.error:
return ""
# Unknown
return ""
if __name__ == "__main__":
for filename in sys.argv[1:]:
print(whichdb(filename) or "UNKNOWN", filename)

316
Lib/dbm/dumb.py Normal file
View File

@@ -0,0 +1,316 @@
"""A dumb and slow but simple dbm clone.
For database spam, spam.dir contains the index (a text file),
spam.bak *may* contain a backup of the index (also a text file),
while spam.dat contains the data (a binary file).
XXX TO DO:
- seems to contain a bug when updating...
- reclaim free space (currently, space once occupied by deleted or expanded
items is never reused)
- support concurrent access (currently, if two processes take turns making
updates, they can mess up the index)
- support efficient access to large databases (currently, the whole index
is read when the database is opened, and some updates rewrite the whole index)
- support opening for read-only (flag = 'm')
"""
import ast as _ast
import io as _io
import os as _os
import collections.abc
__all__ = ["error", "open"]
_BLOCKSIZE = 512
error = OSError
class _Database(collections.abc.MutableMapping):
# The on-disk directory and data files can remain in mutually
# inconsistent states for an arbitrarily long time (see comments
# at the end of __setitem__). This is only repaired when _commit()
# gets called. One place _commit() gets called is from __del__(),
# and if that occurs at program shutdown time, module globals may
# already have gotten rebound to None. Since it's crucial that
# _commit() finish successfully, we can't ignore shutdown races
# here, and _commit() must not reference any globals.
_os = _os # for _commit()
_io = _io # for _commit()
def __init__(self, filebasename, mode, flag='c'):
self._mode = mode
self._readonly = (flag == 'r')
# The directory file is a text file. Each line looks like
# "%r, (%d, %d)\n" % (key, pos, siz)
# where key is the string key, pos is the offset into the dat
# file of the associated value's first byte, and siz is the number
# of bytes in the associated value.
self._dirfile = filebasename + '.dir'
# The data file is a binary file pointed into by the directory
# file, and holds the values associated with keys. Each value
# begins at a _BLOCKSIZE-aligned byte offset, and is a raw
# binary 8-bit string value.
self._datfile = filebasename + '.dat'
self._bakfile = filebasename + '.bak'
# The index is an in-memory dict, mirroring the directory file.
self._index = None # maps keys to (pos, siz) pairs
# Handle the creation
self._create(flag)
self._update(flag)
def _create(self, flag):
if flag == 'n':
for filename in (self._datfile, self._bakfile, self._dirfile):
try:
_os.remove(filename)
except OSError:
pass
# Mod by Jack: create data file if needed
try:
f = _io.open(self._datfile, 'r', encoding="Latin-1")
except OSError:
if flag not in ('c', 'n'):
raise
with _io.open(self._datfile, 'w', encoding="Latin-1") as f:
self._chmod(self._datfile)
else:
f.close()
# Read directory file into the in-memory index dict.
def _update(self, flag):
self._modified = False
self._index = {}
try:
f = _io.open(self._dirfile, 'r', encoding="Latin-1")
except OSError:
if flag not in ('c', 'n'):
raise
self._modified = True
else:
with f:
for line in f:
line = line.rstrip()
key, pos_and_siz_pair = _ast.literal_eval(line)
key = key.encode('Latin-1')
self._index[key] = pos_and_siz_pair
# Write the index dict to the directory file. The original directory
# file (if any) is renamed with a .bak extension first. If a .bak
# file currently exists, it's deleted.
def _commit(self):
# CAUTION: It's vital that _commit() succeed, and _commit() can
# be called from __del__(). Therefore we must never reference a
# global in this routine.
if self._index is None or not self._modified:
return # nothing to do
try:
self._os.unlink(self._bakfile)
except OSError:
pass
try:
self._os.rename(self._dirfile, self._bakfile)
except OSError:
pass
with self._io.open(self._dirfile, 'w', encoding="Latin-1") as f:
self._chmod(self._dirfile)
for key, pos_and_siz_pair in self._index.items():
# Use Latin-1 since it has no qualms with any value in any
# position; UTF-8, though, does care sometimes.
entry = "%r, %r\n" % (key.decode('Latin-1'), pos_and_siz_pair)
f.write(entry)
sync = _commit
def _verify_open(self):
if self._index is None:
raise error('DBM object has already been closed')
def __getitem__(self, key):
if isinstance(key, str):
key = key.encode('utf-8')
self._verify_open()
pos, siz = self._index[key] # may raise KeyError
with _io.open(self._datfile, 'rb') as f:
f.seek(pos)
dat = f.read(siz)
return dat
# Append val to the data file, starting at a _BLOCKSIZE-aligned
# offset. The data file is first padded with NUL bytes (if needed)
# to get to an aligned offset. Return pair
# (starting offset of val, len(val))
def _addval(self, val):
with _io.open(self._datfile, 'rb+') as f:
f.seek(0, 2)
pos = int(f.tell())
npos = ((pos + _BLOCKSIZE - 1) // _BLOCKSIZE) * _BLOCKSIZE
f.write(b'\0'*(npos-pos))
pos = npos
f.write(val)
return (pos, len(val))
# Write val to the data file, starting at offset pos. The caller
# is responsible for ensuring that there's enough room starting at
# pos to hold val, without overwriting some other value. Return
# pair (pos, len(val)).
def _setval(self, pos, val):
with _io.open(self._datfile, 'rb+') as f:
f.seek(pos)
f.write(val)
return (pos, len(val))
# key is a new key whose associated value starts in the data file
# at offset pos and with length siz. Add an index record to
# the in-memory index dict, and append one to the directory file.
def _addkey(self, key, pos_and_siz_pair):
self._index[key] = pos_and_siz_pair
with _io.open(self._dirfile, 'a', encoding="Latin-1") as f:
self._chmod(self._dirfile)
f.write("%r, %r\n" % (key.decode("Latin-1"), pos_and_siz_pair))
def __setitem__(self, key, val):
if self._readonly:
raise error('The database is opened for reading only')
if isinstance(key, str):
key = key.encode('utf-8')
elif not isinstance(key, (bytes, bytearray)):
raise TypeError("keys must be bytes or strings")
if isinstance(val, str):
val = val.encode('utf-8')
elif not isinstance(val, (bytes, bytearray)):
raise TypeError("values must be bytes or strings")
self._verify_open()
self._modified = True
if key not in self._index:
self._addkey(key, self._addval(val))
else:
# See whether the new value is small enough to fit in the
# (padded) space currently occupied by the old value.
pos, siz = self._index[key]
oldblocks = (siz + _BLOCKSIZE - 1) // _BLOCKSIZE
newblocks = (len(val) + _BLOCKSIZE - 1) // _BLOCKSIZE
if newblocks <= oldblocks:
self._index[key] = self._setval(pos, val)
else:
# The new value doesn't fit in the (padded) space used
# by the old value. The blocks used by the old value are
# forever lost.
self._index[key] = self._addval(val)
# Note that _index may be out of synch with the directory
# file now: _setval() and _addval() don't update the directory
# file. This also means that the on-disk directory and data
# files are in a mutually inconsistent state, and they'll
# remain that way until _commit() is called. Note that this
# is a disaster (for the database) if the program crashes
# (so that _commit() never gets called).
def __delitem__(self, key):
if self._readonly:
raise error('The database is opened for reading only')
if isinstance(key, str):
key = key.encode('utf-8')
self._verify_open()
self._modified = True
# The blocks used by the associated value are lost.
del self._index[key]
# XXX It's unclear why we do a _commit() here (the code always
# XXX has, so I'm not changing it). __setitem__ doesn't try to
# XXX keep the directory file in synch. Why should we? Or
# XXX why shouldn't __setitem__?
self._commit()
def keys(self):
try:
return list(self._index)
except TypeError:
raise error('DBM object has already been closed') from None
def items(self):
self._verify_open()
return [(key, self[key]) for key in self._index.keys()]
def __contains__(self, key):
if isinstance(key, str):
key = key.encode('utf-8')
try:
return key in self._index
except TypeError:
if self._index is None:
raise error('DBM object has already been closed') from None
else:
raise
def iterkeys(self):
try:
return iter(self._index)
except TypeError:
raise error('DBM object has already been closed') from None
__iter__ = iterkeys
def __len__(self):
try:
return len(self._index)
except TypeError:
raise error('DBM object has already been closed') from None
def close(self):
try:
self._commit()
finally:
self._index = self._datfile = self._dirfile = self._bakfile = None
__del__ = close
def _chmod(self, file):
self._os.chmod(file, self._mode)
def __enter__(self):
return self
def __exit__(self, *args):
self.close()
def open(file, flag='c', mode=0o666):
"""Open the database file, filename, and return corresponding object.
The flag argument, used to control how the database is opened in the
other DBM implementations, supports only the semantics of 'c' and 'n'
values. Other values will default to the semantics of 'c' value:
the database will always opened for update and will be created if it
does not exist.
The optional mode argument is the UNIX mode of the file, used only when
the database has to be created. It defaults to octal code 0o666 (and
will be modified by the prevailing umask).
"""
# Modify mode depending on the umask
try:
um = _os.umask(0)
_os.umask(um)
except AttributeError:
pass
else:
# Turn off any bits that are set in the umask
mode = mode & (~um)
if flag not in ('r', 'w', 'c', 'n'):
raise ValueError("Flag must be one of 'r', 'w', 'c', or 'n'")
return _Database(file, mode, flag=flag)

212
Lib/test/test_dbm.py Normal file
View File

@@ -0,0 +1,212 @@
"""Test script for the dbm.open function based on testdumbdbm.py"""
import unittest
import glob
import test.support
# Skip tests if dbm module doesn't exist.
dbm = test.support.import_module('dbm')
try:
from dbm import ndbm
except ImportError:
ndbm = None
_fname = test.support.TESTFN
#
# Iterates over every database module supported by dbm currently available,
# setting dbm to use each in turn, and yielding that module
#
def dbm_iterator():
for name in dbm._names:
try:
mod = __import__(name, fromlist=['open'])
except ImportError:
continue
dbm._modules[name] = mod
yield mod
#
# Clean up all scratch databases we might have created during testing
#
def delete_files():
# we don't know the precise name the underlying database uses
# so we use glob to locate all names
for f in glob.glob(glob.escape(_fname) + "*"):
test.support.unlink(f)
class AnyDBMTestCase:
_dict = {'a': b'Python:',
'b': b'Programming',
'c': b'the',
'd': b'way',
'f': b'Guido',
'g': b'intended',
}
def init_db(self):
f = dbm.open(_fname, 'n')
for k in self._dict:
f[k.encode("ascii")] = self._dict[k]
f.close()
def keys_helper(self, f):
keys = sorted(k.decode("ascii") for k in f.keys())
dkeys = sorted(self._dict.keys())
self.assertEqual(keys, dkeys)
return keys
def test_error(self):
self.assertTrue(issubclass(self.module.error, OSError))
def test_anydbm_not_existing(self):
self.assertRaises(dbm.error, dbm.open, _fname)
def test_anydbm_creation(self):
f = dbm.open(_fname, 'c')
self.assertEqual(list(f.keys()), [])
for key in self._dict:
f[key.encode("ascii")] = self._dict[key]
self.read_helper(f)
f.close()
def test_anydbm_creation_n_file_exists_with_invalid_contents(self):
# create an empty file
test.support.create_empty_file(_fname)
with dbm.open(_fname, 'n') as f:
self.assertEqual(len(f), 0)
def test_anydbm_modification(self):
self.init_db()
f = dbm.open(_fname, 'c')
self._dict['g'] = f[b'g'] = b"indented"
self.read_helper(f)
# setdefault() works as in the dict interface
self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
self.assertEqual(f[b'xxx'], b'foo')
f.close()
def test_anydbm_read(self):
self.init_db()
f = dbm.open(_fname, 'r')
self.read_helper(f)
# get() works as in the dict interface
self.assertEqual(f.get(b'a'), self._dict['a'])
self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
self.assertIsNone(f.get(b'xxx'))
with self.assertRaises(KeyError):
f[b'xxx']
f.close()
def test_anydbm_keys(self):
self.init_db()
f = dbm.open(_fname, 'r')
keys = self.keys_helper(f)
f.close()
def test_empty_value(self):
if getattr(dbm._defaultmod, 'library', None) == 'Berkeley DB':
self.skipTest("Berkeley DB doesn't distinguish the empty value "
"from the absent one")
f = dbm.open(_fname, 'c')
self.assertEqual(f.keys(), [])
f[b'empty'] = b''
self.assertEqual(f.keys(), [b'empty'])
self.assertIn(b'empty', f)
self.assertEqual(f[b'empty'], b'')
self.assertEqual(f.get(b'empty'), b'')
self.assertEqual(f.setdefault(b'empty'), b'')
f.close()
def test_anydbm_access(self):
self.init_db()
f = dbm.open(_fname, 'r')
key = "a".encode("ascii")
self.assertIn(key, f)
assert(f[key] == b"Python:")
f.close()
def read_helper(self, f):
keys = self.keys_helper(f)
for key in self._dict:
self.assertEqual(self._dict[key], f[key.encode("ascii")])
def tearDown(self):
delete_files()
def setUp(self):
dbm._defaultmod = self.module
delete_files()
class WhichDBTestCase(unittest.TestCase):
def test_whichdb(self):
for module in dbm_iterator():
# Check whether whichdb correctly guesses module name
# for databases opened with "module" module.
# Try with empty files first
name = module.__name__
if name == 'dbm.dumb':
continue # whichdb can't support dbm.dumb
delete_files()
f = module.open(_fname, 'c')
f.close()
self.assertEqual(name, self.dbm.whichdb(_fname))
# Now add a key
f = module.open(_fname, 'w')
f[b"1"] = b"1"
# and test that we can find it
self.assertIn(b"1", f)
# and read it
self.assertEqual(f[b"1"], b"1")
f.close()
self.assertEqual(name, self.dbm.whichdb(_fname))
@unittest.skipUnless(ndbm, reason='Test requires ndbm')
def test_whichdb_ndbm(self):
# Issue 17198: check that ndbm which is referenced in whichdb is defined
db_file = '{}_ndbm.db'.format(_fname)
with open(db_file, 'w'):
self.addCleanup(test.support.unlink, db_file)
self.assertIsNone(self.dbm.whichdb(db_file[:-3]))
def tearDown(self):
delete_files()
def setUp(self):
delete_files()
self.filename = test.support.TESTFN
self.d = dbm.open(self.filename, 'c')
self.d.close()
self.dbm = test.support.import_fresh_module('dbm')
def test_keys(self):
self.d = dbm.open(self.filename, 'c')
self.assertEqual(self.d.keys(), [])
a = [(b'a', b'b'), (b'12345678910', b'019237410982340912840198242')]
for k, v in a:
self.d[k] = v
self.assertEqual(sorted(self.d.keys()), sorted(k for (k, v) in a))
for k, v in a:
self.assertIn(k, self.d)
self.assertEqual(self.d[k], v)
self.assertNotIn(b'xxx', self.d)
self.assertRaises(KeyError, lambda: self.d[b'xxx'])
self.d.close()
def load_tests(loader, tests, pattern):
classes = []
for mod in dbm_iterator():
classes.append(type("TestCase-" + mod.__name__,
(AnyDBMTestCase, unittest.TestCase),
{'module': mod}))
suites = [unittest.makeSuite(c) for c in classes]
tests.addTests(suites)
return tests
if __name__ == "__main__":
unittest.main()

303
Lib/test/test_dbm_dumb.py Normal file
View File

@@ -0,0 +1,303 @@
"""Test script for the dumbdbm module
Original by Roger E. Masse
"""
import contextlib
import io
import operator
import os
import stat
import unittest
import dbm.dumb as dumbdbm
from test import support
from functools import partial
_fname = support.TESTFN
def _delete_files():
for ext in [".dir", ".dat", ".bak"]:
try:
os.unlink(_fname + ext)
except OSError:
pass
class DumbDBMTestCase(unittest.TestCase):
_dict = {b'0': b'',
b'a': b'Python:',
b'b': b'Programming',
b'c': b'the',
b'd': b'way',
b'f': b'Guido',
b'g': b'intended',
'\u00fc'.encode('utf-8') : b'!',
}
def test_dumbdbm_creation(self):
with contextlib.closing(dumbdbm.open(_fname, 'c')) as f:
self.assertEqual(list(f.keys()), [])
for key in self._dict:
f[key] = self._dict[key]
self.read_helper(f)
@unittest.skipUnless(hasattr(os, 'umask'), 'test needs os.umask()')
def test_dumbdbm_creation_mode(self):
try:
old_umask = os.umask(0o002)
f = dumbdbm.open(_fname, 'c', 0o637)
f.close()
finally:
os.umask(old_umask)
expected_mode = 0o635
if os.name != 'posix':
# Windows only supports setting the read-only attribute.
# This shouldn't fail, but doesn't work like Unix either.
expected_mode = 0o666
import stat
st = os.stat(_fname + '.dat')
self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
st = os.stat(_fname + '.dir')
self.assertEqual(stat.S_IMODE(st.st_mode), expected_mode)
def test_close_twice(self):
f = dumbdbm.open(_fname)
f[b'a'] = b'b'
self.assertEqual(f[b'a'], b'b')
f.close()
f.close()
def test_dumbdbm_modification(self):
self.init_db()
with contextlib.closing(dumbdbm.open(_fname, 'w')) as f:
self._dict[b'g'] = f[b'g'] = b"indented"
self.read_helper(f)
# setdefault() works as in the dict interface
self.assertEqual(f.setdefault(b'xxx', b'foo'), b'foo')
self.assertEqual(f[b'xxx'], b'foo')
def test_dumbdbm_read(self):
self.init_db()
with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
self.read_helper(f)
with self.assertRaisesRegex(dumbdbm.error,
'The database is opened for reading only'):
f[b'g'] = b'x'
with self.assertRaisesRegex(dumbdbm.error,
'The database is opened for reading only'):
del f[b'a']
# get() works as in the dict interface
self.assertEqual(f.get(b'a'), self._dict[b'a'])
self.assertEqual(f.get(b'xxx', b'foo'), b'foo')
self.assertIsNone(f.get(b'xxx'))
with self.assertRaises(KeyError):
f[b'xxx']
def test_dumbdbm_keys(self):
self.init_db()
with contextlib.closing(dumbdbm.open(_fname)) as f:
keys = self.keys_helper(f)
def test_write_contains(self):
with contextlib.closing(dumbdbm.open(_fname)) as f:
f[b'1'] = b'hello'
self.assertIn(b'1', f)
def test_write_write_read(self):
# test for bug #482460
with contextlib.closing(dumbdbm.open(_fname)) as f:
f[b'1'] = b'hello'
f[b'1'] = b'hello2'
with contextlib.closing(dumbdbm.open(_fname)) as f:
self.assertEqual(f[b'1'], b'hello2')
def test_str_read(self):
self.init_db()
with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
self.assertEqual(f['\u00fc'], self._dict['\u00fc'.encode('utf-8')])
def test_str_write_contains(self):
self.init_db()
with contextlib.closing(dumbdbm.open(_fname)) as f:
f['\u00fc'] = b'!'
f['1'] = 'a'
with contextlib.closing(dumbdbm.open(_fname, 'r')) as f:
self.assertIn('\u00fc', f)
self.assertEqual(f['\u00fc'.encode('utf-8')],
self._dict['\u00fc'.encode('utf-8')])
self.assertEqual(f[b'1'], b'a')
def test_line_endings(self):
# test for bug #1172763: dumbdbm would die if the line endings
# weren't what was expected.
with contextlib.closing(dumbdbm.open(_fname)) as f:
f[b'1'] = b'hello'
f[b'2'] = b'hello2'
# Mangle the file by changing the line separator to Windows or Unix
with io.open(_fname + '.dir', 'rb') as file:
data = file.read()
if os.linesep == '\n':
data = data.replace(b'\n', b'\r\n')
else:
data = data.replace(b'\r\n', b'\n')
with io.open(_fname + '.dir', 'wb') as file:
file.write(data)
f = dumbdbm.open(_fname)
self.assertEqual(f[b'1'], b'hello')
self.assertEqual(f[b'2'], b'hello2')
def read_helper(self, f):
keys = self.keys_helper(f)
for key in self._dict:
self.assertEqual(self._dict[key], f[key])
def init_db(self):
with contextlib.closing(dumbdbm.open(_fname, 'n')) as f:
for k in self._dict:
f[k] = self._dict[k]
def keys_helper(self, f):
keys = sorted(f.keys())
dkeys = sorted(self._dict.keys())
self.assertEqual(keys, dkeys)
return keys
# Perform randomized operations. This doesn't make assumptions about
# what *might* fail.
def test_random(self):
import random
d = {} # mirror the database
for dummy in range(5):
with contextlib.closing(dumbdbm.open(_fname)) as f:
for dummy in range(100):
k = random.choice('abcdefghijklm')
if random.random() < 0.2:
if k in d:
del d[k]
del f[k]
else:
v = random.choice((b'a', b'b', b'c')) * random.randrange(10000)
d[k] = v
f[k] = v
self.assertEqual(f[k], v)
with contextlib.closing(dumbdbm.open(_fname)) as f:
expected = sorted((k.encode("latin-1"), v) for k, v in d.items())
got = sorted(f.items())
self.assertEqual(expected, got)
def test_context_manager(self):
with dumbdbm.open(_fname, 'c') as db:
db["dumbdbm context manager"] = "context manager"
with dumbdbm.open(_fname, 'r') as db:
self.assertEqual(list(db.keys()), [b"dumbdbm context manager"])
with self.assertRaises(dumbdbm.error):
db.keys()
def test_check_closed(self):
f = dumbdbm.open(_fname, 'c')
f.close()
for meth in (partial(operator.delitem, f),
partial(operator.setitem, f, 'b'),
partial(operator.getitem, f),
partial(operator.contains, f)):
with self.assertRaises(dumbdbm.error) as cm:
meth('test')
self.assertEqual(str(cm.exception),
"DBM object has already been closed")
for meth in (operator.methodcaller('keys'),
operator.methodcaller('iterkeys'),
operator.methodcaller('items'),
len):
with self.assertRaises(dumbdbm.error) as cm:
meth(f)
self.assertEqual(str(cm.exception),
"DBM object has already been closed")
def test_create_new(self):
with dumbdbm.open(_fname, 'n') as f:
for k in self._dict:
f[k] = self._dict[k]
with dumbdbm.open(_fname, 'n') as f:
self.assertEqual(f.keys(), [])
def test_eval(self):
with open(_fname + '.dir', 'w') as stream:
stream.write("str(print('Hacked!')), 0\n")
with support.captured_stdout() as stdout:
with self.assertRaises(ValueError):
with dumbdbm.open(_fname) as f:
pass
self.assertEqual(stdout.getvalue(), '')
def test_missing_data(self):
for value in ('r', 'w'):
_delete_files()
with self.assertRaises(FileNotFoundError):
dumbdbm.open(_fname, value)
self.assertFalse(os.path.exists(_fname + '.dir'))
self.assertFalse(os.path.exists(_fname + '.bak'))
def test_missing_index(self):
with dumbdbm.open(_fname, 'n') as f:
pass
os.unlink(_fname + '.dir')
for value in ('r', 'w'):
with self.assertRaises(FileNotFoundError):
dumbdbm.open(_fname, value)
self.assertFalse(os.path.exists(_fname + '.dir'))
self.assertFalse(os.path.exists(_fname + '.bak'))
def test_invalid_flag(self):
for flag in ('x', 'rf', None):
with self.assertRaisesRegex(ValueError,
"Flag must be one of "
"'r', 'w', 'c', or 'n'"):
dumbdbm.open(_fname, flag)
def test_readonly_files(self):
with support.temp_dir() as dir:
fname = os.path.join(dir, 'db')
with dumbdbm.open(fname, 'n') as f:
self.assertEqual(list(f.keys()), [])
for key in self._dict:
f[key] = self._dict[key]
os.chmod(fname + ".dir", stat.S_IRUSR)
os.chmod(fname + ".dat", stat.S_IRUSR)
os.chmod(dir, stat.S_IRUSR|stat.S_IXUSR)
with dumbdbm.open(fname, 'r') as f:
self.assertEqual(sorted(f.keys()), sorted(self._dict))
f.close() # don't write
@unittest.skipUnless(support.TESTFN_NONASCII,
'requires OS support of non-ASCII encodings')
def test_nonascii_filename(self):
filename = support.TESTFN_NONASCII
for suffix in ['.dir', '.dat', '.bak']:
self.addCleanup(support.unlink, filename + suffix)
with dumbdbm.open(filename, 'c') as db:
db[b'key'] = b'value'
self.assertTrue(os.path.exists(filename + '.dat'))
self.assertTrue(os.path.exists(filename + '.dir'))
with dumbdbm.open(filename, 'r') as db:
self.assertEqual(list(db.keys()), [b'key'])
self.assertTrue(b'key' in db)
self.assertEqual(db[b'key'], b'value')
def tearDown(self):
_delete_files()
def setUp(self):
_delete_files()
if __name__ == "__main__":
unittest.main()