Merge pull request #3002 from fanninpm/fix-codecs

Update codecs.py to CPython 3.8
This commit is contained in:
Jeong YunWon
2021-09-05 15:06:29 +09:00
committed by GitHub
8 changed files with 4036 additions and 13 deletions

28
Lib/codecs.py vendored
View File

@@ -838,7 +838,7 @@ class StreamRecoder:
def writelines(self, list):
data = ''.join(list)
data = b''.join(list)
data, bytesdecoded = self.decode(data, self.errors)
return self.writer.write(data)
@@ -847,6 +847,12 @@ class StreamRecoder:
self.reader.reset()
self.writer.reset()
def seek(self, offset, whence=0):
# Seeks must be propagated to both the readers and writers
# as they might need to reset their internal buffers.
self.reader.seek(offset, whence)
self.writer.seek(offset, whence)
def __getattr__(self, name,
getattr=getattr):
@@ -862,7 +868,7 @@ class StreamRecoder:
### Shortcuts
def open(filename, mode='r', encoding=None, errors='strict', buffering=1):
def open(filename, mode='r', encoding=None, errors='strict', buffering=-1):
""" Open an encoded file using the given mode and return
a wrapped version providing transparent encoding/decoding.
@@ -883,7 +889,8 @@ def open(filename, mode='r', encoding=None, errors='strict', buffering=1):
encoding error occurs.
buffering has the same meaning as for the builtin open() API.
It defaults to line buffered.
It defaults to -1 which means that the default buffer size will
be used.
The returned wrapped file object provides an extra attribute
.encoding which allows querying the used encoding. This
@@ -898,11 +905,16 @@ def open(filename, mode='r', encoding=None, errors='strict', buffering=1):
file = builtins.open(filename, mode, buffering)
if encoding is None:
return file
info = lookup(encoding)
srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors)
# Add attributes to simplify introspection
srw.encoding = encoding
return srw
try:
info = lookup(encoding)
srw = StreamReaderWriter(file, info.streamreader, info.streamwriter, errors)
# Add attributes to simplify introspection
srw.encoding = encoding
return srw
except:
file.close()
raise
def EncodedFile(file, data_encoding, file_encoding=None, errors='strict'):

View File

@@ -12,7 +12,7 @@
* getregentry() -> codecs.CodecInfo object
The getregentry() API must return a CodecInfo object with encoder, decoder,
incrementalencoder, incrementaldecoder, streamwriter and streamreader
atttributes which adhere to the Python Codec Interface Standard.
attributes which adhere to the Python Codec Interface Standard.
In addition, a module may optionally also define the following
APIs which are then used by the package's codec search function:
@@ -49,8 +49,7 @@ def normalize_encoding(encoding):
collapsed and replaced with a single underscore, e.g. ' -;#'
becomes '_'. Leading and trailing underscores are removed.
Note that encoding names should be ASCII only; if they do use
non-ASCII characters, these must be Latin-1 compatible.
Note that encoding names should be ASCII only.
"""
if isinstance(encoding, bytes):
@@ -58,7 +57,7 @@ def normalize_encoding(encoding):
chars = []
punct = False
for c in encoding.lower():
for c in encoding:
if c.isalnum() or c == '.':
if punct and chars:
chars.append('_')

View File

