use crate::{ anystr::{self, AnyStr, AnyStrContainer, AnyStrWrapper}, builtins::{ pystr, PyBaseExceptionRef, PyByteArray, PyBytes, PyBytesRef, PyInt, PyIntRef, PyStr, PyStrRef, PyTypeRef, }, byte::bytes_from_object, cformat::cformat_bytes, common::hash, function::{ArgIterable, Either, OptionalArg, OptionalOption, PyComparisonValue}, identifier, literal::escape::Escape, protocol::PyBuffer, sequence::{SequenceExt, SequenceMutExt}, types::PyComparisonOp, AsObject, PyObject, PyObjectRef, PyPayload, PyResult, TryFromBorrowedObject, VirtualMachine, }; use bstr::ByteSlice; use itertools::Itertools; use malachite_bigint::BigInt; use num_traits::ToPrimitive; #[derive(Debug, Default, Clone)] pub struct PyBytesInner { pub(super) elements: Vec, } impl From> for PyBytesInner { fn from(elements: Vec) -> PyBytesInner { Self { elements } } } impl<'a> TryFromBorrowedObject<'a> for PyBytesInner { fn try_from_borrowed_object(vm: &VirtualMachine, obj: &'a PyObject) -> PyResult { bytes_from_object(vm, obj).map(Self::from) } } #[derive(FromArgs)] pub struct ByteInnerNewOptions { #[pyarg(any, optional)] pub source: OptionalArg, #[pyarg(any, optional)] pub encoding: OptionalArg, #[pyarg(any, optional)] pub errors: OptionalArg, } impl ByteInnerNewOptions { fn get_value_from_string( s: PyStrRef, encoding: PyStrRef, errors: OptionalArg, vm: &VirtualMachine, ) -> PyResult { let bytes = pystr::encode_string(s, Some(encoding), errors.into_option(), vm)?; Ok(bytes.as_bytes().to_vec().into()) } fn get_value_from_source(source: PyObjectRef, vm: &VirtualMachine) -> PyResult { bytes_from_object(vm, &source).map(|x| x.into()) } fn get_value_from_size(size: PyIntRef, vm: &VirtualMachine) -> PyResult { let size = size.as_bigint().to_isize().ok_or_else(|| { vm.new_overflow_error("cannot fit 'int' into an index-sized integer".to_owned()) })?; let size = if size < 0 { return Err(vm.new_value_error("negative count".to_owned())); } else { size as usize }; Ok(vec![0; size].into()) } pub fn get_bytes(self, cls: PyTypeRef, vm: &VirtualMachine) -> PyResult { let inner = match (&self.source, &self.encoding, &self.errors) { (OptionalArg::Present(obj), OptionalArg::Missing, OptionalArg::Missing) => { let obj = obj.clone(); // construct an exact bytes from an exact bytes do not clone let obj = if cls.is(PyBytes::class(&vm.ctx)) { match obj.downcast_exact::(vm) { Ok(b) => return Ok(b.into_pyref()), Err(obj) => obj, } } else { obj }; if let Some(bytes_method) = vm.get_method(obj, identifier!(vm, __bytes__)) { // construct an exact bytes from __bytes__ slot. // if __bytes__ return a bytes, use the bytes object except we are the subclass of the bytes let bytes = bytes_method?.call((), vm)?; let bytes = if cls.is(PyBytes::class(&vm.ctx)) { match bytes.downcast::() { Ok(b) => return Ok(b), Err(bytes) => bytes, } } else { bytes }; Some(PyBytesInner::try_from_borrowed_object(vm, &bytes)) } else { None } } _ => None, } .unwrap_or_else(|| self.get_bytearray_inner(vm))?; PyBytes::from(inner).into_ref_with_type(vm, cls) } pub fn get_bytearray_inner(self, vm: &VirtualMachine) -> PyResult { const STRING_WITHOUT_ENCODING: &str = "string argument without an encoding"; const ENCODING_WITHOUT_STRING: &str = "encoding without a string argument"; match (self.source, self.encoding, self.errors) { (OptionalArg::Present(obj), OptionalArg::Missing, OptionalArg::Missing) => { match_class!(match obj { i @ PyInt => { Ok(Self::get_value_from_size(i, vm)?) } _s @ PyStr => Err(STRING_WITHOUT_ENCODING), obj => { Ok(Self::get_value_from_source(obj, vm)?) } }) } (OptionalArg::Present(obj), OptionalArg::Present(encoding), errors) => { if let Ok(s) = obj.downcast::() { Ok(Self::get_value_from_string(s, encoding, errors, vm)?) } else { Err(ENCODING_WITHOUT_STRING) } } (OptionalArg::Missing, OptionalArg::Missing, OptionalArg::Missing) => { Ok(PyBytesInner::default()) } (OptionalArg::Missing, OptionalArg::Present(_), _) => Err(ENCODING_WITHOUT_STRING), (OptionalArg::Missing, _, OptionalArg::Present(_)) => { Err("errors without a string argument") } (OptionalArg::Present(_), OptionalArg::Missing, OptionalArg::Present(_)) => { Err(STRING_WITHOUT_ENCODING) } } .map_err(|e| vm.new_type_error(e.to_owned())) } } #[derive(FromArgs)] pub struct ByteInnerFindOptions { #[pyarg(positional)] sub: Either, #[pyarg(positional, default)] start: Option, #[pyarg(positional, default)] end: Option, } impl ByteInnerFindOptions { pub fn get_value( self, len: usize, vm: &VirtualMachine, ) -> PyResult<(Vec, std::ops::Range)> { let sub = match self.sub { Either::A(v) => v.elements.to_vec(), Either::B(int) => vec![int.as_bigint().byte_or(vm)?], }; let range = anystr::adjust_indices(self.start, self.end, len); Ok((sub, range)) } } #[derive(FromArgs)] pub struct ByteInnerPaddingOptions { #[pyarg(positional)] width: isize, #[pyarg(positional, optional)] fillchar: OptionalArg, } impl ByteInnerPaddingOptions { fn get_value(self, fn_name: &str, vm: &VirtualMachine) -> PyResult<(isize, u8)> { let fillchar = if let OptionalArg::Present(v) = self.fillchar { try_as_bytes(v.clone(), |bytes| bytes.iter().copied().exactly_one().ok()) .flatten() .ok_or_else(|| { vm.new_type_error(format!( "{}() argument 2 must be a byte string of length 1, not {}", fn_name, v.class().name() )) })? } else { b' ' // default is space }; Ok((self.width, fillchar)) } } #[derive(FromArgs)] pub struct ByteInnerTranslateOptions { #[pyarg(positional)] table: Option, #[pyarg(any, optional)] delete: OptionalArg, } impl ByteInnerTranslateOptions { pub fn get_value(self, vm: &VirtualMachine) -> PyResult<(Vec, Vec)> { let table = self.table.map_or_else( || Ok((0..=255).collect::>()), |v| { let bytes = v .try_into_value::(vm) .ok() .filter(|v| v.elements.len() == 256) .ok_or_else(|| { vm.new_value_error( "translation table must be 256 characters long".to_owned(), ) })?; Ok(bytes.elements.to_vec()) }, )?; let delete = match self.delete { OptionalArg::Present(byte) => { let byte: PyBytesInner = byte.try_into_value(vm)?; byte.elements } _ => vec![], }; Ok((table, delete)) } } pub type ByteInnerSplitOptions = anystr::SplitArgs; impl PyBytesInner { #[inline] pub fn as_bytes(&self) -> &[u8] { &self.elements } fn new_repr_overflow_error(vm: &VirtualMachine) -> PyBaseExceptionRef { vm.new_overflow_error("bytes object is too large to make repr".to_owned()) } pub fn repr_with_name(&self, class_name: &str, vm: &VirtualMachine) -> PyResult { const DECORATION_LEN: isize = 2 + 3; // 2 for (), 3 for b"" => bytearray(b"") let escape = crate::literal::escape::AsciiEscape::new_repr(&self.elements); let len = escape .layout() .len .and_then(|len| (len as isize).checked_add(DECORATION_LEN + class_name.len() as isize)) .ok_or_else(|| Self::new_repr_overflow_error(vm))? as usize; let mut buf = String::with_capacity(len); buf.push_str(class_name); buf.push('('); escape.bytes_repr().write(&mut buf).unwrap(); buf.push(')'); debug_assert_eq!(buf.len(), len); Ok(buf) } pub fn repr_bytes(&self, vm: &VirtualMachine) -> PyResult { let escape = crate::literal::escape::AsciiEscape::new_repr(&self.elements); let len = 3 + escape .layout() .len .ok_or_else(|| Self::new_repr_overflow_error(vm))?; let mut buf = String::with_capacity(len); escape.bytes_repr().write(&mut buf).unwrap(); debug_assert_eq!(buf.len(), len); Ok(buf) } #[inline] pub fn len(&self) -> usize { self.elements.len() } #[inline] pub fn capacity(&self) -> usize { self.elements.capacity() } #[inline] pub fn is_empty(&self) -> bool { self.elements.is_empty() } pub fn cmp( &self, other: &PyObject, op: PyComparisonOp, vm: &VirtualMachine, ) -> PyComparisonValue { // TODO: bytes can compare with any object implemented buffer protocol // but not memoryview, and not equal if compare with unicode str(PyStr) PyComparisonValue::from_option( other .try_bytes_like(vm, |other| op.eval_ord(self.elements.as_slice().cmp(other))) .ok(), ) } pub fn hash(&self, vm: &VirtualMachine) -> hash::PyHash { vm.state.hash_secret.hash_bytes(&self.elements) } pub fn add(&self, other: &[u8]) -> Vec { self.elements.py_add(other) } pub fn contains( &self, needle: Either, vm: &VirtualMachine, ) -> PyResult { Ok(match needle { Either::A(byte) => self.elements.contains_str(byte.elements.as_slice()), Either::B(int) => self.elements.contains(&int.as_bigint().byte_or(vm)?), }) } pub fn isalnum(&self) -> bool { !self.elements.is_empty() && self .elements .iter() .all(|x| char::from(*x).is_alphanumeric()) } pub fn isalpha(&self) -> bool { !self.elements.is_empty() && self.elements.iter().all(|x| char::from(*x).is_alphabetic()) } pub fn isascii(&self) -> bool { self.elements.iter().all(|x| char::from(*x).is_ascii()) } pub fn isdigit(&self) -> bool { !self.elements.is_empty() && self .elements .iter() .all(|x| char::from(*x).is_ascii_digit()) } pub fn islower(&self) -> bool { self.elements .py_iscase(char::is_lowercase, char::is_uppercase) } pub fn isupper(&self) -> bool { self.elements .py_iscase(char::is_uppercase, char::is_lowercase) } pub fn isspace(&self) -> bool { !self.elements.is_empty() && self .elements .iter() .all(|x| char::from(*x).is_ascii_whitespace()) } pub fn istitle(&self) -> bool { if self.elements.is_empty() { return false; } let mut iter = self.elements.iter().peekable(); let mut prev_cased = false; while let Some(c) = iter.next() { let current = char::from(*c); let next = if let Some(k) = iter.peek() { char::from(**k) } else if current.is_uppercase() { return !prev_cased; } else { return prev_cased; }; let is_cased = current.to_uppercase().next().unwrap() != current || current.to_lowercase().next().unwrap() != current; if (is_cased && next.is_uppercase() && !prev_cased) || (!is_cased && next.is_lowercase()) { return false; } prev_cased = is_cased; } true } pub fn lower(&self) -> Vec { self.elements.to_ascii_lowercase() } pub fn upper(&self) -> Vec { self.elements.to_ascii_uppercase() } pub fn capitalize(&self) -> Vec { let mut new: Vec = Vec::with_capacity(self.elements.len()); if let Some((first, second)) = self.elements.split_first() { new.push(first.to_ascii_uppercase()); second.iter().for_each(|x| new.push(x.to_ascii_lowercase())); } new } pub fn swapcase(&self) -> Vec { let mut new: Vec = Vec::with_capacity(self.elements.len()); for w in &self.elements { match w { 65..=90 => new.push(w.to_ascii_lowercase()), 97..=122 => new.push(w.to_ascii_uppercase()), x => new.push(*x), } } new } pub fn hex( &self, sep: OptionalArg>, bytes_per_sep: OptionalArg, vm: &VirtualMachine, ) -> PyResult { bytes_to_hex(self.elements.as_slice(), sep, bytes_per_sep, vm) } pub fn fromhex(string: &str, vm: &VirtualMachine) -> PyResult> { let mut iter = string.bytes().enumerate(); let mut bytes: Vec = Vec::with_capacity(string.len() / 2); let i = loop { let (i, b) = match iter.next() { Some(val) => val, None => { return Ok(bytes); } }; if is_py_ascii_whitespace(b) { continue; } let top = match b { b'0'..=b'9' => b - b'0', b'a'..=b'f' => 10 + b - b'a', b'A'..=b'F' => 10 + b - b'A', _ => break i, }; let (i, b) = match iter.next() { Some(val) => val, None => break i + 1, }; let bot = match b { b'0'..=b'9' => b - b'0', b'a'..=b'f' => 10 + b - b'a', b'A'..=b'F' => 10 + b - b'A', _ => break i, }; bytes.push((top << 4) + bot); }; Err(vm.new_value_error(format!( "non-hexadecimal number found in fromhex() arg at position {i}" ))) } #[inline] fn _pad( &self, options: ByteInnerPaddingOptions, pad: fn(&[u8], usize, u8, usize) -> Vec, vm: &VirtualMachine, ) -> PyResult> { let (width, fillchar) = options.get_value("center", vm)?; Ok(if self.len() as isize >= width { Vec::from(&self.elements[..]) } else { pad(&self.elements, width as usize, fillchar, self.len()) }) } pub fn center( &self, options: ByteInnerPaddingOptions, vm: &VirtualMachine, ) -> PyResult> { self._pad(options, AnyStr::py_center, vm) } pub fn ljust( &self, options: ByteInnerPaddingOptions, vm: &VirtualMachine, ) -> PyResult> { self._pad(options, AnyStr::py_ljust, vm) } pub fn rjust( &self, options: ByteInnerPaddingOptions, vm: &VirtualMachine, ) -> PyResult> { self._pad(options, AnyStr::py_rjust, vm) } pub fn count(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult { let (needle, range) = options.get_value(self.elements.len(), vm)?; Ok(self .elements .py_count(needle.as_slice(), range, |h, n| h.find_iter(n).count())) } pub fn join( &self, iterable: ArgIterable, vm: &VirtualMachine, ) -> PyResult> { let iter = iterable.iter(vm)?; self.elements.py_join(iter) } #[inline] pub fn find( &self, options: ByteInnerFindOptions, find: F, vm: &VirtualMachine, ) -> PyResult> where F: Fn(&[u8], &[u8]) -> Option, { let (needle, range) = options.get_value(self.elements.len(), vm)?; Ok(self.elements.py_find(&needle, range, find)) } pub fn maketrans( from: PyBytesInner, to: PyBytesInner, vm: &VirtualMachine, ) -> PyResult> { if from.len() != to.len() { return Err( vm.new_value_error("the two maketrans arguments must have equal length".to_owned()) ); } let mut res = vec![]; for i in 0..=255 { res.push(if let Some(position) = from.elements.find_byte(i) { to.elements[position] } else { i }); } Ok(res) } pub fn translate( &self, options: ByteInnerTranslateOptions, vm: &VirtualMachine, ) -> PyResult> { let (table, delete) = options.get_value(vm)?; let mut res = if delete.is_empty() { Vec::with_capacity(self.elements.len()) } else { Vec::new() }; for i in &self.elements { if !delete.contains(i) { res.push(table[*i as usize]); } } Ok(res) } pub fn strip(&self, chars: OptionalOption) -> Vec { self.elements .py_strip( chars, |s, chars| s.trim_with(|c| chars.contains(&(c as u8))), |s| s.trim(), ) .to_vec() } pub fn lstrip(&self, chars: OptionalOption) -> &[u8] { self.elements.py_strip( chars, |s, chars| s.trim_start_with(|c| chars.contains(&(c as u8))), |s| s.trim_start(), ) } pub fn rstrip(&self, chars: OptionalOption) -> &[u8] { self.elements.py_strip( chars, |s, chars| s.trim_end_with(|c| chars.contains(&(c as u8))), |s| s.trim_end(), ) } // new in Python 3.9 pub fn removeprefix(&self, prefix: PyBytesInner) -> Vec { self.elements .py_removeprefix(&prefix.elements, prefix.elements.len(), |s, p| { s.starts_with(p) }) .to_vec() } // new in Python 3.9 pub fn removesuffix(&self, suffix: PyBytesInner) -> Vec { self.elements .py_removesuffix(&suffix.elements, suffix.elements.len(), |s, p| { s.ends_with(p) }) .to_vec() } pub fn split( &self, options: ByteInnerSplitOptions, convert: F, vm: &VirtualMachine, ) -> PyResult> where F: Fn(&[u8], &VirtualMachine) -> PyObjectRef, { let elements = self.elements.py_split( options, vm, |v, s, vm| v.split_str(s).map(|v| convert(v, vm)).collect(), |v, s, n, vm| v.splitn_str(n, s).map(|v| convert(v, vm)).collect(), |v, n, vm| v.py_split_whitespace(n, |v| convert(v, vm)), )?; Ok(elements) } pub fn rsplit( &self, options: ByteInnerSplitOptions, convert: F, vm: &VirtualMachine, ) -> PyResult> where F: Fn(&[u8], &VirtualMachine) -> PyObjectRef, { let mut elements = self.elements.py_split( options, vm, |v, s, vm| v.rsplit_str(s).map(|v| convert(v, vm)).collect(), |v, s, n, vm| v.rsplitn_str(n, s).map(|v| convert(v, vm)).collect(), |v, n, vm| v.py_rsplit_whitespace(n, |v| convert(v, vm)), )?; elements.reverse(); Ok(elements) } pub fn partition( &self, sub: &PyBytesInner, vm: &VirtualMachine, ) -> PyResult<(Vec, bool, Vec)> { self.elements.py_partition( &sub.elements, || self.elements.splitn_str(2, &sub.elements), vm, ) } pub fn rpartition( &self, sub: &PyBytesInner, vm: &VirtualMachine, ) -> PyResult<(Vec, bool, Vec)> { self.elements.py_partition( &sub.elements, || self.elements.rsplitn_str(2, &sub.elements), vm, ) } pub fn expandtabs(&self, options: anystr::ExpandTabsArgs) -> Vec { let tabsize = options.tabsize(); let mut counter: usize = 0; let mut res = vec![]; if tabsize == 0 { return self .elements .iter() .copied() .filter(|x| *x != b'\t') .collect(); } for i in &self.elements { if *i == b'\t' { let len = tabsize - counter % tabsize; res.extend_from_slice(&vec![b' '; len]); counter += len; } else { res.push(*i); if *i == b'\r' || *i == b'\n' { counter = 0; } else { counter += 1; } } } res } pub fn splitlines(&self, options: anystr::SplitLinesArgs, into_wrapper: FW) -> Vec where FW: Fn(&[u8]) -> W, { self.elements.py_bytes_splitlines(options, into_wrapper) } pub fn zfill(&self, width: isize) -> Vec { self.elements.py_zfill(width) } // len(self)>=1, from="", len(to)>=1, maxcount>=1 fn replace_interleave(&self, to: PyBytesInner, maxcount: Option) -> Vec { let place_count = self.elements.len() + 1; let count = maxcount.map_or(place_count, |v| std::cmp::min(v, place_count)) - 1; let capacity = self.elements.len() + count * to.len(); let mut result = Vec::with_capacity(capacity); let to_slice = to.elements.as_slice(); result.extend_from_slice(to_slice); for c in &self.elements[..count] { result.push(*c); result.extend_from_slice(to_slice); } result.extend_from_slice(&self.elements[count..]); result } fn replace_delete(&self, from: PyBytesInner, maxcount: Option) -> Vec { let count = count_substring(self.elements.as_slice(), from.elements.as_slice(), maxcount); if count == 0 { // no matches return self.elements.clone(); } let result_len = self.len() - (count * from.len()); debug_assert!(self.len() >= count * from.len()); let mut result = Vec::with_capacity(result_len); let mut last_end = 0; let mut count = count; for offset in self.elements.find_iter(&from.elements) { result.extend_from_slice(&self.elements[last_end..offset]); last_end = offset + from.len(); count -= 1; if count == 0 { break; } } result.extend_from_slice(&self.elements[last_end..]); result } pub fn replace_in_place( &self, from: PyBytesInner, to: PyBytesInner, maxcount: Option, ) -> Vec { let len = from.len(); let mut iter = self.elements.find_iter(&from.elements); let mut new = if let Some(offset) = iter.next() { let mut new = self.elements.clone(); new[offset..offset + len].clone_from_slice(to.elements.as_slice()); if maxcount == Some(1) { return new; } else { new } } else { return self.elements.clone(); }; let mut count = maxcount.unwrap_or(usize::MAX) - 1; for offset in iter { new[offset..offset + len].clone_from_slice(to.elements.as_slice()); count -= 1; if count == 0 { break; } } new } fn replace_general( &self, from: PyBytesInner, to: PyBytesInner, maxcount: Option, vm: &VirtualMachine, ) -> PyResult> { let count = count_substring(self.elements.as_slice(), from.elements.as_slice(), maxcount); if count == 0 { // no matches, return unchanged return Ok(self.elements.clone()); } // Check for overflow // result_len = self_len + count * (to_len-from_len) debug_assert!(count > 0); if to.len() as isize - from.len() as isize > (isize::MAX - self.elements.len() as isize) / count as isize { return Err(vm.new_overflow_error("replace bytes is too long".to_owned())); } let result_len = (self.elements.len() as isize + count as isize * (to.len() as isize - from.len() as isize)) as usize; let mut result = Vec::with_capacity(result_len); let mut last_end = 0; let mut count = count; for offset in self.elements.find_iter(&from.elements) { result.extend_from_slice(&self.elements[last_end..offset]); result.extend_from_slice(to.elements.as_slice()); last_end = offset + from.len(); count -= 1; if count == 0 { break; } } result.extend_from_slice(&self.elements[last_end..]); Ok(result) } pub fn replace( &self, from: PyBytesInner, to: PyBytesInner, maxcount: OptionalArg, vm: &VirtualMachine, ) -> PyResult> { // stringlib_replace in CPython let maxcount = match maxcount { OptionalArg::Present(maxcount) if maxcount >= 0 => { if maxcount == 0 || (self.elements.is_empty() && !from.is_empty()) { // nothing to do; return the original bytes return Ok(self.elements.clone()); } else if self.elements.is_empty() && from.is_empty() { return Ok(to.elements); } Some(maxcount as usize) } _ => None, }; // Handle zero-length special cases if from.elements.is_empty() { if to.elements.is_empty() { // nothing to do; return the original bytes return Ok(self.elements.clone()); } // insert the 'to' bytes everywhere. // >>> b"Python".replace(b"", b".") // b'.P.y.t.h.o.n.' return Ok(self.replace_interleave(to, maxcount)); } // Except for b"".replace(b"", b"A") == b"A" there is no way beyond this // point for an empty self bytes to generate a non-empty bytes // Special case so the remaining code always gets a non-empty bytes if self.elements.is_empty() { return Ok(self.elements.clone()); } if to.elements.is_empty() { // delete all occurrences of 'from' bytes Ok(self.replace_delete(from, maxcount)) } else if from.len() == to.len() { // Handle special case where both bytes have the same length Ok(self.replace_in_place(from, to, maxcount)) } else { // Otherwise use the more generic algorithms self.replace_general(from, to, maxcount, vm) } } pub fn title(&self) -> Vec { let mut res = vec![]; let mut spaced = true; for i in &self.elements { match i { 65..=90 | 97..=122 => { if spaced { res.push(i.to_ascii_uppercase()); spaced = false } else { res.push(i.to_ascii_lowercase()); } } _ => { res.push(*i); spaced = true } } } res } pub fn cformat(&self, values: PyObjectRef, vm: &VirtualMachine) -> PyResult> { cformat_bytes(vm, self.elements.as_slice(), values) } pub fn mul(&self, n: isize, vm: &VirtualMachine) -> PyResult> { self.elements.mul(vm, n) } pub fn imul(&mut self, n: isize, vm: &VirtualMachine) -> PyResult<()> { self.elements.imul(vm, n) } pub fn concat(&self, other: &PyObject, vm: &VirtualMachine) -> PyResult> { let buffer = PyBuffer::try_from_borrowed_object(vm, other)?; let borrowed = buffer.as_contiguous(); if let Some(other) = borrowed { let mut v = Vec::with_capacity(self.elements.len() + other.len()); v.extend_from_slice(&self.elements); v.extend_from_slice(&other); Ok(v) } else { let mut v = self.elements.clone(); buffer.append_to(&mut v); Ok(v) } } } pub fn try_as_bytes(obj: PyObjectRef, f: F) -> Option where F: Fn(&[u8]) -> R, { match_class!(match obj { i @ PyBytes => Some(f(i.as_bytes())), j @ PyByteArray => Some(f(&j.borrow_buf())), _ => None, }) } #[inline] fn count_substring(haystack: &[u8], needle: &[u8], maxcount: Option) -> usize { let substrings = haystack.find_iter(needle); if let Some(maxcount) = maxcount { std::cmp::min(substrings.take(maxcount).count(), maxcount) } else { substrings.count() } } pub trait ByteOr: ToPrimitive { fn byte_or(&self, vm: &VirtualMachine) -> PyResult { match self.to_u8() { Some(value) => Ok(value), None => Err(vm.new_value_error("byte must be in range(0, 256)".to_owned())), } } } impl ByteOr for BigInt {} impl AnyStrWrapper for PyBytesInner { type Str = [u8]; fn as_ref(&self) -> &[u8] { &self.elements } } impl AnyStrContainer<[u8]> for Vec { fn new() -> Self { Vec::new() } fn with_capacity(capacity: usize) -> Self { Vec::with_capacity(capacity) } fn push_str(&mut self, other: &[u8]) { self.extend(other) } } const ASCII_WHITESPACES: [u8; 6] = [0x20, 0x09, 0x0a, 0x0c, 0x0d, 0x0b]; impl AnyStr for [u8] { type Char = u8; type Container = Vec; fn element_bytes_len(_: u8) -> usize { 1 } fn to_container(&self) -> Self::Container { self.to_vec() } fn as_bytes(&self) -> &[u8] { self } fn as_utf8_str(&self) -> Result<&str, std::str::Utf8Error> { std::str::from_utf8(self) } fn chars(&self) -> impl Iterator { bstr::ByteSlice::chars(self) } fn elements(&self) -> impl Iterator { self.iter().copied() } fn get_bytes(&self, range: std::ops::Range) -> &Self { &self[range] } fn get_chars(&self, range: std::ops::Range) -> &Self { &self[range] } fn is_empty(&self) -> bool { Self::is_empty(self) } fn bytes_len(&self) -> usize { Self::len(self) } fn py_split_whitespace(&self, maxsplit: isize, convert: F) -> Vec where F: Fn(&Self) -> PyObjectRef, { let mut splits = Vec::new(); let mut count = maxsplit; let mut haystack = self; while let Some(offset) = haystack.find_byteset(ASCII_WHITESPACES) { if offset != 0 { if count == 0 { break; } splits.push(convert(&haystack[..offset])); count -= 1; } haystack = &haystack[offset + 1..]; } if !haystack.is_empty() { splits.push(convert(haystack)); } splits } fn py_rsplit_whitespace(&self, maxsplit: isize, convert: F) -> Vec where F: Fn(&Self) -> PyObjectRef, { let mut splits = Vec::new(); let mut count = maxsplit; let mut haystack = self; while let Some(offset) = haystack.rfind_byteset(ASCII_WHITESPACES) { if offset + 1 != haystack.len() { if count == 0 { break; } splits.push(convert(&haystack[offset + 1..])); count -= 1; } haystack = &haystack[..offset]; } if !haystack.is_empty() { splits.push(convert(haystack)); } splits } } #[derive(FromArgs)] pub struct DecodeArgs { #[pyarg(any, default)] encoding: Option, #[pyarg(any, default)] errors: Option, } pub fn bytes_decode( zelf: PyObjectRef, args: DecodeArgs, vm: &VirtualMachine, ) -> PyResult { let DecodeArgs { encoding, errors } = args; let encoding = encoding .as_ref() .map_or(crate::codecs::DEFAULT_ENCODING, |s| s.as_str()); vm.state .codec_registry .decode_text(zelf, encoding, errors, vm) } fn hex_impl_no_sep(bytes: &[u8]) -> String { let mut buf: Vec = vec![0; bytes.len() * 2]; hex::encode_to_slice(bytes, buf.as_mut_slice()).unwrap(); unsafe { String::from_utf8_unchecked(buf) } } fn hex_impl(bytes: &[u8], sep: u8, bytes_per_sep: isize) -> String { let len = bytes.len(); let buf = if bytes_per_sep < 0 { let bytes_per_sep = std::cmp::min(len, (-bytes_per_sep) as usize); let chunks = (len - 1) / bytes_per_sep; let chunked = chunks * bytes_per_sep; let unchunked = len - chunked; let mut buf = vec![0; len * 2 + chunks]; let mut j = 0; for i in (0..chunks).map(|i| i * bytes_per_sep) { hex::encode_to_slice( &bytes[i..i + bytes_per_sep], &mut buf[j..j + bytes_per_sep * 2], ) .unwrap(); j += bytes_per_sep * 2; buf[j] = sep; j += 1; } hex::encode_to_slice(&bytes[chunked..], &mut buf[j..j + unchunked * 2]).unwrap(); buf } else { let bytes_per_sep = std::cmp::min(len, bytes_per_sep as usize); let chunks = (len - 1) / bytes_per_sep; let chunked = chunks * bytes_per_sep; let unchunked = len - chunked; let mut buf = vec![0; len * 2 + chunks]; hex::encode_to_slice(&bytes[..unchunked], &mut buf[..unchunked * 2]).unwrap(); let mut j = unchunked * 2; for i in (0..chunks).map(|i| i * bytes_per_sep + unchunked) { buf[j] = sep; j += 1; hex::encode_to_slice( &bytes[i..i + bytes_per_sep], &mut buf[j..j + bytes_per_sep * 2], ) .unwrap(); j += bytes_per_sep * 2; } buf }; unsafe { String::from_utf8_unchecked(buf) } } pub fn bytes_to_hex( bytes: &[u8], sep: OptionalArg>, bytes_per_sep: OptionalArg, vm: &VirtualMachine, ) -> PyResult { if bytes.is_empty() { return Ok("".to_owned()); } if let OptionalArg::Present(sep) = sep { let bytes_per_sep = bytes_per_sep.unwrap_or(1); if bytes_per_sep == 0 { return Ok(hex_impl_no_sep(bytes)); } let s_guard; let b_guard; let sep = match &sep { Either::A(s) => { s_guard = s.as_str(); s_guard.as_bytes() } Either::B(bytes) => { b_guard = bytes.as_bytes(); b_guard } }; if sep.len() != 1 { return Err(vm.new_value_error("sep must be length 1.".to_owned())); } let sep = sep[0]; if sep > 127 { return Err(vm.new_value_error("sep must be ASCII.".to_owned())); } Ok(hex_impl(bytes, sep, bytes_per_sep)) } else { Ok(hex_impl_no_sep(bytes)) } } pub const fn is_py_ascii_whitespace(b: u8) -> bool { matches!(b, b'\t' | b'\n' | b'\x0C' | b'\r' | b' ' | b'\x0B') }