mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
1235 lines
37 KiB
Rust
1235 lines
37 KiB
Rust
use crate::{
|
|
anystr::{self, AnyStr, AnyStrContainer, AnyStrWrapper},
|
|
builtins::{
|
|
pystr, PyBaseExceptionRef, PyByteArray, PyBytes, PyBytesRef, PyInt, PyIntRef, PyStr,
|
|
PyStrRef, PyTypeRef,
|
|
},
|
|
byte::bytes_from_object,
|
|
cformat::cformat_bytes,
|
|
common::hash,
|
|
function::{ArgIterable, Either, OptionalArg, OptionalOption, PyComparisonValue},
|
|
identifier,
|
|
literal::escape::Escape,
|
|
protocol::PyBuffer,
|
|
sequence::{SequenceExt, SequenceMutExt},
|
|
types::PyComparisonOp,
|
|
AsObject, PyObject, PyObjectRef, PyPayload, PyResult, TryFromBorrowedObject, VirtualMachine,
|
|
};
|
|
use bstr::ByteSlice;
|
|
use itertools::Itertools;
|
|
use malachite_bigint::BigInt;
|
|
use num_traits::ToPrimitive;
|
|
|
|
#[derive(Debug, Default, Clone)]
|
|
pub struct PyBytesInner {
|
|
pub(super) elements: Vec<u8>,
|
|
}
|
|
|
|
impl From<Vec<u8>> for PyBytesInner {
|
|
fn from(elements: Vec<u8>) -> PyBytesInner {
|
|
Self { elements }
|
|
}
|
|
}
|
|
|
|
impl<'a> TryFromBorrowedObject<'a> for PyBytesInner {
|
|
fn try_from_borrowed_object(vm: &VirtualMachine, obj: &'a PyObject) -> PyResult<Self> {
|
|
bytes_from_object(vm, obj).map(Self::from)
|
|
}
|
|
}
|
|
|
|
#[derive(FromArgs)]
|
|
pub struct ByteInnerNewOptions {
|
|
#[pyarg(any, optional)]
|
|
pub source: OptionalArg<PyObjectRef>,
|
|
#[pyarg(any, optional)]
|
|
pub encoding: OptionalArg<PyStrRef>,
|
|
#[pyarg(any, optional)]
|
|
pub errors: OptionalArg<PyStrRef>,
|
|
}
|
|
|
|
impl ByteInnerNewOptions {
|
|
fn get_value_from_string(
|
|
s: PyStrRef,
|
|
encoding: PyStrRef,
|
|
errors: OptionalArg<PyStrRef>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<PyBytesInner> {
|
|
let bytes = pystr::encode_string(s, Some(encoding), errors.into_option(), vm)?;
|
|
Ok(bytes.as_bytes().to_vec().into())
|
|
}
|
|
|
|
fn get_value_from_source(source: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyBytesInner> {
|
|
bytes_from_object(vm, &source).map(|x| x.into())
|
|
}
|
|
|
|
fn get_value_from_size(size: PyIntRef, vm: &VirtualMachine) -> PyResult<PyBytesInner> {
|
|
let size = size.as_bigint().to_isize().ok_or_else(|| {
|
|
vm.new_overflow_error("cannot fit 'int' into an index-sized integer".to_owned())
|
|
})?;
|
|
let size = if size < 0 {
|
|
return Err(vm.new_value_error("negative count".to_owned()));
|
|
} else {
|
|
size as usize
|
|
};
|
|
Ok(vec![0; size].into())
|
|
}
|
|
|
|
pub fn get_bytes(self, cls: PyTypeRef, vm: &VirtualMachine) -> PyResult<PyBytesRef> {
|
|
let inner = match (&self.source, &self.encoding, &self.errors) {
|
|
(OptionalArg::Present(obj), OptionalArg::Missing, OptionalArg::Missing) => {
|
|
let obj = obj.clone();
|
|
// construct an exact bytes from an exact bytes do not clone
|
|
let obj = if cls.is(PyBytes::class(&vm.ctx)) {
|
|
match obj.downcast_exact::<PyBytes>(vm) {
|
|
Ok(b) => return Ok(b.into_pyref()),
|
|
Err(obj) => obj,
|
|
}
|
|
} else {
|
|
obj
|
|
};
|
|
|
|
if let Some(bytes_method) = vm.get_method(obj, identifier!(vm, __bytes__)) {
|
|
// construct an exact bytes from __bytes__ slot.
|
|
// if __bytes__ return a bytes, use the bytes object except we are the subclass of the bytes
|
|
let bytes = bytes_method?.call((), vm)?;
|
|
let bytes = if cls.is(PyBytes::class(&vm.ctx)) {
|
|
match bytes.downcast::<PyBytes>() {
|
|
Ok(b) => return Ok(b),
|
|
Err(bytes) => bytes,
|
|
}
|
|
} else {
|
|
bytes
|
|
};
|
|
Some(PyBytesInner::try_from_borrowed_object(vm, &bytes))
|
|
} else {
|
|
None
|
|
}
|
|
}
|
|
_ => None,
|
|
}
|
|
.unwrap_or_else(|| self.get_bytearray_inner(vm))?;
|
|
PyBytes::from(inner).into_ref_with_type(vm, cls)
|
|
}
|
|
|
|
pub fn get_bytearray_inner(self, vm: &VirtualMachine) -> PyResult<PyBytesInner> {
|
|
const STRING_WITHOUT_ENCODING: &str = "string argument without an encoding";
|
|
const ENCODING_WITHOUT_STRING: &str = "encoding without a string argument";
|
|
|
|
match (self.source, self.encoding, self.errors) {
|
|
(OptionalArg::Present(obj), OptionalArg::Missing, OptionalArg::Missing) => {
|
|
match_class!(match obj {
|
|
i @ PyInt => {
|
|
Ok(Self::get_value_from_size(i, vm)?)
|
|
}
|
|
_s @ PyStr => Err(STRING_WITHOUT_ENCODING),
|
|
obj => {
|
|
Ok(Self::get_value_from_source(obj, vm)?)
|
|
}
|
|
})
|
|
}
|
|
(OptionalArg::Present(obj), OptionalArg::Present(encoding), errors) => {
|
|
if let Ok(s) = obj.downcast::<PyStr>() {
|
|
Ok(Self::get_value_from_string(s, encoding, errors, vm)?)
|
|
} else {
|
|
Err(ENCODING_WITHOUT_STRING)
|
|
}
|
|
}
|
|
(OptionalArg::Missing, OptionalArg::Missing, OptionalArg::Missing) => {
|
|
Ok(PyBytesInner::default())
|
|
}
|
|
(OptionalArg::Missing, OptionalArg::Present(_), _) => Err(ENCODING_WITHOUT_STRING),
|
|
(OptionalArg::Missing, _, OptionalArg::Present(_)) => {
|
|
Err("errors without a string argument")
|
|
}
|
|
(OptionalArg::Present(_), OptionalArg::Missing, OptionalArg::Present(_)) => {
|
|
Err(STRING_WITHOUT_ENCODING)
|
|
}
|
|
}
|
|
.map_err(|e| vm.new_type_error(e.to_owned()))
|
|
}
|
|
}
|
|
|
|
#[derive(FromArgs)]
|
|
pub struct ByteInnerFindOptions {
|
|
#[pyarg(positional)]
|
|
sub: Either<PyBytesInner, PyIntRef>,
|
|
#[pyarg(positional, default)]
|
|
start: Option<PyIntRef>,
|
|
#[pyarg(positional, default)]
|
|
end: Option<PyIntRef>,
|
|
}
|
|
|
|
impl ByteInnerFindOptions {
|
|
pub fn get_value(
|
|
self,
|
|
len: usize,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<(Vec<u8>, std::ops::Range<usize>)> {
|
|
let sub = match self.sub {
|
|
Either::A(v) => v.elements.to_vec(),
|
|
Either::B(int) => vec![int.as_bigint().byte_or(vm)?],
|
|
};
|
|
let range = anystr::adjust_indices(self.start, self.end, len);
|
|
Ok((sub, range))
|
|
}
|
|
}
|
|
|
|
#[derive(FromArgs)]
|
|
pub struct ByteInnerPaddingOptions {
|
|
#[pyarg(positional)]
|
|
width: isize,
|
|
#[pyarg(positional, optional)]
|
|
fillchar: OptionalArg<PyObjectRef>,
|
|
}
|
|
|
|
impl ByteInnerPaddingOptions {
|
|
fn get_value(self, fn_name: &str, vm: &VirtualMachine) -> PyResult<(isize, u8)> {
|
|
let fillchar = if let OptionalArg::Present(v) = self.fillchar {
|
|
try_as_bytes(v.clone(), |bytes| bytes.iter().copied().exactly_one().ok())
|
|
.flatten()
|
|
.ok_or_else(|| {
|
|
vm.new_type_error(format!(
|
|
"{}() argument 2 must be a byte string of length 1, not {}",
|
|
fn_name,
|
|
v.class().name()
|
|
))
|
|
})?
|
|
} else {
|
|
b' ' // default is space
|
|
};
|
|
|
|
Ok((self.width, fillchar))
|
|
}
|
|
}
|
|
|
|
#[derive(FromArgs)]
|
|
pub struct ByteInnerTranslateOptions {
|
|
#[pyarg(positional)]
|
|
table: Option<PyObjectRef>,
|
|
#[pyarg(any, optional)]
|
|
delete: OptionalArg<PyObjectRef>,
|
|
}
|
|
|
|
impl ByteInnerTranslateOptions {
|
|
pub fn get_value(self, vm: &VirtualMachine) -> PyResult<(Vec<u8>, Vec<u8>)> {
|
|
let table = self.table.map_or_else(
|
|
|| Ok((0..=255).collect::<Vec<u8>>()),
|
|
|v| {
|
|
let bytes = v
|
|
.try_into_value::<PyBytesInner>(vm)
|
|
.ok()
|
|
.filter(|v| v.elements.len() == 256)
|
|
.ok_or_else(|| {
|
|
vm.new_value_error(
|
|
"translation table must be 256 characters long".to_owned(),
|
|
)
|
|
})?;
|
|
Ok(bytes.elements.to_vec())
|
|
},
|
|
)?;
|
|
|
|
let delete = match self.delete {
|
|
OptionalArg::Present(byte) => {
|
|
let byte: PyBytesInner = byte.try_into_value(vm)?;
|
|
byte.elements
|
|
}
|
|
_ => vec![],
|
|
};
|
|
|
|
Ok((table, delete))
|
|
}
|
|
}
|
|
|
|
pub type ByteInnerSplitOptions = anystr::SplitArgs<PyBytesInner>;
|
|
|
|
impl PyBytesInner {
|
|
#[inline]
|
|
pub fn as_bytes(&self) -> &[u8] {
|
|
&self.elements
|
|
}
|
|
|
|
fn new_repr_overflow_error(vm: &VirtualMachine) -> PyBaseExceptionRef {
|
|
vm.new_overflow_error("bytes object is too large to make repr".to_owned())
|
|
}
|
|
|
|
pub fn repr_with_name(&self, class_name: &str, vm: &VirtualMachine) -> PyResult<String> {
|
|
const DECORATION_LEN: isize = 2 + 3; // 2 for (), 3 for b"" => bytearray(b"")
|
|
let escape = crate::literal::escape::AsciiEscape::new_repr(&self.elements);
|
|
let len = escape
|
|
.layout()
|
|
.len
|
|
.and_then(|len| (len as isize).checked_add(DECORATION_LEN + class_name.len() as isize))
|
|
.ok_or_else(|| Self::new_repr_overflow_error(vm))? as usize;
|
|
let mut buf = String::with_capacity(len);
|
|
buf.push_str(class_name);
|
|
buf.push('(');
|
|
escape.bytes_repr().write(&mut buf).unwrap();
|
|
buf.push(')');
|
|
debug_assert_eq!(buf.len(), len);
|
|
Ok(buf)
|
|
}
|
|
|
|
pub fn repr_bytes(&self, vm: &VirtualMachine) -> PyResult<String> {
|
|
let escape = crate::literal::escape::AsciiEscape::new_repr(&self.elements);
|
|
let len = 3 + escape
|
|
.layout()
|
|
.len
|
|
.ok_or_else(|| Self::new_repr_overflow_error(vm))?;
|
|
let mut buf = String::with_capacity(len);
|
|
escape.bytes_repr().write(&mut buf).unwrap();
|
|
debug_assert_eq!(buf.len(), len);
|
|
Ok(buf)
|
|
}
|
|
|
|
#[inline]
|
|
pub fn len(&self) -> usize {
|
|
self.elements.len()
|
|
}
|
|
|
|
#[inline]
|
|
pub fn capacity(&self) -> usize {
|
|
self.elements.capacity()
|
|
}
|
|
|
|
#[inline]
|
|
pub fn is_empty(&self) -> bool {
|
|
self.elements.is_empty()
|
|
}
|
|
|
|
pub fn cmp(
|
|
&self,
|
|
other: &PyObject,
|
|
op: PyComparisonOp,
|
|
vm: &VirtualMachine,
|
|
) -> PyComparisonValue {
|
|
// TODO: bytes can compare with any object implemented buffer protocol
|
|
// but not memoryview, and not equal if compare with unicode str(PyStr)
|
|
PyComparisonValue::from_option(
|
|
other
|
|
.try_bytes_like(vm, |other| op.eval_ord(self.elements.as_slice().cmp(other)))
|
|
.ok(),
|
|
)
|
|
}
|
|
|
|
pub fn hash(&self, vm: &VirtualMachine) -> hash::PyHash {
|
|
vm.state.hash_secret.hash_bytes(&self.elements)
|
|
}
|
|
|
|
pub fn add(&self, other: &[u8]) -> Vec<u8> {
|
|
self.elements.py_add(other)
|
|
}
|
|
|
|
pub fn contains(
|
|
&self,
|
|
needle: Either<PyBytesInner, PyIntRef>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<bool> {
|
|
Ok(match needle {
|
|
Either::A(byte) => self.elements.contains_str(byte.elements.as_slice()),
|
|
Either::B(int) => self.elements.contains(&int.as_bigint().byte_or(vm)?),
|
|
})
|
|
}
|
|
|
|
pub fn isalnum(&self) -> bool {
|
|
!self.elements.is_empty()
|
|
&& self
|
|
.elements
|
|
.iter()
|
|
.all(|x| char::from(*x).is_alphanumeric())
|
|
}
|
|
|
|
pub fn isalpha(&self) -> bool {
|
|
!self.elements.is_empty() && self.elements.iter().all(|x| char::from(*x).is_alphabetic())
|
|
}
|
|
|
|
pub fn isascii(&self) -> bool {
|
|
self.elements.iter().all(|x| char::from(*x).is_ascii())
|
|
}
|
|
|
|
pub fn isdigit(&self) -> bool {
|
|
!self.elements.is_empty()
|
|
&& self
|
|
.elements
|
|
.iter()
|
|
.all(|x| char::from(*x).is_ascii_digit())
|
|
}
|
|
|
|
pub fn islower(&self) -> bool {
|
|
self.elements
|
|
.py_iscase(char::is_lowercase, char::is_uppercase)
|
|
}
|
|
|
|
pub fn isupper(&self) -> bool {
|
|
self.elements
|
|
.py_iscase(char::is_uppercase, char::is_lowercase)
|
|
}
|
|
|
|
pub fn isspace(&self) -> bool {
|
|
!self.elements.is_empty()
|
|
&& self
|
|
.elements
|
|
.iter()
|
|
.all(|x| char::from(*x).is_ascii_whitespace())
|
|
}
|
|
|
|
pub fn istitle(&self) -> bool {
|
|
if self.elements.is_empty() {
|
|
return false;
|
|
}
|
|
|
|
let mut iter = self.elements.iter().peekable();
|
|
let mut prev_cased = false;
|
|
|
|
while let Some(c) = iter.next() {
|
|
let current = char::from(*c);
|
|
let next = if let Some(k) = iter.peek() {
|
|
char::from(**k)
|
|
} else if current.is_uppercase() {
|
|
return !prev_cased;
|
|
} else {
|
|
return prev_cased;
|
|
};
|
|
|
|
let is_cased = current.to_uppercase().next().unwrap() != current
|
|
|| current.to_lowercase().next().unwrap() != current;
|
|
if (is_cased && next.is_uppercase() && !prev_cased)
|
|
|| (!is_cased && next.is_lowercase())
|
|
{
|
|
return false;
|
|
}
|
|
|
|
prev_cased = is_cased;
|
|
}
|
|
|
|
true
|
|
}
|
|
|
|
pub fn lower(&self) -> Vec<u8> {
|
|
self.elements.to_ascii_lowercase()
|
|
}
|
|
|
|
pub fn upper(&self) -> Vec<u8> {
|
|
self.elements.to_ascii_uppercase()
|
|
}
|
|
|
|
pub fn capitalize(&self) -> Vec<u8> {
|
|
let mut new: Vec<u8> = Vec::with_capacity(self.elements.len());
|
|
if let Some((first, second)) = self.elements.split_first() {
|
|
new.push(first.to_ascii_uppercase());
|
|
second.iter().for_each(|x| new.push(x.to_ascii_lowercase()));
|
|
}
|
|
new
|
|
}
|
|
|
|
pub fn swapcase(&self) -> Vec<u8> {
|
|
let mut new: Vec<u8> = Vec::with_capacity(self.elements.len());
|
|
for w in &self.elements {
|
|
match w {
|
|
65..=90 => new.push(w.to_ascii_lowercase()),
|
|
97..=122 => new.push(w.to_ascii_uppercase()),
|
|
x => new.push(*x),
|
|
}
|
|
}
|
|
new
|
|
}
|
|
|
|
pub fn hex(
|
|
&self,
|
|
sep: OptionalArg<Either<PyStrRef, PyBytesRef>>,
|
|
bytes_per_sep: OptionalArg<isize>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<String> {
|
|
bytes_to_hex(self.elements.as_slice(), sep, bytes_per_sep, vm)
|
|
}
|
|
|
|
pub fn fromhex(string: &str, vm: &VirtualMachine) -> PyResult<Vec<u8>> {
|
|
let mut iter = string.bytes().enumerate();
|
|
let mut bytes: Vec<u8> = Vec::with_capacity(string.len() / 2);
|
|
let i = loop {
|
|
let (i, b) = match iter.next() {
|
|
Some(val) => val,
|
|
None => {
|
|
return Ok(bytes);
|
|
}
|
|
};
|
|
|
|
if is_py_ascii_whitespace(b) {
|
|
continue;
|
|
}
|
|
|
|
let top = match b {
|
|
b'0'..=b'9' => b - b'0',
|
|
b'a'..=b'f' => 10 + b - b'a',
|
|
b'A'..=b'F' => 10 + b - b'A',
|
|
_ => break i,
|
|
};
|
|
|
|
let (i, b) = match iter.next() {
|
|
Some(val) => val,
|
|
None => break i + 1,
|
|
};
|
|
|
|
let bot = match b {
|
|
b'0'..=b'9' => b - b'0',
|
|
b'a'..=b'f' => 10 + b - b'a',
|
|
b'A'..=b'F' => 10 + b - b'A',
|
|
_ => break i,
|
|
};
|
|
|
|
bytes.push((top << 4) + bot);
|
|
};
|
|
|
|
Err(vm.new_value_error(format!(
|
|
"non-hexadecimal number found in fromhex() arg at position {i}"
|
|
)))
|
|
}
|
|
|
|
#[inline]
|
|
fn _pad(
|
|
&self,
|
|
options: ByteInnerPaddingOptions,
|
|
pad: fn(&[u8], usize, u8, usize) -> Vec<u8>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
let (width, fillchar) = options.get_value("center", vm)?;
|
|
Ok(if self.len() as isize >= width {
|
|
Vec::from(&self.elements[..])
|
|
} else {
|
|
pad(&self.elements, width as usize, fillchar, self.len())
|
|
})
|
|
}
|
|
|
|
pub fn center(
|
|
&self,
|
|
options: ByteInnerPaddingOptions,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
self._pad(options, AnyStr::py_center, vm)
|
|
}
|
|
|
|
pub fn ljust(
|
|
&self,
|
|
options: ByteInnerPaddingOptions,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
self._pad(options, AnyStr::py_ljust, vm)
|
|
}
|
|
|
|
pub fn rjust(
|
|
&self,
|
|
options: ByteInnerPaddingOptions,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
self._pad(options, AnyStr::py_rjust, vm)
|
|
}
|
|
|
|
pub fn count(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult<usize> {
|
|
let (needle, range) = options.get_value(self.elements.len(), vm)?;
|
|
Ok(self
|
|
.elements
|
|
.py_count(needle.as_slice(), range, |h, n| h.find_iter(n).count()))
|
|
}
|
|
|
|
pub fn join(
|
|
&self,
|
|
iterable: ArgIterable<PyBytesInner>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
let iter = iterable.iter(vm)?;
|
|
self.elements.py_join(iter)
|
|
}
|
|
|
|
#[inline]
|
|
pub fn find<F>(
|
|
&self,
|
|
options: ByteInnerFindOptions,
|
|
find: F,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Option<usize>>
|
|
where
|
|
F: Fn(&[u8], &[u8]) -> Option<usize>,
|
|
{
|
|
let (needle, range) = options.get_value(self.elements.len(), vm)?;
|
|
Ok(self.elements.py_find(&needle, range, find))
|
|
}
|
|
|
|
pub fn maketrans(
|
|
from: PyBytesInner,
|
|
to: PyBytesInner,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
if from.len() != to.len() {
|
|
return Err(
|
|
vm.new_value_error("the two maketrans arguments must have equal length".to_owned())
|
|
);
|
|
}
|
|
let mut res = vec![];
|
|
|
|
for i in 0..=255 {
|
|
res.push(if let Some(position) = from.elements.find_byte(i) {
|
|
to.elements[position]
|
|
} else {
|
|
i
|
|
});
|
|
}
|
|
|
|
Ok(res)
|
|
}
|
|
|
|
pub fn translate(
|
|
&self,
|
|
options: ByteInnerTranslateOptions,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
let (table, delete) = options.get_value(vm)?;
|
|
|
|
let mut res = if delete.is_empty() {
|
|
Vec::with_capacity(self.elements.len())
|
|
} else {
|
|
Vec::new()
|
|
};
|
|
|
|
for i in &self.elements {
|
|
if !delete.contains(i) {
|
|
res.push(table[*i as usize]);
|
|
}
|
|
}
|
|
|
|
Ok(res)
|
|
}
|
|
|
|
pub fn strip(&self, chars: OptionalOption<PyBytesInner>) -> Vec<u8> {
|
|
self.elements
|
|
.py_strip(
|
|
chars,
|
|
|s, chars| s.trim_with(|c| chars.contains(&(c as u8))),
|
|
|s| s.trim(),
|
|
)
|
|
.to_vec()
|
|
}
|
|
|
|
pub fn lstrip(&self, chars: OptionalOption<PyBytesInner>) -> &[u8] {
|
|
self.elements.py_strip(
|
|
chars,
|
|
|s, chars| s.trim_start_with(|c| chars.contains(&(c as u8))),
|
|
|s| s.trim_start(),
|
|
)
|
|
}
|
|
|
|
pub fn rstrip(&self, chars: OptionalOption<PyBytesInner>) -> &[u8] {
|
|
self.elements.py_strip(
|
|
chars,
|
|
|s, chars| s.trim_end_with(|c| chars.contains(&(c as u8))),
|
|
|s| s.trim_end(),
|
|
)
|
|
}
|
|
|
|
// new in Python 3.9
|
|
pub fn removeprefix(&self, prefix: PyBytesInner) -> Vec<u8> {
|
|
self.elements
|
|
.py_removeprefix(&prefix.elements, prefix.elements.len(), |s, p| {
|
|
s.starts_with(p)
|
|
})
|
|
.to_vec()
|
|
}
|
|
|
|
// new in Python 3.9
|
|
pub fn removesuffix(&self, suffix: PyBytesInner) -> Vec<u8> {
|
|
self.elements
|
|
.py_removesuffix(&suffix.elements, suffix.elements.len(), |s, p| {
|
|
s.ends_with(p)
|
|
})
|
|
.to_vec()
|
|
}
|
|
|
|
pub fn split<F>(
|
|
&self,
|
|
options: ByteInnerSplitOptions,
|
|
convert: F,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<PyObjectRef>>
|
|
where
|
|
F: Fn(&[u8], &VirtualMachine) -> PyObjectRef,
|
|
{
|
|
let elements = self.elements.py_split(
|
|
options,
|
|
vm,
|
|
|v, s, vm| v.split_str(s).map(|v| convert(v, vm)).collect(),
|
|
|v, s, n, vm| v.splitn_str(n, s).map(|v| convert(v, vm)).collect(),
|
|
|v, n, vm| v.py_split_whitespace(n, |v| convert(v, vm)),
|
|
)?;
|
|
Ok(elements)
|
|
}
|
|
|
|
pub fn rsplit<F>(
|
|
&self,
|
|
options: ByteInnerSplitOptions,
|
|
convert: F,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<PyObjectRef>>
|
|
where
|
|
F: Fn(&[u8], &VirtualMachine) -> PyObjectRef,
|
|
{
|
|
let mut elements = self.elements.py_split(
|
|
options,
|
|
vm,
|
|
|v, s, vm| v.rsplit_str(s).map(|v| convert(v, vm)).collect(),
|
|
|v, s, n, vm| v.rsplitn_str(n, s).map(|v| convert(v, vm)).collect(),
|
|
|v, n, vm| v.py_rsplit_whitespace(n, |v| convert(v, vm)),
|
|
)?;
|
|
elements.reverse();
|
|
Ok(elements)
|
|
}
|
|
|
|
pub fn partition(
|
|
&self,
|
|
sub: &PyBytesInner,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<(Vec<u8>, bool, Vec<u8>)> {
|
|
self.elements.py_partition(
|
|
&sub.elements,
|
|
|| self.elements.splitn_str(2, &sub.elements),
|
|
vm,
|
|
)
|
|
}
|
|
|
|
pub fn rpartition(
|
|
&self,
|
|
sub: &PyBytesInner,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<(Vec<u8>, bool, Vec<u8>)> {
|
|
self.elements.py_partition(
|
|
&sub.elements,
|
|
|| self.elements.rsplitn_str(2, &sub.elements),
|
|
vm,
|
|
)
|
|
}
|
|
|
|
pub fn expandtabs(&self, options: anystr::ExpandTabsArgs) -> Vec<u8> {
|
|
let tabsize = options.tabsize();
|
|
let mut counter: usize = 0;
|
|
let mut res = vec![];
|
|
|
|
if tabsize == 0 {
|
|
return self
|
|
.elements
|
|
.iter()
|
|
.copied()
|
|
.filter(|x| *x != b'\t')
|
|
.collect();
|
|
}
|
|
|
|
for i in &self.elements {
|
|
if *i == b'\t' {
|
|
let len = tabsize - counter % tabsize;
|
|
res.extend_from_slice(&vec![b' '; len]);
|
|
counter += len;
|
|
} else {
|
|
res.push(*i);
|
|
if *i == b'\r' || *i == b'\n' {
|
|
counter = 0;
|
|
} else {
|
|
counter += 1;
|
|
}
|
|
}
|
|
}
|
|
|
|
res
|
|
}
|
|
|
|
pub fn splitlines<FW, W>(&self, options: anystr::SplitLinesArgs, into_wrapper: FW) -> Vec<W>
|
|
where
|
|
FW: Fn(&[u8]) -> W,
|
|
{
|
|
self.elements.py_bytes_splitlines(options, into_wrapper)
|
|
}
|
|
|
|
pub fn zfill(&self, width: isize) -> Vec<u8> {
|
|
self.elements.py_zfill(width)
|
|
}
|
|
|
|
// len(self)>=1, from="", len(to)>=1, maxcount>=1
|
|
fn replace_interleave(&self, to: PyBytesInner, maxcount: Option<usize>) -> Vec<u8> {
|
|
let place_count = self.elements.len() + 1;
|
|
let count = maxcount.map_or(place_count, |v| std::cmp::min(v, place_count)) - 1;
|
|
let capacity = self.elements.len() + count * to.len();
|
|
let mut result = Vec::with_capacity(capacity);
|
|
let to_slice = to.elements.as_slice();
|
|
result.extend_from_slice(to_slice);
|
|
for c in &self.elements[..count] {
|
|
result.push(*c);
|
|
result.extend_from_slice(to_slice);
|
|
}
|
|
result.extend_from_slice(&self.elements[count..]);
|
|
result
|
|
}
|
|
|
|
fn replace_delete(&self, from: PyBytesInner, maxcount: Option<usize>) -> Vec<u8> {
|
|
let count = count_substring(self.elements.as_slice(), from.elements.as_slice(), maxcount);
|
|
if count == 0 {
|
|
// no matches
|
|
return self.elements.clone();
|
|
}
|
|
|
|
let result_len = self.len() - (count * from.len());
|
|
debug_assert!(self.len() >= count * from.len());
|
|
|
|
let mut result = Vec::with_capacity(result_len);
|
|
let mut last_end = 0;
|
|
let mut count = count;
|
|
for offset in self.elements.find_iter(&from.elements) {
|
|
result.extend_from_slice(&self.elements[last_end..offset]);
|
|
last_end = offset + from.len();
|
|
count -= 1;
|
|
if count == 0 {
|
|
break;
|
|
}
|
|
}
|
|
result.extend_from_slice(&self.elements[last_end..]);
|
|
result
|
|
}
|
|
|
|
pub fn replace_in_place(
|
|
&self,
|
|
from: PyBytesInner,
|
|
to: PyBytesInner,
|
|
maxcount: Option<usize>,
|
|
) -> Vec<u8> {
|
|
let len = from.len();
|
|
let mut iter = self.elements.find_iter(&from.elements);
|
|
|
|
let mut new = if let Some(offset) = iter.next() {
|
|
let mut new = self.elements.clone();
|
|
new[offset..offset + len].clone_from_slice(to.elements.as_slice());
|
|
if maxcount == Some(1) {
|
|
return new;
|
|
} else {
|
|
new
|
|
}
|
|
} else {
|
|
return self.elements.clone();
|
|
};
|
|
|
|
let mut count = maxcount.unwrap_or(usize::MAX) - 1;
|
|
for offset in iter {
|
|
new[offset..offset + len].clone_from_slice(to.elements.as_slice());
|
|
count -= 1;
|
|
if count == 0 {
|
|
break;
|
|
}
|
|
}
|
|
new
|
|
}
|
|
|
|
fn replace_general(
|
|
&self,
|
|
from: PyBytesInner,
|
|
to: PyBytesInner,
|
|
maxcount: Option<usize>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
let count = count_substring(self.elements.as_slice(), from.elements.as_slice(), maxcount);
|
|
if count == 0 {
|
|
// no matches, return unchanged
|
|
return Ok(self.elements.clone());
|
|
}
|
|
|
|
// Check for overflow
|
|
// result_len = self_len + count * (to_len-from_len)
|
|
debug_assert!(count > 0);
|
|
if to.len() as isize - from.len() as isize
|
|
> (isize::MAX - self.elements.len() as isize) / count as isize
|
|
{
|
|
return Err(vm.new_overflow_error("replace bytes is too long".to_owned()));
|
|
}
|
|
let result_len = (self.elements.len() as isize
|
|
+ count as isize * (to.len() as isize - from.len() as isize))
|
|
as usize;
|
|
|
|
let mut result = Vec::with_capacity(result_len);
|
|
let mut last_end = 0;
|
|
let mut count = count;
|
|
for offset in self.elements.find_iter(&from.elements) {
|
|
result.extend_from_slice(&self.elements[last_end..offset]);
|
|
result.extend_from_slice(to.elements.as_slice());
|
|
last_end = offset + from.len();
|
|
count -= 1;
|
|
if count == 0 {
|
|
break;
|
|
}
|
|
}
|
|
result.extend_from_slice(&self.elements[last_end..]);
|
|
Ok(result)
|
|
}
|
|
|
|
pub fn replace(
|
|
&self,
|
|
from: PyBytesInner,
|
|
to: PyBytesInner,
|
|
maxcount: OptionalArg<isize>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<Vec<u8>> {
|
|
// stringlib_replace in CPython
|
|
let maxcount = match maxcount {
|
|
OptionalArg::Present(maxcount) if maxcount >= 0 => {
|
|
if maxcount == 0 || (self.elements.is_empty() && !from.is_empty()) {
|
|
// nothing to do; return the original bytes
|
|
return Ok(self.elements.clone());
|
|
} else if self.elements.is_empty() && from.is_empty() {
|
|
return Ok(to.elements);
|
|
}
|
|
Some(maxcount as usize)
|
|
}
|
|
_ => None,
|
|
};
|
|
|
|
// Handle zero-length special cases
|
|
if from.elements.is_empty() {
|
|
if to.elements.is_empty() {
|
|
// nothing to do; return the original bytes
|
|
return Ok(self.elements.clone());
|
|
}
|
|
// insert the 'to' bytes everywhere.
|
|
// >>> b"Python".replace(b"", b".")
|
|
// b'.P.y.t.h.o.n.'
|
|
return Ok(self.replace_interleave(to, maxcount));
|
|
}
|
|
|
|
// Except for b"".replace(b"", b"A") == b"A" there is no way beyond this
|
|
// point for an empty self bytes to generate a non-empty bytes
|
|
// Special case so the remaining code always gets a non-empty bytes
|
|
if self.elements.is_empty() {
|
|
return Ok(self.elements.clone());
|
|
}
|
|
|
|
if to.elements.is_empty() {
|
|
// delete all occurrences of 'from' bytes
|
|
Ok(self.replace_delete(from, maxcount))
|
|
} else if from.len() == to.len() {
|
|
// Handle special case where both bytes have the same length
|
|
Ok(self.replace_in_place(from, to, maxcount))
|
|
} else {
|
|
// Otherwise use the more generic algorithms
|
|
self.replace_general(from, to, maxcount, vm)
|
|
}
|
|
}
|
|
|
|
pub fn title(&self) -> Vec<u8> {
|
|
let mut res = vec![];
|
|
let mut spaced = true;
|
|
|
|
for i in &self.elements {
|
|
match i {
|
|
65..=90 | 97..=122 => {
|
|
if spaced {
|
|
res.push(i.to_ascii_uppercase());
|
|
spaced = false
|
|
} else {
|
|
res.push(i.to_ascii_lowercase());
|
|
}
|
|
}
|
|
_ => {
|
|
res.push(*i);
|
|
spaced = true
|
|
}
|
|
}
|
|
}
|
|
|
|
res
|
|
}
|
|
|
|
pub fn cformat(&self, values: PyObjectRef, vm: &VirtualMachine) -> PyResult<Vec<u8>> {
|
|
cformat_bytes(vm, self.elements.as_slice(), values)
|
|
}
|
|
|
|
pub fn mul(&self, n: isize, vm: &VirtualMachine) -> PyResult<Vec<u8>> {
|
|
self.elements.mul(vm, n)
|
|
}
|
|
|
|
pub fn imul(&mut self, n: isize, vm: &VirtualMachine) -> PyResult<()> {
|
|
self.elements.imul(vm, n)
|
|
}
|
|
|
|
pub fn concat(&self, other: &PyObject, vm: &VirtualMachine) -> PyResult<Vec<u8>> {
|
|
let buffer = PyBuffer::try_from_borrowed_object(vm, other)?;
|
|
let borrowed = buffer.as_contiguous();
|
|
if let Some(other) = borrowed {
|
|
let mut v = Vec::with_capacity(self.elements.len() + other.len());
|
|
v.extend_from_slice(&self.elements);
|
|
v.extend_from_slice(&other);
|
|
Ok(v)
|
|
} else {
|
|
let mut v = self.elements.clone();
|
|
buffer.append_to(&mut v);
|
|
Ok(v)
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn try_as_bytes<F, R>(obj: PyObjectRef, f: F) -> Option<R>
|
|
where
|
|
F: Fn(&[u8]) -> R,
|
|
{
|
|
match_class!(match obj {
|
|
i @ PyBytes => Some(f(i.as_bytes())),
|
|
j @ PyByteArray => Some(f(&j.borrow_buf())),
|
|
_ => None,
|
|
})
|
|
}
|
|
|
|
#[inline]
|
|
fn count_substring(haystack: &[u8], needle: &[u8], maxcount: Option<usize>) -> usize {
|
|
let substrings = haystack.find_iter(needle);
|
|
if let Some(maxcount) = maxcount {
|
|
std::cmp::min(substrings.take(maxcount).count(), maxcount)
|
|
} else {
|
|
substrings.count()
|
|
}
|
|
}
|
|
|
|
pub trait ByteOr: ToPrimitive {
|
|
fn byte_or(&self, vm: &VirtualMachine) -> PyResult<u8> {
|
|
match self.to_u8() {
|
|
Some(value) => Ok(value),
|
|
None => Err(vm.new_value_error("byte must be in range(0, 256)".to_owned())),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl ByteOr for BigInt {}
|
|
|
|
impl AnyStrWrapper for PyBytesInner {
|
|
type Str = [u8];
|
|
fn as_ref(&self) -> &[u8] {
|
|
&self.elements
|
|
}
|
|
}
|
|
|
|
impl AnyStrContainer<[u8]> for Vec<u8> {
|
|
fn new() -> Self {
|
|
Vec::new()
|
|
}
|
|
|
|
fn with_capacity(capacity: usize) -> Self {
|
|
Vec::with_capacity(capacity)
|
|
}
|
|
|
|
fn push_str(&mut self, other: &[u8]) {
|
|
self.extend(other)
|
|
}
|
|
}
|
|
|
|
const ASCII_WHITESPACES: [u8; 6] = [0x20, 0x09, 0x0a, 0x0c, 0x0d, 0x0b];
|
|
|
|
impl AnyStr for [u8] {
|
|
type Char = u8;
|
|
type Container = Vec<u8>;
|
|
|
|
fn element_bytes_len(_: u8) -> usize {
|
|
1
|
|
}
|
|
|
|
fn to_container(&self) -> Self::Container {
|
|
self.to_vec()
|
|
}
|
|
|
|
fn as_bytes(&self) -> &[u8] {
|
|
self
|
|
}
|
|
|
|
fn as_utf8_str(&self) -> Result<&str, std::str::Utf8Error> {
|
|
std::str::from_utf8(self)
|
|
}
|
|
|
|
fn chars(&self) -> impl Iterator<Item = char> {
|
|
bstr::ByteSlice::chars(self)
|
|
}
|
|
|
|
fn elements(&self) -> impl Iterator<Item = u8> {
|
|
self.iter().copied()
|
|
}
|
|
|
|
fn get_bytes(&self, range: std::ops::Range<usize>) -> &Self {
|
|
&self[range]
|
|
}
|
|
|
|
fn get_chars(&self, range: std::ops::Range<usize>) -> &Self {
|
|
&self[range]
|
|
}
|
|
|
|
fn is_empty(&self) -> bool {
|
|
Self::is_empty(self)
|
|
}
|
|
|
|
fn bytes_len(&self) -> usize {
|
|
Self::len(self)
|
|
}
|
|
|
|
fn py_split_whitespace<F>(&self, maxsplit: isize, convert: F) -> Vec<PyObjectRef>
|
|
where
|
|
F: Fn(&Self) -> PyObjectRef,
|
|
{
|
|
let mut splits = Vec::new();
|
|
let mut count = maxsplit;
|
|
let mut haystack = self;
|
|
while let Some(offset) = haystack.find_byteset(ASCII_WHITESPACES) {
|
|
if offset != 0 {
|
|
if count == 0 {
|
|
break;
|
|
}
|
|
splits.push(convert(&haystack[..offset]));
|
|
count -= 1;
|
|
}
|
|
haystack = &haystack[offset + 1..];
|
|
}
|
|
if !haystack.is_empty() {
|
|
splits.push(convert(haystack));
|
|
}
|
|
splits
|
|
}
|
|
|
|
fn py_rsplit_whitespace<F>(&self, maxsplit: isize, convert: F) -> Vec<PyObjectRef>
|
|
where
|
|
F: Fn(&Self) -> PyObjectRef,
|
|
{
|
|
let mut splits = Vec::new();
|
|
let mut count = maxsplit;
|
|
let mut haystack = self;
|
|
while let Some(offset) = haystack.rfind_byteset(ASCII_WHITESPACES) {
|
|
if offset + 1 != haystack.len() {
|
|
if count == 0 {
|
|
break;
|
|
}
|
|
splits.push(convert(&haystack[offset + 1..]));
|
|
count -= 1;
|
|
}
|
|
haystack = &haystack[..offset];
|
|
}
|
|
if !haystack.is_empty() {
|
|
splits.push(convert(haystack));
|
|
}
|
|
splits
|
|
}
|
|
}
|
|
|
|
#[derive(FromArgs)]
|
|
pub struct DecodeArgs {
|
|
#[pyarg(any, default)]
|
|
encoding: Option<PyStrRef>,
|
|
#[pyarg(any, default)]
|
|
errors: Option<PyStrRef>,
|
|
}
|
|
|
|
pub fn bytes_decode(
|
|
zelf: PyObjectRef,
|
|
args: DecodeArgs,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<PyStrRef> {
|
|
let DecodeArgs { encoding, errors } = args;
|
|
let encoding = encoding
|
|
.as_ref()
|
|
.map_or(crate::codecs::DEFAULT_ENCODING, |s| s.as_str());
|
|
vm.state
|
|
.codec_registry
|
|
.decode_text(zelf, encoding, errors, vm)
|
|
}
|
|
|
|
fn hex_impl_no_sep(bytes: &[u8]) -> String {
|
|
let mut buf: Vec<u8> = vec![0; bytes.len() * 2];
|
|
hex::encode_to_slice(bytes, buf.as_mut_slice()).unwrap();
|
|
unsafe { String::from_utf8_unchecked(buf) }
|
|
}
|
|
|
|
fn hex_impl(bytes: &[u8], sep: u8, bytes_per_sep: isize) -> String {
|
|
let len = bytes.len();
|
|
|
|
let buf = if bytes_per_sep < 0 {
|
|
let bytes_per_sep = std::cmp::min(len, (-bytes_per_sep) as usize);
|
|
let chunks = (len - 1) / bytes_per_sep;
|
|
let chunked = chunks * bytes_per_sep;
|
|
let unchunked = len - chunked;
|
|
let mut buf = vec![0; len * 2 + chunks];
|
|
let mut j = 0;
|
|
for i in (0..chunks).map(|i| i * bytes_per_sep) {
|
|
hex::encode_to_slice(
|
|
&bytes[i..i + bytes_per_sep],
|
|
&mut buf[j..j + bytes_per_sep * 2],
|
|
)
|
|
.unwrap();
|
|
j += bytes_per_sep * 2;
|
|
buf[j] = sep;
|
|
j += 1;
|
|
}
|
|
hex::encode_to_slice(&bytes[chunked..], &mut buf[j..j + unchunked * 2]).unwrap();
|
|
buf
|
|
} else {
|
|
let bytes_per_sep = std::cmp::min(len, bytes_per_sep as usize);
|
|
let chunks = (len - 1) / bytes_per_sep;
|
|
let chunked = chunks * bytes_per_sep;
|
|
let unchunked = len - chunked;
|
|
let mut buf = vec![0; len * 2 + chunks];
|
|
hex::encode_to_slice(&bytes[..unchunked], &mut buf[..unchunked * 2]).unwrap();
|
|
let mut j = unchunked * 2;
|
|
for i in (0..chunks).map(|i| i * bytes_per_sep + unchunked) {
|
|
buf[j] = sep;
|
|
j += 1;
|
|
hex::encode_to_slice(
|
|
&bytes[i..i + bytes_per_sep],
|
|
&mut buf[j..j + bytes_per_sep * 2],
|
|
)
|
|
.unwrap();
|
|
j += bytes_per_sep * 2;
|
|
}
|
|
buf
|
|
};
|
|
|
|
unsafe { String::from_utf8_unchecked(buf) }
|
|
}
|
|
|
|
pub fn bytes_to_hex(
|
|
bytes: &[u8],
|
|
sep: OptionalArg<Either<PyStrRef, PyBytesRef>>,
|
|
bytes_per_sep: OptionalArg<isize>,
|
|
vm: &VirtualMachine,
|
|
) -> PyResult<String> {
|
|
if bytes.is_empty() {
|
|
return Ok("".to_owned());
|
|
}
|
|
|
|
if let OptionalArg::Present(sep) = sep {
|
|
let bytes_per_sep = bytes_per_sep.unwrap_or(1);
|
|
if bytes_per_sep == 0 {
|
|
return Ok(hex_impl_no_sep(bytes));
|
|
}
|
|
|
|
let s_guard;
|
|
let b_guard;
|
|
let sep = match &sep {
|
|
Either::A(s) => {
|
|
s_guard = s.as_str();
|
|
s_guard.as_bytes()
|
|
}
|
|
Either::B(bytes) => {
|
|
b_guard = bytes.as_bytes();
|
|
b_guard
|
|
}
|
|
};
|
|
|
|
if sep.len() != 1 {
|
|
return Err(vm.new_value_error("sep must be length 1.".to_owned()));
|
|
}
|
|
let sep = sep[0];
|
|
if sep > 127 {
|
|
return Err(vm.new_value_error("sep must be ASCII.".to_owned()));
|
|
}
|
|
|
|
Ok(hex_impl(bytes, sep, bytes_per_sep))
|
|
} else {
|
|
Ok(hex_impl_no_sep(bytes))
|
|
}
|
|
}
|
|
|
|
pub const fn is_py_ascii_whitespace(b: u8) -> bool {
|
|
matches!(b, b'\t' | b'\n' | b'\x0C' | b'\r' | b' ' | b'\x0B')
|
|
}
|