@@ -266,6 +266,8 @@ aliases = {
'roman8' : 'hp_roman8',
'r8' : 'hp_roman8',
'csHPRoman8' : 'hp_roman8',
'cp1051' : 'hp_roman8',
'ibm1051' : 'hp_roman8',
# hz codec
'hzgb' : 'hz',
@@ -534,6 +536,7 @@ aliases = {
'utf8' : 'utf_8',
'utf8_ucs2' : 'utf_8',
'utf8_ucs4' : 'utf_8',
'cp65001' : 'utf_8',
# uu_codec codec
'uu' : 'uu_codec',

View File

@@ -143,7 +143,7 @@ def decode_generalized_number(extended, extpos, bias, errors):
digit = char - 22 # 0x30-26
elif errors == "strict":
raise UnicodeError("Invalid extended code point '%s'"
% extended[extpos])
% extended[extpos-1])
else:
return extpos, None
t = T(j, bias)

View File

@@ -20,6 +20,10 @@ def uu_encode(input, errors='strict', filename='<data>', mode=0o666):
read = infile.read
write = outfile.write
# Remove newline chars from filename
filename = filename.replace('\n','\\n')
filename = filename.replace('\r','\\r')
# Encode
write(('begin %o %s\n' % (mode & 0o777, filename)).encode('ascii'))
chunk = read(45)

3713
Lib/test/test_codecs.py vendored Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -159,6 +159,18 @@ impl CodecsRegistry {
"backslashreplace",
ctx.new_function("backslashreplace_errors", backslashreplace_errors),
),
(
"namereplace",
ctx.new_function("namereplace_errors", namereplace_errors),
),
(
"surrogatepass",
ctx.new_function("surrogatepass_errors", surrogatepass_errors),
),
(
"surrogateescape",
ctx.new_function("surrogateescape_errors", surrogateescape_errors),
),
];
let errors = std::array::IntoIter::new(errors)
.map(|(name, f)| (name.to_owned(), f))
@@ -428,3 +440,255 @@ fn backslashreplace_errors(err: PyObjectRef, vm: &VirtualMachine) -> PyResult<(S
}
Ok((out, range.end))
}
/// Error handler backing the `namereplace` codec error mode.
///
/// Only `UnicodeEncodeError` instances are accepted; anything else is turned
/// into the error produced by `bad_err_type`. Each character in the failing
/// range is replaced by `\N{NAME}` when `unicode_names2` knows a name for it,
/// and otherwise by the shortest of `\xNN`, `\uNNNN` or `\UNNNNNNNN` that can
/// hold its code point.
///
/// Returns the replacement text together with the index at which encoding
/// should resume (the end of the failing range).
fn namereplace_errors(err: PyObjectRef, vm: &VirtualMachine) -> PyResult<(String, usize)> {
    use std::fmt::Write;

    // Guard clause: this handler is encode-only.
    if !err.isinstance(&vm.ctx.exceptions.unicode_encode_error) {
        return Err(bad_err_type(err, vm));
    }

    let range = extract_unicode_error_range(&err, vm)?;
    let object = PyStrRef::try_from_object(vm, vm.get_attribute(err, "object")?)?;
    let tail = crate::common::str::try_get_chars(object.as_str(), range.start..).unwrap_or("");
    let count = range.len();

    let mut escaped = String::with_capacity(count * 4);
    for ch in tail.chars().take(count) {
        let code = ch as u32;
        match unicode_names2::name(ch) {
            Some(name) => write!(escaped, "\\N{{{}}}", name).unwrap(),
            None if code >= 0x10000 => write!(escaped, "\\U{:08x}", code).unwrap(),
            None if code >= 0x100 => write!(escaped, "\\u{:04x}", code).unwrap(),
            None => write!(escaped, "\\x{:02x}", code).unwrap(),
        }
    }
    Ok((escaped, range.end))
}
/// The Unicode transformation formats that the surrogate error handlers can
/// encode or decode by hand. `Unknown` means "not one of these".
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum StandardEncoding {
    Utf8,
    Utf16Be,
    Utf16Le,
    Utf32Be,
    Utf32Le,
    Unknown,
}

