Files
RustPython/vm/src/builtins/bytes.rs

616 lines
17 KiB
Rust

use super::{PyDictRef, PyIntRef, PyStrRef, PyTupleRef, PyTypeRef};
use crate::{
anystr::{self, AnyStr},
bytesinner::{
bytes_decode, ByteInnerFindOptions, ByteInnerNewOptions, ByteInnerPaddingOptions,
ByteInnerSplitOptions, ByteInnerTranslateOptions, DecodeArgs, PyBytesInner,
},
common::hash::PyHash,
function::{ArgBytesLike, ArgIterable, OptionalArg, OptionalOption},
protocol::{BufferInternal, BufferOptions, PyBuffer},
slots::{
AsBuffer, Callable, Comparable, Hashable, Iterable, IteratorIterable, PyComparisonOp,
SlotConstructor, SlotIterator,
},
utils::Either,
IdProtocol, IntoPyObject, IntoPyResult, PyClassImpl, PyComparisonValue, PyContext, PyObjectRef,
PyRef, PyResult, PyValue, TryFromBorrowedObject, TypeProtocol, VirtualMachine,
};
use bstr::ByteSlice;
use crossbeam_utils::atomic::AtomicCell;
use rustpython_common::borrow::{BorrowedValue, BorrowedValueMut};
use std::mem::size_of;
use std::ops::Deref;
/// "bytes(iterable_of_ints) -> bytes\n\
/// bytes(string, encoding[, errors]) -> bytes\n\
/// bytes(bytes_or_buffer) -> immutable copy of bytes_or_buffer\n\
/// bytes(int) -> bytes object of size given by the parameter initialized with null bytes\n\
/// bytes() -> empty bytes object\n\nConstruct an immutable array of bytes from:\n \
/// - an iterable yielding integers in range(256)\n \
/// - a text string encoded using the specified encoding\n \
/// - any object implementing the buffer API.\n \
/// - an integer";
#[pyclass(module = false, name = "bytes")]
#[derive(Clone, Debug)]
pub struct PyBytes {
inner: PyBytesInner,
}
pub type PyBytesRef = PyRef<PyBytes>;
impl From<Vec<u8>> for PyBytes {
fn from(elements: Vec<u8>) -> Self {
Self {
inner: PyBytesInner { elements },
}
}
}
impl From<PyBytesInner> for PyBytes {
fn from(inner: PyBytesInner) -> Self {
Self { inner }
}
}
impl IntoPyObject for Vec<u8> {
fn into_pyobject(self, vm: &VirtualMachine) -> PyObjectRef {
vm.ctx.new_bytes(self)
}
}
impl Deref for PyBytes {
type Target = [u8];
fn deref(&self) -> &[u8] {
&self.inner.elements
}
}
impl AsRef<[u8]> for PyBytes {
fn as_ref(&self) -> &[u8] {
&self.inner.elements
}
}
impl AsRef<[u8]> for PyBytesRef {
fn as_ref(&self) -> &[u8] {
&self.inner.elements
}
}
impl PyValue for PyBytes {
fn class(vm: &VirtualMachine) -> &PyTypeRef {
&vm.ctx.types.bytes_type
}
}
pub(crate) fn init(context: &PyContext) {
PyBytes::extend_class(context, &context.types.bytes_type);
let bytes_type = &context.types.bytes_type;
extend_class!(context, bytes_type, {
"maketrans" => context.new_method("maketrans", bytes_type.clone(), PyBytesInner::maketrans),
});
PyBytesIterator::extend_class(context, &context.types.bytes_iterator_type);
}
impl SlotConstructor for PyBytes {
type Args = ByteInnerNewOptions;
fn py_new(cls: PyTypeRef, options: Self::Args, vm: &VirtualMachine) -> PyResult {
options.get_bytes(cls, vm).into_pyresult(vm)
}
}
#[pyimpl(
flags(BASETYPE),
with(Hashable, Comparable, AsBuffer, Iterable, SlotConstructor)
)]
impl PyBytes {
#[pymethod(magic)]
pub(crate) fn repr(&self) -> String {
self.inner.repr("", "")
}
#[pymethod(magic)]
#[inline]
pub fn len(&self) -> usize {
self.inner.len()
}
#[inline]
pub fn is_empty(&self) -> bool {
self.inner.is_empty()
}
#[inline]
pub fn as_bytes(&self) -> &[u8] {
&self.inner.elements
}
#[pymethod(magic)]
fn bytes(zelf: PyRef<Self>, vm: &VirtualMachine) -> PyRef<Self> {
if zelf.is(&vm.ctx.types.bytes_type) {
zelf
} else {
PyBytes::from(zelf.inner.clone()).into_ref(vm)
}
}
#[pymethod(magic)]
fn sizeof(&self) -> usize {
size_of::<Self>() + self.inner.elements.len() * size_of::<u8>()
}
#[pymethod(magic)]
fn add(&self, other: ArgBytesLike, vm: &VirtualMachine) -> PyObjectRef {
vm.ctx.new_bytes(self.inner.add(&*other.borrow_buf()))
}
#[pymethod(magic)]
fn contains(
&self,
needle: Either<PyBytesInner, PyIntRef>,
vm: &VirtualMachine,
) -> PyResult<bool> {
self.inner.contains(needle, vm)
}
#[pymethod(magic)]
fn getitem(&self, needle: PyObjectRef, vm: &VirtualMachine) -> PyResult {
self.inner.getitem("byte", needle, vm) // byte != Self::NAME
}
#[pymethod]
fn isalnum(&self) -> bool {
self.inner.isalnum()
}
#[pymethod]
fn isalpha(&self) -> bool {
self.inner.isalpha()
}
#[pymethod]
fn isascii(&self) -> bool {
self.inner.isascii()
}
#[pymethod]
fn isdigit(&self) -> bool {
self.inner.isdigit()
}
#[pymethod]
fn islower(&self) -> bool {
self.inner.islower()
}
#[pymethod]
fn isspace(&self) -> bool {
self.inner.isspace()
}
#[pymethod]
fn isupper(&self) -> bool {
self.inner.isupper()
}
#[pymethod]
fn istitle(&self) -> bool {
self.inner.istitle()
}
#[pymethod]
fn lower(&self) -> Self {
self.inner.lower().into()
}
#[pymethod]
fn upper(&self) -> Self {
self.inner.upper().into()
}
#[pymethod]
fn capitalize(&self) -> Self {
self.inner.capitalize().into()
}
#[pymethod]
fn swapcase(&self) -> Self {
self.inner.swapcase().into()
}
#[pymethod]
pub(crate) fn hex(
&self,
sep: OptionalArg<Either<PyStrRef, PyBytesRef>>,
bytes_per_sep: OptionalArg<isize>,
vm: &VirtualMachine,
) -> PyResult<String> {
self.inner.hex(sep, bytes_per_sep, vm)
}
#[pyclassmethod]
fn fromhex(cls: PyTypeRef, string: PyStrRef, vm: &VirtualMachine) -> PyResult {
let bytes = PyBytesInner::fromhex(string.as_str(), vm)?;
let bytes = vm.ctx.new_bytes(bytes);
Callable::call(&cls, vec![bytes].into(), vm)
}
#[pymethod]
fn center(&self, options: ByteInnerPaddingOptions, vm: &VirtualMachine) -> PyResult<PyBytes> {
Ok(self.inner.center(options, vm)?.into())
}
#[pymethod]
fn ljust(&self, options: ByteInnerPaddingOptions, vm: &VirtualMachine) -> PyResult<PyBytes> {
Ok(self.inner.ljust(options, vm)?.into())
}
#[pymethod]
fn rjust(&self, options: ByteInnerPaddingOptions, vm: &VirtualMachine) -> PyResult<PyBytes> {
Ok(self.inner.rjust(options, vm)?.into())
}
#[pymethod]
fn count(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult<usize> {
self.inner.count(options, vm)
}
#[pymethod]
fn join(&self, iter: ArgIterable<PyBytesInner>, vm: &VirtualMachine) -> PyResult<PyBytes> {
Ok(self.inner.join(iter, vm)?.into())
}
#[pymethod]
fn endswith(&self, options: anystr::StartsEndsWithArgs, vm: &VirtualMachine) -> PyResult<bool> {
self.inner.elements[..].py_startsendswith(
options,
"endswith",
"bytes",
|s, x: &PyBytesInner| s.ends_with(&x.elements[..]),
vm,
)
}
#[pymethod]
fn startswith(
&self,
options: anystr::StartsEndsWithArgs,
vm: &VirtualMachine,
) -> PyResult<bool> {
self.inner.elements[..].py_startsendswith(
options,
"startswith",
"bytes",
|s, x: &PyBytesInner| s.starts_with(&x.elements[..]),
vm,
)
}
#[pymethod]
fn find(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult<isize> {
let index = self.inner.find(options, |h, n| h.find(n), vm)?;
Ok(index.map_or(-1, |v| v as isize))
}
#[pymethod]
fn index(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult<usize> {
let index = self.inner.find(options, |h, n| h.find(n), vm)?;
index.ok_or_else(|| vm.new_value_error("substring not found".to_owned()))
}
#[pymethod]
fn rfind(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult<isize> {
let index = self.inner.find(options, |h, n| h.rfind(n), vm)?;
Ok(index.map_or(-1, |v| v as isize))
}
#[pymethod]
fn rindex(&self, options: ByteInnerFindOptions, vm: &VirtualMachine) -> PyResult<usize> {
let index = self.inner.find(options, |h, n| h.rfind(n), vm)?;
index.ok_or_else(|| vm.new_value_error("substring not found".to_owned()))
}
#[pymethod]
fn translate(
&self,
options: ByteInnerTranslateOptions,
vm: &VirtualMachine,
) -> PyResult<PyBytes> {
Ok(self.inner.translate(options, vm)?.into())
}
#[pymethod]
fn strip(&self, chars: OptionalOption<PyBytesInner>) -> Self {
self.inner.strip(chars).into()
}
#[pymethod]
fn lstrip(&self, chars: OptionalOption<PyBytesInner>) -> Self {
self.inner.lstrip(chars).into()
}
#[pymethod]
fn rstrip(&self, chars: OptionalOption<PyBytesInner>) -> Self {
self.inner.rstrip(chars).into()
}
/// removeprefix($self, prefix, /)
///
///
/// Return a bytes object with the given prefix string removed if present.
///
/// If the bytes starts with the prefix string, return string[len(prefix):]
/// Otherwise, return a copy of the original bytes.
#[pymethod]
fn removeprefix(&self, prefix: PyBytesInner) -> Self {
self.inner.removeprefix(prefix).into()
}
/// removesuffix(self, prefix, /)
///
///
/// Return a bytes object with the given suffix string removed if present.
///
/// If the bytes ends with the suffix string, return string[:len(suffix)]
/// Otherwise, return a copy of the original bytes.
#[pymethod]
fn removesuffix(&self, suffix: PyBytesInner) -> Self {
self.inner.removesuffix(suffix).into()
}
#[pymethod]
fn split(&self, options: ByteInnerSplitOptions, vm: &VirtualMachine) -> PyResult {
self.inner
.split(options, |s, vm| vm.ctx.new_bytes(s.to_vec()), vm)
}
#[pymethod]
fn rsplit(&self, options: ByteInnerSplitOptions, vm: &VirtualMachine) -> PyResult {
self.inner
.rsplit(options, |s, vm| vm.ctx.new_bytes(s.to_vec()), vm)
}
#[pymethod]
fn partition(&self, sep: PyObjectRef, vm: &VirtualMachine) -> PyResult {
let sub = PyBytesInner::try_from_borrowed_object(vm, &sep)?;
let (front, has_mid, back) = self.inner.partition(&sub, vm)?;
Ok(vm.ctx.new_tuple(vec![
vm.ctx.new_bytes(front),
if has_mid {
sep
} else {
vm.ctx.new_bytes(Vec::new())
},
vm.ctx.new_bytes(back),
]))
}
#[pymethod]
fn rpartition(&self, sep: PyObjectRef, vm: &VirtualMachine) -> PyResult {
let sub = PyBytesInner::try_from_borrowed_object(vm, &sep)?;
let (back, has_mid, front) = self.inner.rpartition(&sub, vm)?;
Ok(vm.ctx.new_tuple(vec![
vm.ctx.new_bytes(front),
if has_mid {
sep
} else {
vm.ctx.new_bytes(Vec::new())
},
vm.ctx.new_bytes(back),
]))
}
#[pymethod]
fn expandtabs(&self, options: anystr::ExpandTabsArgs) -> Self {
self.inner.expandtabs(options).into()
}
#[pymethod]
fn splitlines(&self, options: anystr::SplitLinesArgs, vm: &VirtualMachine) -> PyObjectRef {
let lines = self
.inner
.splitlines(options, |x| vm.ctx.new_bytes(x.to_vec()));
vm.ctx.new_list(lines)
}
#[pymethod]
fn zfill(&self, width: isize) -> Self {
self.inner.zfill(width).into()
}
#[pymethod]
fn replace(
&self,
old: PyBytesInner,
new: PyBytesInner,
count: OptionalArg<isize>,
vm: &VirtualMachine,
) -> PyResult<PyBytes> {
Ok(self.inner.replace(old, new, count, vm)?.into())
}
#[pymethod]
fn title(&self) -> Self {
self.inner.title().into()
}
#[pymethod(name = "__rmul__")]
#[pymethod(magic)]
fn mul(zelf: PyRef<Self>, value: isize, vm: &VirtualMachine) -> PyResult<PyRef<Self>> {
if value == 1 && zelf.class().is(&vm.ctx.types.bytes_type) {
// Special case: when some `bytes` is multiplied by `1`,
// nothing really happens, we need to return an object itself
// with the same `id()` to be compatible with CPython.
// This only works for `bytes` itself, not its subclasses.
return Ok(zelf);
}
// todo: map err to overflow.
vm.check_repeat_or_memory_error(zelf.inner.len(), value)
.map(|value| {
let bytes: PyBytes = zelf.inner.repeat(value).into();
bytes.into_ref(vm)
})
// see issue 45044 on b.p.o.
.map_err(|_| vm.new_overflow_error("repeated bytes are too long".to_owned()))
}
#[pymethod(name = "__mod__")]
fn mod_(&self, values: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyBytes> {
let formatted = self.inner.cformat(values, vm)?;
Ok(formatted.into())
}
#[pymethod(magic)]
fn rmod(&self, _values: PyObjectRef, vm: &VirtualMachine) -> PyObjectRef {
vm.ctx.not_implemented()
}
/// Return a string decoded from the given bytes.
/// Default encoding is 'utf-8'.
/// Default errors is 'strict', meaning that encoding errors raise a UnicodeError.
/// Other possible values are 'ignore', 'replace'
/// For a list of possible encodings,
/// see https://docs.python.org/3/library/codecs.html#standard-encodings
/// currently, only 'utf-8' and 'ascii' emplemented
#[pymethod]
fn decode(zelf: PyRef<Self>, args: DecodeArgs, vm: &VirtualMachine) -> PyResult<PyStrRef> {
bytes_decode(zelf.into_object(), args, vm)
}
#[pymethod(magic)]
fn getnewargs(&self, vm: &VirtualMachine) -> PyTupleRef {
let param: Vec<PyObjectRef> = self
.inner
.elements
.iter()
.map(|x| x.into_pyobject(vm))
.collect();
PyTupleRef::with_elements(param, &vm.ctx)
}
#[pymethod(magic)]
fn reduce_ex(
zelf: PyRef<Self>,
_proto: usize,
vm: &VirtualMachine,
) -> (PyTypeRef, PyTupleRef, Option<PyDictRef>) {
Self::reduce(zelf, vm)
}
#[pymethod(magic)]
fn reduce(
zelf: PyRef<Self>,
vm: &VirtualMachine,
) -> (PyTypeRef, PyTupleRef, Option<PyDictRef>) {
let bytes = PyBytes::from(zelf.inner.elements.clone()).into_pyobject(vm);
(
zelf.as_object().clone_class(),
PyTupleRef::with_elements(vec![bytes], &vm.ctx),
zelf.as_object().dict(),
)
}
}
impl AsBuffer for PyBytes {
fn as_buffer(zelf: &PyRef<Self>, _vm: &VirtualMachine) -> PyResult<PyBuffer> {
let buf = PyBuffer::new(
zelf.as_object().clone(),
zelf.clone(),
BufferOptions {
len: zelf.len(),
..Default::default()
},
);
Ok(buf)
}
}
impl BufferInternal for PyRef<PyBytes> {
fn obj_bytes(&self) -> BorrowedValue<[u8]> {
self.as_bytes().into()
}
fn obj_bytes_mut(&self) -> BorrowedValueMut<[u8]> {
unreachable!("bytes is not mutable")
}
fn release(&self) {}
fn retain(&self) {}
}
impl Hashable for PyBytes {
fn hash(zelf: &PyRef<Self>, vm: &VirtualMachine) -> PyResult<PyHash> {
Ok(zelf.inner.hash(vm))
}
}
impl Comparable for PyBytes {
fn cmp(
zelf: &PyRef<Self>,
other: &PyObjectRef,
op: PyComparisonOp,
vm: &VirtualMachine,
) -> PyResult<PyComparisonValue> {
Ok(if let Some(res) = op.identical_optimization(zelf, other) {
res.into()
} else if other.isinstance(&vm.ctx.types.memoryview_type)
&& op != PyComparisonOp::Eq
&& op != PyComparisonOp::Ne
{
return Err(vm.new_type_error(format!(
"'{}' not supported between instances of '{}' and '{}'",
op.operator_token(),
zelf.class().name(),
other.class().name()
)));
} else {
zelf.inner.cmp(other, op, vm)
})
}
}
impl Iterable for PyBytes {
fn iter(zelf: PyRef<Self>, vm: &VirtualMachine) -> PyResult {
Ok(PyBytesIterator {
position: AtomicCell::new(0),
bytes: zelf,
}
.into_object(vm))
}
}
#[pyclass(module = false, name = "bytes_iterator")]
#[derive(Debug)]
pub struct PyBytesIterator {
position: AtomicCell<usize>,
bytes: PyBytesRef,
}
impl PyValue for PyBytesIterator {
fn class(vm: &VirtualMachine) -> &PyTypeRef {
&vm.ctx.types.bytes_iterator_type
}
}
#[pyimpl(with(SlotIterator))]
impl PyBytesIterator {}
impl IteratorIterable for PyBytesIterator {}
impl SlotIterator for PyBytesIterator {
fn next(zelf: &PyRef<Self>, vm: &VirtualMachine) -> PyResult {
let pos = zelf.position.fetch_add(1);
if let Some(&ret) = zelf.bytes.as_bytes().get(pos) {
Ok(vm.ctx.new_int(ret))
} else {
Err(vm.new_stop_iteration())
}
}
}
impl TryFromBorrowedObject for PyBytes {
fn try_from_borrowed_object(vm: &VirtualMachine, obj: &PyObjectRef) -> PyResult<Self> {
PyBytesInner::try_from_borrowed_object(vm, obj).map(|x| x.into())
}
}