/* * Python struct module. * * Docs: https://docs.python.org/3/library/struct.html * * renamed to pystruct since struct is a rust keyword. * * Use this rust module to do byte packing: * https://docs.rs/byteorder/1.2.6/byteorder/ */ use std::io::{Cursor, Read, Write}; use std::iter::Peekable; use byteorder::{ReadBytesExt, WriteBytesExt}; use crate::function::PyFuncArgs; use crate::obj::{objbytes, objstr, objtype}; use crate::pyobject::{PyObjectRef, PyResult, TryFromObject}; use crate::VirtualMachine; #[derive(Debug)] struct FormatSpec { endianness: Endianness, codes: Vec, } #[derive(Debug)] enum Endianness { Native, Little, Big, Network, } #[derive(Debug)] struct FormatCode { code: char, } fn parse_format_string(fmt: String) -> Result { let mut chars = fmt.chars().peekable(); // First determine "<", ">","!" or "=" let endianness = parse_endiannes(&mut chars); // Now, analyze struct string furter: let codes = parse_format_codes(&mut chars)?; Ok(FormatSpec { endianness, codes }) } /// Parse endianness /// See also: https://docs.python.org/3/library/struct.html?highlight=struct#byte-order-size-and-alignment fn parse_endiannes(chars: &mut Peekable) -> Endianness where I: Sized + Iterator, { match chars.peek() { Some('@') => { chars.next().unwrap(); Endianness::Native } Some('=') => { chars.next().unwrap(); Endianness::Native } Some('<') => { chars.next().unwrap(); Endianness::Little } Some('>') => { chars.next().unwrap(); Endianness::Big } Some('!') => { chars.next().unwrap(); Endianness::Network } _ => Endianness::Native, } } fn parse_format_codes(chars: &mut Peekable) -> Result, String> where I: Sized + Iterator, { let mut codes = vec![]; while chars.peek().is_some() { // determine repeat operator: let repeat = match chars.peek() { Some('0'..='9') => { let mut repeat = 0; while let Some('0'..='9') = chars.peek() { if let Some(c) = chars.next() { let current_digit = c.to_digit(10).unwrap(); repeat = repeat * 10 + current_digit; } } Some(repeat) } _ => None, }; // determine format char: let c = chars.next(); match c { Some(c) if is_supported_format_character(c) => { if let Some(repeat) = repeat { for _ in 0..repeat { codes.push(FormatCode { code: c }) } } else { codes.push(FormatCode { code: c }) } } _ => return Err(format!("Illegal format code {:?}", c)), } } Ok(codes) } fn is_supported_format_character(c: char) -> bool { match c { 'b' | 'B' | 'h' | 'H' | 'i' | 'I' | 'l' | 'L' | 'q' | 'Q' | 'f' | 'd' => true, _ => false, } } macro_rules! make_pack_no_endianess { ($T:ty) => { paste::item! { fn [](vm: &VirtualMachine, arg: &PyObjectRef, data: &mut dyn Write) -> PyResult<()> { let v = $T::try_from_object(vm, arg.clone())?; data.[](v).unwrap(); Ok(()) } } }; } macro_rules! make_pack_with_endianess { ($T:ty) => { paste::item! { fn [](vm: &VirtualMachine, arg: &PyObjectRef, data: &mut dyn Write) -> PyResult<()> where Endianness: byteorder::ByteOrder, { let v = $T::try_from_object(vm, arg.clone())?; data.[]::(v).unwrap(); Ok(()) } } }; } make_pack_no_endianess!(i8); make_pack_no_endianess!(u8); make_pack_with_endianess!(i16); make_pack_with_endianess!(u16); make_pack_with_endianess!(i32); make_pack_with_endianess!(u32); make_pack_with_endianess!(i64); make_pack_with_endianess!(u64); make_pack_with_endianess!(f32); make_pack_with_endianess!(f64); fn pack_bool(vm: &VirtualMachine, arg: &PyObjectRef, data: &mut dyn Write) -> PyResult<()> { let v = if bool::try_from_object(vm, arg.clone())? { 1 } else { 0 }; data.write_u8(v).unwrap(); Ok(()) } fn pack_item( vm: &VirtualMachine, code: &FormatCode, arg: &PyObjectRef, data: &mut dyn Write, ) -> PyResult<()> where Endianness: byteorder::ByteOrder, { match code.code { 'b' => pack_i8(vm, arg, data)?, 'B' => pack_u8(vm, arg, data)?, '?' => pack_bool(vm, arg, data)?, 'h' => pack_i16::(vm, arg, data)?, 'H' => pack_u16::(vm, arg, data)?, 'i' | 'l' => pack_i32::(vm, arg, data)?, 'I' | 'L' => pack_u32::(vm, arg, data)?, 'q' => pack_i64::(vm, arg, data)?, 'Q' => pack_u64::(vm, arg, data)?, 'f' => pack_f32::(vm, arg, data)?, 'd' => pack_f64::(vm, arg, data)?, c => { panic!("Unsupported format code {:?}", c); } } Ok(()) } fn struct_pack(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult { if args.args.is_empty() { Err(vm.new_type_error(format!( "Expected at least 1 argument (got: {})", args.args.len() ))) } else { let fmt_arg = args.args[0].clone(); if objtype::isinstance(&fmt_arg, &vm.ctx.str_type()) { let fmt_str = objstr::get_value(&fmt_arg); let format_spec = parse_format_string(fmt_str).map_err(|e| vm.new_value_error(e))?; if format_spec.codes.len() + 1 == args.args.len() { // Create data vector: let mut data = Vec::::new(); // Loop over all opcodes: for (code, arg) in format_spec.codes.iter().zip(args.args.iter().skip(1)) { debug!("code: {:?}", code); match format_spec.endianness { Endianness::Little => { pack_item::(vm, code, arg, &mut data)? } Endianness::Big => { pack_item::(vm, code, arg, &mut data)? } Endianness::Network => { pack_item::(vm, code, arg, &mut data)? } Endianness::Native => { pack_item::(vm, code, arg, &mut data)? } } } Ok(vm.ctx.new_bytes(data)) } else { Err(vm.new_type_error(format!( "Expected {} arguments (got: {})", format_spec.codes.len() + 1, args.args.len() ))) } } else { Err(vm.new_type_error("First argument must be of str type".to_string())) } } } fn unpack_i8(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult { match rdr.read_i8() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_u8(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult { match rdr.read_u8() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_bool(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult { match rdr.read_u8() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_bool(v > 0)), } } fn unpack_i16(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_i16::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_u16(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_u16::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_i32(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_i32::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_u32(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_u32::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_i64(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_i64::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_u64(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_u64::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_int(v)), } } fn unpack_f32(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_f32::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_float(f64::from(v))), } } fn unpack_f64(vm: &VirtualMachine, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match rdr.read_f64::() { Err(err) => panic!("Error in reading {:?}", err), Ok(v) => Ok(vm.ctx.new_float(v)), } } fn struct_unpack(vm: &VirtualMachine, args: PyFuncArgs) -> PyResult { arg_check!( vm, args, required = [ (fmt, Some(vm.ctx.str_type())), (buffer, Some(vm.ctx.bytes_type())) ] ); let fmt_str = objstr::get_value(&fmt); let format_spec = parse_format_string(fmt_str).map_err(|e| vm.new_value_error(e))?; let data = objbytes::get_value(buffer).to_vec(); let mut rdr = Cursor::new(data); let mut items = vec![]; for code in format_spec.codes { debug!("unpack code: {:?}", code); let item = match format_spec.endianness { Endianness::Little => unpack_code::(vm, &code, &mut rdr)?, Endianness::Big => unpack_code::(vm, &code, &mut rdr)?, Endianness::Network => unpack_code::(vm, &code, &mut rdr)?, Endianness::Native => unpack_code::(vm, &code, &mut rdr)?, }; items.push(item); } Ok(vm.ctx.new_tuple(items)) } fn unpack_code(vm: &VirtualMachine, code: &FormatCode, rdr: &mut dyn Read) -> PyResult where Endianness: byteorder::ByteOrder, { match code.code { 'b' => unpack_i8(vm, rdr), 'B' => unpack_u8(vm, rdr), '?' => unpack_bool(vm, rdr), 'h' => unpack_i16::(vm, rdr), 'H' => unpack_u16::(vm, rdr), 'i' | 'l' => unpack_i32::(vm, rdr), 'I' | 'L' => unpack_u32::(vm, rdr), 'q' => unpack_i64::(vm, rdr), 'Q' => unpack_u64::(vm, rdr), 'f' => unpack_f32::(vm, rdr), 'd' => unpack_f64::(vm, rdr), c => { panic!("Unsupported format code {:?}", c); } } } pub fn make_module(vm: &VirtualMachine) -> PyObjectRef { let ctx = &vm.ctx; let struct_error = ctx.new_class("struct.error", ctx.object()); py_module!(vm, "struct", { "pack" => ctx.new_rustfunc(struct_pack), "unpack" => ctx.new_rustfunc(struct_unpack), "error" => struct_error, }) }