/// Classifies an encoding name as one of the standard UTF encodings.
///
/// Returns `(byte_length, encoding)` where `byte_length` is the number of
/// bytes a single surrogate occupies in that encoding (3 for UTF-8, 2 for
/// UTF-16, 4 for UTF-32) and 0 when the family itself is not recognised.
///
/// Accepted spellings are `utf`, an optional `-`/`_`, then `8`, `16` or `32`;
/// for the 16/32-bit forms an optional `-`/`_` and `be`/`le` may follow. With
/// no endianness suffix the target's native byte order is used. Matching is
/// ASCII case-insensitive. The Windows alias `CP_UTF8` is also accepted.
///
/// A recognised family with an unrecognised suffix (e.g. `utf-16-xx`) yields
/// `StandardEncoding::Unknown` while still reporting the family's byte length.
fn get_standard_encoding(encoding: &str) -> (usize, StandardEncoding) {
    /// Drops one leading `-` or `_`, if present.
    fn skip_separator(name: &str) -> &str {
        name.strip_prefix(|c: char| c == '-' || c == '_')
            .unwrap_or(name)
    }

    /// Resolves what follows the `16`/`32` width marker to a byte order.
    fn pick_endianness(
        suffix: &str,
        big: StandardEncoding,
        little: StandardEncoding,
    ) -> StandardEncoding {
        if suffix.is_empty() {
            // No explicit byte order: use the platform's native one.
            return if cfg!(target_endian = "big") {
                big
            } else {
                little
            };
        }
        match skip_separator(suffix) {
            "be" => big,
            "le" => little,
            _ => StandardEncoding::Unknown,
        }
    }

    // Encoding names are ASCII, so an ASCII-only fold is sufficient.
    let lowered = encoding.to_ascii_lowercase();

    if let Some(rest) = lowered.strip_prefix("utf") {
        let rest = skip_separator(rest);
        if rest == "8" {
            (3, StandardEncoding::Utf8)
        } else if let Some(suffix) = rest.strip_prefix("16") {
            let resolved =
                pick_endianness(suffix, StandardEncoding::Utf16Be, StandardEncoding::Utf16Le);
            (2, resolved)
        } else if let Some(suffix) = rest.strip_prefix("32") {
            let resolved =
                pick_endianness(suffix, StandardEncoding::Utf32Be, StandardEncoding::Utf32Le);
            (4, resolved)
        } else {
            (0, StandardEncoding::Unknown)
        }
    } else if lowered == "cp_utf8" {
        // The previous code compared a lower-cased name against the upper-case
        // literal "CP_UTF8", so this alias could never match.
        (3, StandardEncoding::Utf8)
    } else {
        (0, StandardEncoding::Unknown)
    }
}
/// Error handler backing the `surrogatepass` codec error mode.
///
/// * For a `UnicodeEncodeError`, every character in the failing range must be
///   a surrogate (U+D800..=U+DFFF); each one is rendered as the bytes it would
///   occupy in the error's encoding, written as textual `\xNN` escapes. The
///   resume position is the end of the failing range.
/// * For a decode error, a single surrogate is read from the start of the
///   failing range and rendered as a `\x…` escape of its code point. The
///   resume position is `range.start` plus the width of one surrogate in that
///   encoding; if more surrogates follow, the codec is expected to call the
///   handler again.
/// * Any other object yields the error produced by `bad_err_type`.
///
/// In every case where the handler cannot help (unsupported encoding, a
/// non-surrogate in the range, too few bytes), the original exception is
/// returned unchanged.
fn surrogatepass_errors(err: PyObjectRef, vm: &VirtualMachine) -> PyResult<(String, usize)> {
    use std::fmt::Write;

    if err.isinstance(&vm.ctx.exceptions.unicode_encode_error) {
        let range = extract_unicode_error_range(&err, vm)?;
        let s = PyStrRef::try_from_object(vm, vm.get_attribute(err.clone(), "object")?)?;
        let s_encoding =
            PyStrRef::try_from_object(vm, vm.get_attribute(err.clone(), "encoding")?)?;
        let (_, standard_encoding) = get_standard_encoding(s_encoding.as_str());
        if let StandardEncoding::Unknown = standard_encoding {
            // Not supported, fail with original exception
            return Err(err.downcast().unwrap());
        }
        let s_after_start =
            crate::common::str::try_get_chars(s.as_str(), range.start..).unwrap_or("");
        let num_chars = range.len();
        let mut out = String::with_capacity(num_chars * 4);
        for c in s_after_start.chars().take(num_chars).map(|x| x as u32) {
            if !(0xd800..=0xdfff).contains(&c) {
                // Not a surrogate, fail with original exception
                return Err(err.downcast().unwrap());
            }
            // Lay out the code point in the target byte order. Only the first
            // `len` entries are meaningful, and only the low 8 bits of each.
            let (units, len): ([u32; 4], usize) = match standard_encoding {
                StandardEncoding::Utf8 => (
                    [
                        0xe0 | (c >> 12),
                        0x80 | ((c >> 6) & 0x3f),
                        0x80 | (c & 0x3f),
                        0,
                    ],
                    3,
                ),
                StandardEncoding::Utf16Le => ([c, c >> 8, 0, 0], 2),
                StandardEncoding::Utf16Be => ([c >> 8, c, 0, 0], 2),
                StandardEncoding::Utf32Le => ([c, c >> 8, c >> 16, c >> 24], 4),
                StandardEncoding::Utf32Be => ([c >> 24, c >> 16, c >> 8, c], 4),
                StandardEncoding::Unknown => {
                    unreachable!("NOTE: RUSTPYTHON, should've bailed out earlier")
                }
            };
            for unit in &units[..len] {
                // Mask to one byte and zero-pad so every escape is exactly
                // `\xNN`; previously the low byte was emitted unmasked
                // (e.g. `\xdc80`) and small values lost their leading zero.
                write!(out, "\\x{:02x}", unit & 0xff).unwrap();
            }
        }
        Ok((out, range.end))
    } else if is_decode_err(&err, vm) {
        let range = extract_unicode_error_range(&err, vm)?;
        let s = PyStrRef::try_from_object(vm, vm.get_attribute(err.clone(), "object")?)?;
        let s_encoding =
            PyStrRef::try_from_object(vm, vm.get_attribute(err.clone(), "encoding")?)?;
        let (byte_length, standard_encoding) = get_standard_encoding(s_encoding.as_str());
        if let StandardEncoding::Unknown = standard_encoding {
            // Not supported, fail with original exception
            return Err(err.downcast().unwrap());
        }
        // Try decoding a single surrogate character. If there are more,
        // let the codec call us again.
        let s_after_start = crate::common::str::try_get_chars(s.as_str(), range.start..)
            .unwrap_or("")
            .as_bytes();
        // `s_after_start` already begins at `range.start`, so its own length
        // is what must cover `byte_length`. Subtracting `range.start` again
        // (as before) could underflow or reject valid input.
        let c: u32 = match s_after_start.get(..byte_length) {
            Some(unit) => match standard_encoding {
                StandardEncoding::Utf8 => {
                    if (unit[0] as u32 & 0xf0) == 0xe0
                        && (unit[1] as u32 & 0xc0) == 0x80
                        && (unit[2] as u32 & 0xc0) == 0x80
                    {
                        // it's a three-byte code
                        ((unit[0] as u32 & 0x0f) << 12)
                            + ((unit[1] as u32 & 0x3f) << 6)
                            + (unit[2] as u32 & 0x3f)
                    } else {
                        0
                    }
                }
                StandardEncoding::Utf16Le => (unit[1] as u32) << 8 | unit[0] as u32,
                StandardEncoding::Utf16Be => (unit[0] as u32) << 8 | unit[1] as u32,
                StandardEncoding::Utf32Le => {
                    ((unit[3] as u32) << 24)
                        | ((unit[2] as u32) << 16)
                        | ((unit[1] as u32) << 8)
                        | unit[0] as u32
                }
                StandardEncoding::Utf32Be => {
                    ((unit[0] as u32) << 24)
                        | ((unit[1] as u32) << 16)
                        | ((unit[2] as u32) << 8)
                        | unit[3] as u32
                }
                StandardEncoding::Unknown => {
                    unreachable!("NOTE: RUSTPYTHON, should've bailed out earlier")
                }
            },
            // Fewer bytes than one code unit: 0 is not a surrogate, so the
            // check below re-raises the original exception.
            None => 0,
        };
        if !(0xd800..=0xdfff).contains(&c) {
            // Not a surrogate, fail with original exception
            return Err(err.downcast().unwrap());
        }
        Ok((format!("\\x{:x?}", c), range.start + byte_length))
    } else {
        Err(bad_err_type(err, vm))
    }
}
/// Error handler backing the `surrogateescape` codec error mode.
///
/// * For a `UnicodeEncodeError`, every character in the failing range must be
///   one of the escape surrogates U+DC80..=U+DCFF; each is rendered as
///   `#<n>` where `n` is the code point minus 0xDC00 (the original byte
///   value). The resume position is the end of the failing range.
/// * For a decode error, up to four leading non-ASCII bytes of the failing
///   range are each rendered as `#<n>` with `n = 0xDC00 + byte`. The resume
///   position is `range.start` plus the number of bytes consumed.
/// * Any other object yields the error produced by `bad_err_type`.
///
/// When nothing can be escaped, the original exception is returned unchanged.
fn surrogateescape_errors(err: PyObjectRef, vm: &VirtualMachine) -> PyResult<(String, usize)> {
    use std::fmt::Write;

    if err.isinstance(&vm.ctx.exceptions.unicode_encode_error) {
        let range = extract_unicode_error_range(&err, vm)?;
        let s = PyStrRef::try_from_object(vm, vm.get_attribute(err.clone(), "object")?)?;
        let s_after_start =
            crate::common::str::try_get_chars(s.as_str(), range.start..).unwrap_or("");
        let num_chars = range.len();
        let mut out = String::with_capacity(num_chars * 4);
        for c in s_after_start.chars().take(num_chars).map(|x| x as u32) {
            // Only U+DC80..=U+DCFF stand for escaped bytes. The previous
            // check accepted every surrogate, so values below 0xDC00 made the
            // subtraction underneath underflow.
            if !(0xdc80..=0xdcff).contains(&c) {
                // Not a UTF-8b surrogate, fail with original exception
                return Err(err.downcast().unwrap());
            }
            write!(out, "#{}", c - 0xdc00).unwrap();
        }
        Ok((out, range.end))
    } else if is_decode_err(&err, vm) {
        let range = extract_unicode_error_range(&err, vm)?;
        let s = PyStrRef::try_from_object(vm, vm.get_attribute(err.clone(), "object")?)?;
        let s_after_start = crate::common::str::try_get_chars(s.as_str(), range.start..)
            .unwrap_or("")
            .as_bytes();
        let mut consumed = 0;
        let mut replace = String::with_capacity(4 * range.len());
        // At most four bytes per call, never more than the failing range, and
        // never past the end of the available bytes (iterating instead of
        // indexing avoids an out-of-bounds panic on a short buffer).
        for &byte in s_after_start.iter().take(range.len().min(4)) {
            if byte < 128 {
                // Refuse to escape ASCII bytes
                break;
            }
            write!(replace, "#{}", 0xdc00 + u32::from(byte)).unwrap();
            consumed += 1;
        }
        if consumed == 0 {
            return Err(err.downcast().unwrap());
        }
        Ok((replace, range.start + consumed))
    } else {
        Err(bad_err_type(err, vm))
    }
}

