mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
Updated the hmac + mailbox libraries + associated tests (#6608)
* Updated hmac + test library * Updated mailbox library * Added mailbox library test (v3.13.10)
This commit is contained in:
committed by
GitHub
parent
37c47fca6b
commit
bd0aaf6f4f
2
Lib/hmac.py
vendored
2
Lib/hmac.py
vendored
@@ -53,7 +53,7 @@ class HMAC:
|
||||
raise TypeError("key: expected bytes or bytearray, but got %r" % type(key).__name__)
|
||||
|
||||
if not digestmod:
|
||||
raise TypeError("Missing required parameter 'digestmod'.")
|
||||
raise TypeError("Missing required argument 'digestmod'.")
|
||||
|
||||
if _hashopenssl and isinstance(digestmod, (str, _functype)):
|
||||
try:
|
||||
|
||||
96
Lib/mailbox.py
vendored
96
Lib/mailbox.py
vendored
@@ -395,6 +395,56 @@ class Maildir(Mailbox):
|
||||
f = open(os.path.join(self._path, self._lookup(key)), 'rb')
|
||||
return _ProxyFile(f)
|
||||
|
||||
def get_info(self, key):
|
||||
"""Get the keyed message's "info" as a string."""
|
||||
subpath = self._lookup(key)
|
||||
if self.colon in subpath:
|
||||
return subpath.split(self.colon)[-1]
|
||||
return ''
|
||||
|
||||
def set_info(self, key, info: str):
|
||||
"""Set the keyed message's "info" string."""
|
||||
if not isinstance(info, str):
|
||||
raise TypeError(f'info must be a string: {type(info)}')
|
||||
old_subpath = self._lookup(key)
|
||||
new_subpath = old_subpath.split(self.colon)[0]
|
||||
if info:
|
||||
new_subpath += self.colon + info
|
||||
if new_subpath == old_subpath:
|
||||
return
|
||||
old_path = os.path.join(self._path, old_subpath)
|
||||
new_path = os.path.join(self._path, new_subpath)
|
||||
os.rename(old_path, new_path)
|
||||
self._toc[key] = new_subpath
|
||||
|
||||
def get_flags(self, key):
|
||||
"""Return as a string the standard flags that are set on the keyed message."""
|
||||
info = self.get_info(key)
|
||||
if info.startswith('2,'):
|
||||
return info[2:]
|
||||
return ''
|
||||
|
||||
def set_flags(self, key, flags: str):
|
||||
"""Set the given flags and unset all others on the keyed message."""
|
||||
if not isinstance(flags, str):
|
||||
raise TypeError(f'flags must be a string: {type(flags)}')
|
||||
# TODO: check if flags are valid standard flag characters?
|
||||
self.set_info(key, '2,' + ''.join(sorted(set(flags))))
|
||||
|
||||
def add_flag(self, key, flag: str):
|
||||
"""Set the given flag(s) without changing others on the keyed message."""
|
||||
if not isinstance(flag, str):
|
||||
raise TypeError(f'flag must be a string: {type(flag)}')
|
||||
# TODO: check that flag is a valid standard flag character?
|
||||
self.set_flags(key, ''.join(set(self.get_flags(key)) | set(flag)))
|
||||
|
||||
def remove_flag(self, key, flag: str):
|
||||
"""Unset the given string flag(s) without changing others on the keyed message."""
|
||||
if not isinstance(flag, str):
|
||||
raise TypeError(f'flag must be a string: {type(flag)}')
|
||||
if self.get_flags(key):
|
||||
self.set_flags(key, ''.join(set(self.get_flags(key)) - set(flag)))
|
||||
|
||||
def iterkeys(self):
|
||||
"""Return an iterator over keys."""
|
||||
self._refresh()
|
||||
@@ -540,6 +590,8 @@ class Maildir(Mailbox):
|
||||
for subdir in self._toc_mtimes:
|
||||
path = self._paths[subdir]
|
||||
for entry in os.listdir(path):
|
||||
if entry.startswith('.'):
|
||||
continue
|
||||
p = os.path.join(path, entry)
|
||||
if os.path.isdir(p):
|
||||
continue
|
||||
@@ -698,9 +750,13 @@ class _singlefileMailbox(Mailbox):
|
||||
_sync_close(new_file)
|
||||
# self._file is about to get replaced, so no need to sync.
|
||||
self._file.close()
|
||||
# Make sure the new file's mode is the same as the old file's
|
||||
mode = os.stat(self._path).st_mode
|
||||
os.chmod(new_file.name, mode)
|
||||
# Make sure the new file's mode and owner are the same as the old file's
|
||||
info = os.stat(self._path)
|
||||
os.chmod(new_file.name, info.st_mode)
|
||||
try:
|
||||
os.chown(new_file.name, info.st_uid, info.st_gid)
|
||||
except (AttributeError, OSError):
|
||||
pass
|
||||
try:
|
||||
os.rename(new_file.name, self._path)
|
||||
except FileExistsError:
|
||||
@@ -778,10 +834,11 @@ class _mboxMMDF(_singlefileMailbox):
|
||||
"""Return a Message representation or raise a KeyError."""
|
||||
start, stop = self._lookup(key)
|
||||
self._file.seek(start)
|
||||
from_line = self._file.readline().replace(linesep, b'')
|
||||
from_line = self._file.readline().replace(linesep, b'').decode('ascii')
|
||||
string = self._file.read(stop - self._file.tell())
|
||||
msg = self._message_factory(string.replace(linesep, b'\n'))
|
||||
msg.set_from(from_line[5:].decode('ascii'))
|
||||
msg.set_unixfrom(from_line)
|
||||
msg.set_from(from_line[5:])
|
||||
return msg
|
||||
|
||||
def get_string(self, key, from_=False):
|
||||
@@ -1089,10 +1146,24 @@ class MH(Mailbox):
|
||||
"""Return a count of messages in the mailbox."""
|
||||
return len(list(self.iterkeys()))
|
||||
|
||||
def _open_mh_sequences_file(self, text):
|
||||
mode = '' if text else 'b'
|
||||
kwargs = {'encoding': 'ASCII'} if text else {}
|
||||
path = os.path.join(self._path, '.mh_sequences')
|
||||
while True:
|
||||
try:
|
||||
return open(path, 'r+' + mode, **kwargs)
|
||||
except FileNotFoundError:
|
||||
pass
|
||||
try:
|
||||
return open(path, 'x+' + mode, **kwargs)
|
||||
except FileExistsError:
|
||||
pass
|
||||
|
||||
def lock(self):
|
||||
"""Lock the mailbox."""
|
||||
if not self._locked:
|
||||
self._file = open(os.path.join(self._path, '.mh_sequences'), 'rb+')
|
||||
self._file = self._open_mh_sequences_file(text=False)
|
||||
_lock_file(self._file)
|
||||
self._locked = True
|
||||
|
||||
@@ -1146,7 +1217,11 @@ class MH(Mailbox):
|
||||
def get_sequences(self):
|
||||
"""Return a name-to-key-list dictionary to define each sequence."""
|
||||
results = {}
|
||||
with open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII') as f:
|
||||
try:
|
||||
f = open(os.path.join(self._path, '.mh_sequences'), 'r', encoding='ASCII')
|
||||
except FileNotFoundError:
|
||||
return results
|
||||
with f:
|
||||
all_keys = set(self.keys())
|
||||
for line in f:
|
||||
try:
|
||||
@@ -1169,7 +1244,7 @@ class MH(Mailbox):
|
||||
|
||||
def set_sequences(self, sequences):
|
||||
"""Set sequences using the given name-to-key-list dictionary."""
|
||||
f = open(os.path.join(self._path, '.mh_sequences'), 'r+', encoding='ASCII')
|
||||
f = self._open_mh_sequences_file(text=True)
|
||||
try:
|
||||
os.close(os.open(f.name, os.O_WRONLY | os.O_TRUNC))
|
||||
for name, keys in sequences.items():
|
||||
@@ -1956,10 +2031,7 @@ class _ProxyFile:
|
||||
|
||||
def __iter__(self):
|
||||
"""Iterate over lines."""
|
||||
while True:
|
||||
line = self.readline()
|
||||
if not line:
|
||||
return
|
||||
while line := self.readline():
|
||||
yield line
|
||||
|
||||
def tell(self):
|
||||
|
||||
8
Lib/test/test_hmac.py
vendored
8
Lib/test/test_hmac.py
vendored
@@ -505,6 +505,14 @@ class SanityTestCase(unittest.TestCase):
|
||||
self.fail("Exception raised during normal usage of HMAC class.")
|
||||
|
||||
|
||||
class UpdateTestCase(unittest.TestCase):
|
||||
@hashlib_helper.requires_hashdigest('sha256')
|
||||
def test_with_str_update(self):
|
||||
with self.assertRaises(TypeError):
|
||||
h = hmac.new(b"key", digestmod='sha256')
|
||||
h.update("invalid update")
|
||||
|
||||
|
||||
class CopyTestCase(unittest.TestCase):
|
||||
|
||||
@hashlib_helper.requires_hashdigest('sha256')
|
||||
|
||||
2495
Lib/test/test_mailbox.py
vendored
Normal file
2495
Lib/test/test_mailbox.py
vendored
Normal file
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user