View File

@@ -143,4 +143,32 @@ mod decl {
}
encoded
}
/// `binascii.a2b_uu` placeholder.
///
/// NOTE: this does not implement uuencoding yet. It strips every `\n` from
/// the input and decodes what remains as base64. A decoding failure is
/// reported as a Python `ValueError`.
#[pyfunction]
fn a2b_uu(s: SerializedData, vm: &VirtualMachine) -> PyResult<Vec<u8>> {
    let decoded = s.with_ref(|raw| {
        // Only allocate a cleaned-up copy when there is a newline to remove.
        let stripped: Vec<u8>;
        let payload: &[u8] = if memchr::memchr(b'\n', raw).is_some() {
            stripped = raw.iter().copied().filter(|&byte| byte != b'\n').collect();
            &stripped
        } else {
            raw
        };
        // TODO: RUSTPYTHON, implement actual uuencoding code
        base64::decode(payload)
    });
    decoded.map_err(|err| vm.new_value_error(format!("error decoding uuencode: {}", err)))
}
/// `binascii.b2a_uu` placeholder.
///
/// NOTE: this does not implement uuencoding yet. It base64-encodes `data`
/// and appends a trailing `\n` when `newline` is set.
#[pyfunction]
fn b2a_uu(data: ArgBytesLike, NewlineArg { newline }: NewlineArg) -> Vec<u8> {
    // TODO: RUSTPYTHON, implement actual uuencoding code
    #[allow(clippy::redundant_closure)] // https://stackoverflow.com/questions/63916821
    let text = data.with_ref(|bytes| base64::encode(bytes));
    let mut output = text.into_bytes();
    if newline {
        output.push(b'\n');
    }
    output
}
}