Compare commits

...

29 Commits

Author SHA1 Message Date
Jeong, YunWon
bd94d8d50c Remove uu.py and test_uu.py (#5607)
* Remove uu.py and test_uu.py

* patch email.message
2025-03-14 11:42:26 +09:00
Ashwin Naren
7fab64ed9c Revert "Update statistics to 3.13.2 (#5592)" (#5606)
This reverts commit ff970b0e1c.
2025-03-14 11:41:14 +09:00
Ashwin Naren
8e22c399df partially fix sys.getwindowsversion() (#5595) 2025-03-14 11:38:35 +09:00
Ashwin Naren
7546ea91a9 patch email.message 2025-03-13 10:16:51 -07:00
Ashwin Naren
8da66978bf Remove uu.py and test_uu.py 2025-03-13 09:49:10 -07:00
Ashwin Naren
8484bfa2e0 Remove cgi module (#5597)
* no cgi

* mark failing test
2025-03-12 09:47:34 +09:00
Ashwin Naren
ff970b0e1c Update statistics to 3.13.2 (#5592)
* statistics to 3.13.2

* set flaky test
2025-03-11 22:37:26 +09:00
Jeong, YunWon
8be7e4327d Merge pull request #5596 from arihant2math/osx-support-313
_osx_support update to 3.13.2
2025-03-11 15:58:19 +09:00
Jeong, YunWon
82eeb237dc Merge pull request #5598 from arihant2math/fix-whats-left-again
Fixed whats left
2025-03-11 15:57:57 +09:00
Ashwin Naren
cbbadf562f Fixed whats left 2025-03-10 23:27:05 -07:00
Ashwin Naren
4308321f39 _osx_support update to 3.13 2025-03-10 23:13:32 -07:00
Jeong, YunWon
985eebf9b0 Merge pull request #5589 from youknowone/pyattr-const
Replace pyattr(once) to constant
2025-03-11 10:12:50 +09:00
Ashwin Naren
bf28152a32 add os support modules 2025-03-10 11:43:53 +09:00
Ashwin Naren
bae0ad3aeb Fix extra newline in module.csv generation in cron ci (#5591) 2025-03-10 11:43:26 +09:00
Jeong YunWon
87fae150da Replace pyattr(once) to constant 2025-03-09 12:39:12 +09:00
Ashwin Naren
97853bf0f1 Fix module.csv generation in cron ci (#5586) 2025-03-06 13:20:04 +09:00
Noa
cc0a1ce9e2 Update webpack (#5585)
* Update webpack

* Build demo before notebook

* Use with instead of env for actions-gh-pages
2025-03-05 16:15:14 -06:00
Daniel O'Hear
58ebf04bac Add JIT compilation support for integer multiplication, division, and exponents (#5561)
* Initial commit for power function

* Float power jit developed

* Addded support for Floats and Ints

* Integration Testing for JITPower implementation.

* Update instructions.rs

cranelift more like painlift

* Update instructions.rs

* Update instructions.rs

* initial commit for making stable PR ready features

* fixed final edge case for compile_ipow

* fixed final edge case for compile_ipow

* commenting out compile_ipow

* fixed spelling errors

* removed unused tests

* forgot to run clippy

---------

Co-authored-by: Nicholas Paulick <paulicknicholas@gmail.com>
Co-authored-by: Nick <nick@Samanthas-MBP.wi.rr.com>
Co-authored-by: JoeLoparco <loparcojoseph@gmail.com>
Co-authored-by: Nathan Rusch <nathan.rusch@icloud.com>
Co-authored-by: dohear <daniel.ohear@marquette.edu>
2025-03-05 14:41:45 -06:00
Ashwin Naren
7fea1e1b4a fix what is left data upload to website and trigger cron-ci on workflow update 2025-03-05 13:50:52 -06:00
Ashwin Naren
d2bf31724f fix clippy 2025-03-05 13:49:37 -06:00
Ashwin Naren
b4929d258d formatting 2025-03-05 13:49:37 -06:00
Ashwin Naren
ddf2e591c6 resolve comments 2025-03-05 13:49:37 -06:00
Ashwin Naren
33940726a8 upgrade to windows-sys 0.59.0 2025-03-05 13:49:37 -06:00
Ashwin Naren
05cb8c0b73 Update socket.rs
Co-authored-by: Jeong, YunWon <69878+youknowone@users.noreply.github.com>
2025-03-05 13:49:37 -06:00
Ashwin Naren
8d2c6807d2 fix non-windows build 2025-03-05 13:49:37 -06:00
Ashwin Naren
4d9804f188 formatting 2025-03-05 13:49:37 -06:00
Ashwin Naren
3ae1160868 Remove winapi dependency 2025-03-05 13:49:37 -06:00
Ashwin Naren
6804dd4363 use rust-toolchain targets options instead of using rustup 2025-03-05 13:45:31 -06:00
Ashwin Naren
defcadafbb publish demo on weekly release 2025-03-05 13:45:31 -06:00
28 changed files with 823 additions and 2241 deletions

View File

@@ -2,6 +2,9 @@ on:
schedule:
- cron: '0 0 * * 6'
workflow_dispatch:
push:
paths:
- .github/workflows/cron-ci.yaml
name: Periodic checks/tasks
@@ -97,9 +100,9 @@ jobs:
cd website
[ -f ./_data/whats_left.temp ] && cp ./_data/whats_left.temp ./_data/whats_left_lastrun.temp
cp ../whats_left.temp ./_data/whats_left.temp
rm _data/whats_left/modules.csv
cat _data/whats_left.temp | grep "(entire module)" | cut -d ' ' -f 1 | sort >> ../_data/whats_left/modules.csv
rm ./_data/whats_left/modules.csv
echo -e "module" > ./_data/whats_left/modules.csv
cat ./_data/whats_left.temp | grep "(entire module)" | cut -d ' ' -f 1 | sort >> ./_data/whats_left/modules.csv
git add -A
if git -c user.name="Github Actions" -c user.email="actions@github.com" commit -m "Update what is left results" --author="$GITHUB_ACTOR"; then
git push

View File

@@ -89,10 +89,8 @@ jobs:
steps:
- uses: actions/checkout@v4
- uses: dtolnay/rust-toolchain@stable
- name: Set up Environment
shell: bash
run: rustup target add wasm32-wasip1
with:
targets: wasm32-wasip1
- name: Build RustPython
run: cargo build --target wasm32-wasip1 --no-default-features --features freeze-stdlib,stdlib --release
@@ -106,6 +104,34 @@ jobs:
name: rustpython-release-wasm32-wasip1
path: target/rustpython-release-wasm32-wasip1.wasm
- name: install wasm-pack
run: curl https://rustwasm.github.io/wasm-pack/installer/init.sh -sSf | sh
- uses: actions/setup-node@v4
- uses: mwilliamson/setup-wabt-action@v3
with: { wabt-version: "1.0.30" }
- name: build demo
run: |
npm install
npm run dist
env:
NODE_OPTIONS: "--openssl-legacy-provider"
working-directory: ./wasm/demo
- name: build notebook demo
run: |
npm install
npm run dist
mv dist ../demo/dist/notebook
env:
NODE_OPTIONS: "--openssl-legacy-provider"
working-directory: ./wasm/notebook
- name: Deploy demo to Github Pages
uses: peaceiris/actions-gh-pages@v4
with:
deploy_key: ${{ secrets.ACTIONS_DEMO_DEPLOY_KEY }}
publish_dir: ./wasm/demo/dist
external_repository: RustPython/demo
publish_branch: master
release:
runs-on: ubuntu-latest
needs: [build, build-wasm]
@@ -142,4 +168,4 @@ jobs:
--target="$tag" \
--generate-notes \
$PRERELEASE_ARG \
bin/rustpython-release-*
bin/rustpython-release-*

7
Cargo.lock generated
View File

@@ -2023,7 +2023,7 @@ dependencies = [
"siphasher 0.3.11",
"volatile",
"widestring",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -2252,8 +2252,7 @@ dependencies = [
"unicode_names2",
"uuid",
"widestring",
"winapi",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
"xml-rs",
]
@@ -2333,7 +2332,7 @@ dependencies = [
"which",
"widestring",
"windows",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
"winreg",
]

View File

@@ -181,7 +181,7 @@ thiserror = "2.0"
thread_local = "1.1.8"
unicode_names2 = "1.3.0"
widestring = "1.1.0"
windows-sys = "0.52.0"
windows-sys = "0.59.0"
wasm-bindgen = "0.2.100"
# Lints

181
Lib/_android_support.py vendored Normal file
View File

@@ -0,0 +1,181 @@
import io
import sys
from threading import RLock
from time import sleep, time
# The maximum length of a log message in bytes, including the level marker and
# tag, is defined as LOGGER_ENTRY_MAX_PAYLOAD at
# https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log.h;l=71.
# Messages longer than this will be truncated by logcat. This limit has already
# been reduced at least once in the history of Android (from 4076 to 4068 between
# API level 23 and 26), so leave some headroom.
MAX_BYTES_PER_WRITE = 4000
# UTF-8 uses a maximum of 4 bytes per character, so limiting text writes to this
# size ensures that we can always avoid exceeding MAX_BYTES_PER_WRITE.
# However, if the actual number of bytes per character is smaller than that,
# then we may still join multiple consecutive text writes into binary
# writes containing a larger number of characters.
MAX_CHARS_PER_WRITE = MAX_BYTES_PER_WRITE // 4
# When embedded in an app on current versions of Android, there's no easy way to
# monitor the C-level stdout and stderr. The testbed comes with a .c file to
# redirect them to the system log using a pipe, but that wouldn't be convenient
# or appropriate for all apps. So we redirect at the Python level instead.
def init_streams(android_log_write, stdout_prio, stderr_prio):
if sys.executable:
return # Not embedded in an app.
global logcat
logcat = Logcat(android_log_write)
sys.stdout = TextLogStream(
stdout_prio, "python.stdout", sys.stdout.fileno())
sys.stderr = TextLogStream(
stderr_prio, "python.stderr", sys.stderr.fileno())
class TextLogStream(io.TextIOWrapper):
def __init__(self, prio, tag, fileno=None, **kwargs):
# The default is surrogateescape for stdout and backslashreplace for
# stderr, but in the context of an Android log, readability is more
# important than reversibility.
kwargs.setdefault("encoding", "UTF-8")
kwargs.setdefault("errors", "backslashreplace")
super().__init__(BinaryLogStream(prio, tag, fileno), **kwargs)
self._lock = RLock()
self._pending_bytes = []
self._pending_bytes_count = 0
def __repr__(self):
return f"<TextLogStream {self.buffer.tag!r}>"
def write(self, s):
if not isinstance(s, str):
raise TypeError(
f"write() argument must be str, not {type(s).__name__}")
# In case `s` is a str subclass that writes itself to stdout or stderr
# when we call its methods, convert it to an actual str.
s = str.__str__(s)
# We want to emit one log message per line wherever possible, so split
# the string into lines first. Note that "".splitlines() == [], so
# nothing will be logged for an empty string.
with self._lock:
for line in s.splitlines(keepends=True):
while line:
chunk = line[:MAX_CHARS_PER_WRITE]
line = line[MAX_CHARS_PER_WRITE:]
self._write_chunk(chunk)
return len(s)
# The size and behavior of TextIOWrapper's buffer is not part of its public
# API, so we handle buffering ourselves to avoid truncation.
def _write_chunk(self, s):
b = s.encode(self.encoding, self.errors)
if self._pending_bytes_count + len(b) > MAX_BYTES_PER_WRITE:
self.flush()
self._pending_bytes.append(b)
self._pending_bytes_count += len(b)
if (
self.write_through
or b.endswith(b"\n")
or self._pending_bytes_count > MAX_BYTES_PER_WRITE
):
self.flush()
def flush(self):
with self._lock:
self.buffer.write(b"".join(self._pending_bytes))
self._pending_bytes.clear()
self._pending_bytes_count = 0
# Since this is a line-based logging system, line buffering cannot be turned
# off, i.e. a newline always causes a flush.
@property
def line_buffering(self):
return True
class BinaryLogStream(io.RawIOBase):
def __init__(self, prio, tag, fileno=None):
self.prio = prio
self.tag = tag
self._fileno = fileno
def __repr__(self):
return f"<BinaryLogStream {self.tag!r}>"
def writable(self):
return True
def write(self, b):
if type(b) is not bytes:
try:
b = bytes(memoryview(b))
except TypeError:
raise TypeError(
f"write() argument must be bytes-like, not {type(b).__name__}"
) from None
# Writing an empty string to the stream should have no effect.
if b:
logcat.write(self.prio, self.tag, b)
return len(b)
# This is needed by the test suite --timeout option, which uses faulthandler.
def fileno(self):
if self._fileno is None:
raise io.UnsupportedOperation("fileno")
return self._fileno
# When a large volume of data is written to logcat at once, e.g. when a test
# module fails in --verbose3 mode, there's a risk of overflowing logcat's own
# buffer and losing messages. We avoid this by imposing a rate limit using the
# token bucket algorithm, based on a conservative estimate of how fast `adb
# logcat` can consume data.
MAX_BYTES_PER_SECOND = 1024 * 1024
# The logcat buffer size of a device can be determined by running `logcat -g`.
# We set the token bucket size to half of the buffer size of our current minimum
# API level, because other things on the system will be producing messages as
# well.
BUCKET_SIZE = 128 * 1024
# https://cs.android.com/android/platform/superproject/+/android-14.0.0_r1:system/logging/liblog/include/log/log_read.h;l=39
PER_MESSAGE_OVERHEAD = 28
class Logcat:
def __init__(self, android_log_write):
self.android_log_write = android_log_write
self._lock = RLock()
self._bucket_level = 0
self._prev_write_time = time()
def write(self, prio, tag, message):
# Encode null bytes using "modified UTF-8" to avoid them truncating the
# message.
message = message.replace(b"\x00", b"\xc0\x80")
with self._lock:
now = time()
self._bucket_level += (
(now - self._prev_write_time) * MAX_BYTES_PER_SECOND)
# If the bucket level is still below zero, the clock must have gone
# backwards, so reset it to zero and continue.
self._bucket_level = max(0, min(self._bucket_level, BUCKET_SIZE))
self._prev_write_time = now
self._bucket_level -= PER_MESSAGE_OVERHEAD + len(tag) + len(message)
if self._bucket_level < 0:
sleep(-self._bucket_level / MAX_BYTES_PER_SECOND)
self.android_log_write(prio, tag, message)

66
Lib/_apple_support.py vendored Normal file
View File

@@ -0,0 +1,66 @@
import io
import sys
def init_streams(log_write, stdout_level, stderr_level):
# Redirect stdout and stderr to the Apple system log. This method is
# invoked by init_apple_streams() (initconfig.c) if config->use_system_logger
# is enabled.
sys.stdout = SystemLog(log_write, stdout_level, errors=sys.stderr.errors)
sys.stderr = SystemLog(log_write, stderr_level, errors=sys.stderr.errors)
class SystemLog(io.TextIOWrapper):
def __init__(self, log_write, level, **kwargs):
kwargs.setdefault("encoding", "UTF-8")
kwargs.setdefault("line_buffering", True)
super().__init__(LogStream(log_write, level), **kwargs)
def __repr__(self):
return f"<SystemLog (level {self.buffer.level})>"
def write(self, s):
if not isinstance(s, str):
raise TypeError(
f"write() argument must be str, not {type(s).__name__}")
# In case `s` is a str subclass that writes itself to stdout or stderr
# when we call its methods, convert it to an actual str.
s = str.__str__(s)
# We want to emit one log message per line, so split
# the string before sending it to the superclass.
for line in s.splitlines(keepends=True):
super().write(line)
return len(s)
class LogStream(io.RawIOBase):
def __init__(self, log_write, level):
self.log_write = log_write
self.level = level
def __repr__(self):
return f"<LogStream (level {self.level!r})>"
def writable(self):
return True
def write(self, b):
if type(b) is not bytes:
try:
b = bytes(memoryview(b))
except TypeError:
raise TypeError(
f"write() argument must be bytes-like, not {type(b).__name__}"
) from None
# Writing an empty string to the stream should have no effect.
if b:
# Encode null bytes using "modified UTF-8" to avoid truncating the
# message. This should not affect the return value, as the caller
# may be expecting it to match the length of the input.
self.log_write(self.level, b.replace(b"\x00", b"\xc0\x80"))
return len(b)

71
Lib/_ios_support.py vendored Normal file
View File

@@ -0,0 +1,71 @@
import sys
try:
from ctypes import cdll, c_void_p, c_char_p, util
except ImportError:
# ctypes is an optional module. If it's not present, we're limited in what
# we can tell about the system, but we don't want to prevent the module
# from working.
print("ctypes isn't available; iOS system calls will not be available", file=sys.stderr)
objc = None
else:
# ctypes is available. Load the ObjC library, and wrap the objc_getClass,
# sel_registerName methods
lib = util.find_library("objc")
if lib is None:
# Failed to load the objc library
raise ImportError("ObjC runtime library couldn't be loaded")
objc = cdll.LoadLibrary(lib)
objc.objc_getClass.restype = c_void_p
objc.objc_getClass.argtypes = [c_char_p]
objc.sel_registerName.restype = c_void_p
objc.sel_registerName.argtypes = [c_char_p]
def get_platform_ios():
# Determine if this is a simulator using the multiarch value
is_simulator = sys.implementation._multiarch.endswith("simulator")
# We can't use ctypes; abort
if not objc:
return None
# Most of the methods return ObjC objects
objc.objc_msgSend.restype = c_void_p
# All the methods used have no arguments.
objc.objc_msgSend.argtypes = [c_void_p, c_void_p]
# Equivalent of:
# device = [UIDevice currentDevice]
UIDevice = objc.objc_getClass(b"UIDevice")
SEL_currentDevice = objc.sel_registerName(b"currentDevice")
device = objc.objc_msgSend(UIDevice, SEL_currentDevice)
# Equivalent of:
# device_systemVersion = [device systemVersion]
SEL_systemVersion = objc.sel_registerName(b"systemVersion")
device_systemVersion = objc.objc_msgSend(device, SEL_systemVersion)
# Equivalent of:
# device_systemName = [device systemName]
SEL_systemName = objc.sel_registerName(b"systemName")
device_systemName = objc.objc_msgSend(device, SEL_systemName)
# Equivalent of:
# device_model = [device model]
SEL_model = objc.sel_registerName(b"model")
device_model = objc.objc_msgSend(device, SEL_model)
# UTF8String returns a const char*;
SEL_UTF8String = objc.sel_registerName(b"UTF8String")
objc.objc_msgSend.restype = c_char_p
# Equivalent of:
# system = [device_systemName UTF8String]
# release = [device_systemVersion UTF8String]
# model = [device_model UTF8String]
system = objc.objc_msgSend(device_systemName, SEL_UTF8String).decode()
release = objc.objc_msgSend(device_systemVersion, SEL_UTF8String).decode()
model = objc.objc_msgSend(device_model, SEL_UTF8String).decode()
return system, release, model, is_simulator

5
Lib/_osx_support.py vendored
View File

@@ -507,6 +507,11 @@ def get_platform_osx(_config_vars, osname, release, machine):
# MACOSX_DEPLOYMENT_TARGET.
macver = _config_vars.get('MACOSX_DEPLOYMENT_TARGET', '')
if macver and '.' not in macver:
# Ensure that the version includes at least a major
# and minor version, even if MACOSX_DEPLOYMENT_TARGET
# is set to a single-label version like "14".
macver += '.0'
macrelease = _get_system_version() or macver
macver = macver or macrelease

1012
Lib/cgi.py vendored

File diff suppressed because it is too large Load Diff

39
Lib/email/message.py vendored
View File

@@ -7,7 +7,6 @@
__all__ = ['Message', 'EmailMessage']
import re
import uu
import quopri
from io import BytesIO, StringIO
@@ -101,6 +100,35 @@ def _unquotevalue(value):
return utils.unquote(value)
def _decode_uu(encoded):
"""Decode uuencoded data."""
decoded_lines = []
encoded_lines_iter = iter(encoded.splitlines())
for line in encoded_lines_iter:
if line.startswith(b"begin "):
mode, _, path = line.removeprefix(b"begin ").partition(b" ")
try:
int(mode, base=8)
except ValueError:
continue
else:
break
else:
raise ValueError("`begin` line not found")
for line in encoded_lines_iter:
if not line:
raise ValueError("Truncated input")
elif line.strip(b' \t\r\n\f') == b'end':
break
try:
decoded_line = binascii.a2b_uu(line)
except binascii.Error:
# Workaround for broken uuencoders by /Fredrik Lundh
nbytes = (((line[0]-32) & 63) * 4 + 5) // 3
decoded_line = binascii.a2b_uu(line[:nbytes])
decoded_lines.append(decoded_line)
return b''.join(decoded_lines)
class Message:
"""Basic message object.
@@ -288,13 +316,10 @@ class Message:
self.policy.handle_defect(self, defect)
return value
elif cte in ('x-uuencode', 'uuencode', 'uue', 'x-uue'):
in_file = BytesIO(bpayload)
out_file = BytesIO()
try:
uu.decode(in_file, out_file, quiet=True)
return out_file.getvalue()
except uu.Error:
# Some decoding problem
return _decode_uu(bpayload)
except ValueError:
# Some decoding problem.
return bpayload
if isinstance(payload, str):
return bpayload

645
Lib/test/test_cgi.py vendored
View File

@@ -1,645 +0,0 @@
import os
import sys
import tempfile
import unittest
from collections import namedtuple
from io import StringIO, BytesIO
from test import support
from test.support import warnings_helper
cgi = warnings_helper.import_deprecated("cgi")
class HackedSysModule:
# The regression test will have real values in sys.argv, which
# will completely confuse the test of the cgi module
argv = []
stdin = sys.stdin
cgi.sys = HackedSysModule()
class ComparableException:
def __init__(self, err):
self.err = err
def __str__(self):
return str(self.err)
def __eq__(self, anExc):
if not isinstance(anExc, Exception):
return NotImplemented
return (self.err.__class__ == anExc.__class__ and
self.err.args == anExc.args)
def __getattr__(self, attr):
return getattr(self.err, attr)
def do_test(buf, method):
env = {}
if method == "GET":
fp = None
env['REQUEST_METHOD'] = 'GET'
env['QUERY_STRING'] = buf
elif method == "POST":
fp = BytesIO(buf.encode('latin-1')) # FieldStorage expects bytes
env['REQUEST_METHOD'] = 'POST'
env['CONTENT_TYPE'] = 'application/x-www-form-urlencoded'
env['CONTENT_LENGTH'] = str(len(buf))
else:
raise ValueError("unknown method: %s" % method)
try:
return cgi.parse(fp, env, strict_parsing=1)
except Exception as err:
return ComparableException(err)
parse_strict_test_cases = [
("", {}),
("&", ValueError("bad query field: ''")),
("&&", ValueError("bad query field: ''")),
# Should the next few really be valid?
("=", {}),
("=&=", {}),
# This rest seem to make sense
("=a", {'': ['a']}),
("&=a", ValueError("bad query field: ''")),
("=a&", ValueError("bad query field: ''")),
("=&a", ValueError("bad query field: 'a'")),
("b=a", {'b': ['a']}),
("b+=a", {'b ': ['a']}),
("a=b=a", {'a': ['b=a']}),
("a=+b=a", {'a': [' b=a']}),
("&b=a", ValueError("bad query field: ''")),
("b&=a", ValueError("bad query field: 'b'")),
("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}),
("a=a+b&a=b+a", {'a': ['a b', 'b a']}),
("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b&expire=964546263&lobale=en-US&kid=130003.300038&ss=env",
{'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'],
'cuyer': ['r'],
'expire': ['964546263'],
'kid': ['130003.300038'],
'lobale': ['en-US'],
'order_id': ['0bb2e248638833d48cb7fed300000f1b'],
'ss': ['env'],
'view': ['bustomer'],
}),
("group_id=5470&set=custom&_assigned_to=31392&_status=1&_category=100&SUBMIT=Browse",
{'SUBMIT': ['Browse'],
'_assigned_to': ['31392'],
'_category': ['100'],
'_status': ['1'],
'group_id': ['5470'],
'set': ['custom'],
})
]
def norm(seq):
return sorted(seq, key=repr)
def first_elts(list):
return [p[0] for p in list]
def first_second_elts(list):
return [(p[0], p[1][0]) for p in list]
def gen_result(data, environ):
encoding = 'latin-1'
fake_stdin = BytesIO(data.encode(encoding))
fake_stdin.seek(0)
form = cgi.FieldStorage(fp=fake_stdin, environ=environ, encoding=encoding)
result = {}
for k, v in dict(form).items():
result[k] = isinstance(v, list) and form.getlist(k) or v.value
return result
class CgiTests(unittest.TestCase):
def test_parse_multipart(self):
fp = BytesIO(POSTDATA.encode('latin1'))
env = {'boundary': BOUNDARY.encode('latin1'),
'CONTENT-LENGTH': '558'}
result = cgi.parse_multipart(fp, env)
expected = {'submit': [' Add '], 'id': ['1234'],
'file': [b'Testing 123.\n'], 'title': ['']}
self.assertEqual(result, expected)
def test_parse_multipart_without_content_length(self):
POSTDATA = '''--JfISa01
Content-Disposition: form-data; name="submit-name"
just a string
--JfISa01--
'''
fp = BytesIO(POSTDATA.encode('latin1'))
env = {'boundary': 'JfISa01'.encode('latin1')}
result = cgi.parse_multipart(fp, env)
expected = {'submit-name': ['just a string\n']}
self.assertEqual(result, expected)
# TODO RUSTPYTHON - see https://github.com/RustPython/RustPython/issues/935
@unittest.expectedFailure
def test_parse_multipart_invalid_encoding(self):
BOUNDARY = "JfISa01"
POSTDATA = """--JfISa01
Content-Disposition: form-data; name="submit-name"
Content-Length: 3
\u2603
--JfISa01"""
fp = BytesIO(POSTDATA.encode('utf8'))
env = {'boundary': BOUNDARY.encode('latin1'),
'CONTENT-LENGTH': str(len(POSTDATA.encode('utf8')))}
result = cgi.parse_multipart(fp, env, encoding="ascii",
errors="surrogateescape")
expected = {'submit-name': ["\udce2\udc98\udc83"]}
self.assertEqual(result, expected)
self.assertEqual("\u2603".encode('utf8'),
result["submit-name"][0].encode('utf8', 'surrogateescape'))
def test_fieldstorage_properties(self):
fs = cgi.FieldStorage()
self.assertFalse(fs)
self.assertIn("FieldStorage", repr(fs))
self.assertEqual(list(fs), list(fs.keys()))
fs.list.append(namedtuple('MockFieldStorage', 'name')('fieldvalue'))
self.assertTrue(fs)
def test_fieldstorage_invalid(self):
self.assertRaises(TypeError, cgi.FieldStorage, "not-a-file-obj",
environ={"REQUEST_METHOD":"PUT"})
self.assertRaises(TypeError, cgi.FieldStorage, "foo", "bar")
fs = cgi.FieldStorage(headers={'content-type':'text/plain'})
self.assertRaises(TypeError, bool, fs)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_strict(self):
for orig, expect in parse_strict_test_cases:
# Test basic parsing
d = do_test(orig, "GET")
self.assertEqual(d, expect, "Error parsing %s method GET" % repr(orig))
d = do_test(orig, "POST")
self.assertEqual(d, expect, "Error parsing %s method POST" % repr(orig))
env = {'QUERY_STRING': orig}
fs = cgi.FieldStorage(environ=env)
if isinstance(expect, dict):
# test dict interface
self.assertEqual(len(expect), len(fs))
self.assertCountEqual(expect.keys(), fs.keys())
##self.assertEqual(norm(expect.values()), norm(fs.values()))
##self.assertEqual(norm(expect.items()), norm(fs.items()))
self.assertEqual(fs.getvalue("nonexistent field", "default"), "default")
# test individual fields
for key in expect.keys():
expect_val = expect[key]
self.assertIn(key, fs)
if len(expect_val) > 1:
self.assertEqual(fs.getvalue(key), expect_val)
else:
self.assertEqual(fs.getvalue(key), expect_val[0])
def test_separator(self):
parse_semicolon = [
("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}),
("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
(";", ValueError("bad query field: ''")),
(";;", ValueError("bad query field: ''")),
("=;a", ValueError("bad query field: 'a'")),
(";b=a", ValueError("bad query field: ''")),
("b;=a", ValueError("bad query field: 'b'")),
("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
("a=a+b;a=b+a", {'a': ['a b', 'b a']}),
]
for orig, expect in parse_semicolon:
env = {'QUERY_STRING': orig}
fs = cgi.FieldStorage(separator=';', environ=env)
if isinstance(expect, dict):
for key in expect.keys():
expect_val = expect[key]
self.assertIn(key, fs)
if len(expect_val) > 1:
self.assertEqual(fs.getvalue(key), expect_val)
else:
self.assertEqual(fs.getvalue(key), expect_val[0])
@warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_log(self):
cgi.log("Testing")
cgi.logfp = StringIO()
cgi.initlog("%s", "Testing initlog 1")
cgi.log("%s", "Testing log 2")
self.assertEqual(cgi.logfp.getvalue(), "Testing initlog 1\nTesting log 2\n")
if os.path.exists(os.devnull):
cgi.logfp = None
cgi.logfile = os.devnull
cgi.initlog("%s", "Testing log 3")
self.addCleanup(cgi.closelog)
cgi.log("Testing log 4")
def test_fieldstorage_readline(self):
# FieldStorage uses readline, which has the capacity to read all
# contents of the input file into memory; we use readline's size argument
# to prevent that for files that do not contain any newlines in
# non-GET/HEAD requests
class TestReadlineFile:
def __init__(self, file):
self.file = file
self.numcalls = 0
def readline(self, size=None):
self.numcalls += 1
if size:
return self.file.readline(size)
else:
return self.file.readline()
def __getattr__(self, name):
file = self.__dict__['file']
a = getattr(file, name)
if not isinstance(a, int):
setattr(self, name, a)
return a
f = TestReadlineFile(tempfile.TemporaryFile("wb+"))
self.addCleanup(f.close)
f.write(b'x' * 256 * 1024)
f.seek(0)
env = {'REQUEST_METHOD':'PUT'}
fs = cgi.FieldStorage(fp=f, environ=env)
self.addCleanup(fs.file.close)
# if we're not chunking properly, readline is only called twice
# (by read_binary); if we are chunking properly, it will be called 5 times
# as long as the chunksize is 1 << 16.
self.assertGreater(f.numcalls, 2)
f.close()
def test_fieldstorage_multipart(self):
#Test basic FieldStorage multipart parsing
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH': '558'}
fp = BytesIO(POSTDATA.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 4)
expect = [{'name':'id', 'filename':None, 'value':'1234'},
{'name':'title', 'filename':None, 'value':''},
{'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
{'name':'submit', 'filename':None, 'value':' Add '}]
for x in range(len(fs.list)):
for k, exp in expect[x].items():
got = getattr(fs.list[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_multipart_leading_whitespace(self):
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH': '560'}
# Add some leading whitespace to our post data that will cause the
# first line to not be the innerboundary.
fp = BytesIO(b"\r\n" + POSTDATA.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 4)
expect = [{'name':'id', 'filename':None, 'value':'1234'},
{'name':'title', 'filename':None, 'value':''},
{'name':'file', 'filename':'test.txt', 'value':b'Testing 123.\n'},
{'name':'submit', 'filename':None, 'value':' Add '}]
for x in range(len(fs.list)):
for k, exp in expect[x].items():
got = getattr(fs.list[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_multipart_non_ascii(self):
#Test basic FieldStorage multipart parsing
env = {'REQUEST_METHOD':'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH':'558'}
for encoding in ['iso-8859-1','utf-8']:
fp = BytesIO(POSTDATA_NON_ASCII.encode(encoding))
fs = cgi.FieldStorage(fp, environ=env,encoding=encoding)
self.assertEqual(len(fs.list), 1)
expect = [{'name':'id', 'filename':None, 'value':'\xe7\xf1\x80'}]
for x in range(len(fs.list)):
for k, exp in expect[x].items():
got = getattr(fs.list[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_multipart_maxline(self):
# Issue #18167
maxline = 1 << 16
self.maxDiff = None
def check(content):
data = """---123
Content-Disposition: form-data; name="upload"; filename="fake.txt"
Content-Type: text/plain
%s
---123--
""".replace('\n', '\r\n') % content
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'REQUEST_METHOD': 'POST',
}
self.assertEqual(gen_result(data, environ),
{'upload': content.encode('latin1')})
check('x' * (maxline - 1))
check('x' * (maxline - 1) + '\r')
check('x' * (maxline - 1) + '\r' + 'y' * (maxline - 1))
def test_fieldstorage_multipart_w3c(self):
# Test basic FieldStorage multipart parsing (W3C sample)
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY_W3),
'CONTENT_LENGTH': str(len(POSTDATA_W3))}
fp = BytesIO(POSTDATA_W3.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 2)
self.assertEqual(fs.list[0].name, 'submit-name')
self.assertEqual(fs.list[0].value, 'Larry')
self.assertEqual(fs.list[1].name, 'files')
files = fs.list[1].value
self.assertEqual(len(files), 2)
expect = [{'name': None, 'filename': 'file1.txt', 'value': b'... contents of file1.txt ...'},
{'name': None, 'filename': 'file2.gif', 'value': b'...contents of file2.gif...'}]
for x in range(len(files)):
for k, exp in expect[x].items():
got = getattr(files[x], k)
self.assertEqual(got, exp)
def test_fieldstorage_part_content_length(self):
BOUNDARY = "JfISa01"
POSTDATA = """--JfISa01
Content-Disposition: form-data; name="submit-name"
Content-Length: 5
Larry
--JfISa01"""
env = {
'REQUEST_METHOD': 'POST',
'CONTENT_TYPE': 'multipart/form-data; boundary={}'.format(BOUNDARY),
'CONTENT_LENGTH': str(len(POSTDATA))}
fp = BytesIO(POSTDATA.encode('latin-1'))
fs = cgi.FieldStorage(fp, environ=env, encoding="latin-1")
self.assertEqual(len(fs.list), 1)
self.assertEqual(fs.list[0].name, 'submit-name')
self.assertEqual(fs.list[0].value, 'Larry')
def test_field_storage_multipart_no_content_length(self):
fp = BytesIO(b"""--MyBoundary
Content-Disposition: form-data; name="my-arg"; filename="foo"
Test
--MyBoundary--
""")
env = {
"REQUEST_METHOD": "POST",
"CONTENT_TYPE": "multipart/form-data; boundary=MyBoundary",
"wsgi.input": fp,
}
fields = cgi.FieldStorage(fp, environ=env)
self.assertEqual(len(fields["my-arg"].file.read()), 5)
def test_fieldstorage_as_context_manager(self):
fp = BytesIO(b'x' * 10)
env = {'REQUEST_METHOD': 'PUT'}
with cgi.FieldStorage(fp=fp, environ=env) as fs:
content = fs.file.read()
self.assertFalse(fs.file.closed)
self.assertTrue(fs.file.closed)
self.assertEqual(content, 'x' * 10)
with self.assertRaisesRegex(ValueError, 'I/O operation on closed file'):
fs.file.read()
_qs_result = {
'key1': 'value1',
'key2': ['value2x', 'value2y'],
'key3': 'value3',
'key4': 'value4'
}
def testQSAndUrlEncode(self):
data = "key2=value2x&key3=value3&key4=value4"
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'QUERY_STRING': 'key1=value1&key2=value2y',
'REQUEST_METHOD': 'POST',
}
v = gen_result(data, environ)
self.assertEqual(self._qs_result, v)
def test_max_num_fields(self):
# For application/x-www-form-urlencoded
data = '&'.join(['a=a']*11)
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'application/x-www-form-urlencoded',
'REQUEST_METHOD': 'POST',
}
with self.assertRaises(ValueError):
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=10,
)
# For multipart/form-data
data = """---123
Content-Disposition: form-data; name="a"
3
---123
Content-Type: application/x-www-form-urlencoded
a=4
---123
Content-Type: application/x-www-form-urlencoded
a=5
---123--
"""
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'QUERY_STRING': 'a=1&a=2',
'REQUEST_METHOD': 'POST',
}
# 2 GET entities
# 1 top level POST entities
# 1 entity within the second POST entity
# 1 entity within the third POST entity
with self.assertRaises(ValueError):
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=4,
)
cgi.FieldStorage(
fp=BytesIO(data.encode()),
environ=environ,
max_num_fields=5,
)
def testQSAndFormData(self):
data = """---123
Content-Disposition: form-data; name="key2"
value2y
---123
Content-Disposition: form-data; name="key3"
value3
---123
Content-Disposition: form-data; name="key4"
value4
---123--
"""
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'QUERY_STRING': 'key1=value1&key2=value2x',
'REQUEST_METHOD': 'POST',
}
v = gen_result(data, environ)
self.assertEqual(self._qs_result, v)
def testQSAndFormDataFile(self):
data = """---123
Content-Disposition: form-data; name="key2"
value2y
---123
Content-Disposition: form-data; name="key3"
value3
---123
Content-Disposition: form-data; name="key4"
value4
---123
Content-Disposition: form-data; name="upload"; filename="fake.txt"
Content-Type: text/plain
this is the content of the fake file
---123--
"""
environ = {
'CONTENT_LENGTH': str(len(data)),
'CONTENT_TYPE': 'multipart/form-data; boundary=-123',
'QUERY_STRING': 'key1=value1&key2=value2x',
'REQUEST_METHOD': 'POST',
}
result = self._qs_result.copy()
result.update({
'upload': b'this is the content of the fake file\n'
})
v = gen_result(data, environ)
self.assertEqual(result, v)
def test_parse_header(self):
self.assertEqual(
cgi.parse_header("text/plain"),
("text/plain", {}))
self.assertEqual(
cgi.parse_header("text/vnd.just.made.this.up ; "),
("text/vnd.just.made.this.up", {}))
self.assertEqual(
cgi.parse_header("text/plain;charset=us-ascii"),
("text/plain", {"charset": "us-ascii"}))
self.assertEqual(
cgi.parse_header('text/plain ; charset="us-ascii"'),
("text/plain", {"charset": "us-ascii"}))
self.assertEqual(
cgi.parse_header('text/plain ; charset="us-ascii"; another=opt'),
("text/plain", {"charset": "us-ascii", "another": "opt"}))
self.assertEqual(
cgi.parse_header('attachment; filename="silly.txt"'),
("attachment", {"filename": "silly.txt"}))
self.assertEqual(
cgi.parse_header('attachment; filename="strange;name"'),
("attachment", {"filename": "strange;name"}))
self.assertEqual(
cgi.parse_header('attachment; filename="strange;name";size=123;'),
("attachment", {"filename": "strange;name", "size": "123"}))
self.assertEqual(
cgi.parse_header('form-data; name="files"; filename="fo\\"o;bar"'),
("form-data", {"name": "files", "filename": 'fo"o;bar'}))
def test_all(self):
not_exported = {
"logfile", "logfp", "initlog", "dolog", "nolog", "closelog", "log",
"maxlen", "valid_boundary"}
support.check__all__(self, cgi, not_exported=not_exported)
BOUNDARY = "---------------------------721837373350705526688164684"
POSTDATA = """-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="id"
1234
-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="title"
-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="file"; filename="test.txt"
Content-Type: text/plain
Testing 123.
-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="submit"
Add\x20
-----------------------------721837373350705526688164684--
"""
POSTDATA_NON_ASCII = """-----------------------------721837373350705526688164684
Content-Disposition: form-data; name="id"
\xe7\xf1\x80
-----------------------------721837373350705526688164684
"""
# http://www.w3.org/TR/html401/interact/forms.html#h-17.13.4
BOUNDARY_W3 = "AaB03x"
POSTDATA_W3 = """--AaB03x
Content-Disposition: form-data; name="submit-name"
Larry
--AaB03x
Content-Disposition: form-data; name="files"
Content-Type: multipart/mixed; boundary=BbC04y
--BbC04y
Content-Disposition: file; filename="file1.txt"
Content-Type: text/plain
... contents of file1.txt ...
--BbC04y
Content-Disposition: file; filename="file2.gif"
Content-Type: image/gif
Content-Transfer-Encoding: binary
...contents of file2.gif...
--BbC04y--
--AaB03x--
"""
if __name__ == '__main__':
unittest.main()

View File

@@ -778,6 +778,7 @@ class CGIHTTPServerTestCase(BaseTestCase):
# TODO: RUSTPYTHON
@unittest.skipIf(sys.platform != 'win32', "TODO: RUSTPYTHON; works only on windows")
@unittest.expectedFailure
def test_post(self):
params = urllib.parse.urlencode(
{'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})

258
Lib/test/test_uu.py vendored
View File

@@ -1,258 +0,0 @@
"""
Tests for uu module.
Nick Mathewson
"""
import unittest
from test.support import os_helper
import os
import stat
import sys
import uu
import io
plaintext = b"The symbols on top of your keyboard are !@#$%^&*()_+|~\n"
encodedtext = b"""\
M5&AE('-Y;6)O;',@;VX@=&]P(&]F('EO=7(@:V5Y8F]A<F0@87)E("% (R0E
*7B8J*"E?*WQ^"@ """
# Stolen from io.py
class FakeIO(io.TextIOWrapper):
"""Text I/O implementation using an in-memory buffer.
Can be a used as a drop-in replacement for sys.stdin and sys.stdout.
"""
# XXX This is really slow, but fully functional
def __init__(self, initial_value="", encoding="utf-8",
errors="strict", newline="\n"):
super(FakeIO, self).__init__(io.BytesIO(),
encoding=encoding,
errors=errors,
newline=newline)
self._encoding = encoding
self._errors = errors
if initial_value:
if not isinstance(initial_value, str):
initial_value = str(initial_value)
self.write(initial_value)
self.seek(0)
def getvalue(self):
self.flush()
return self.buffer.getvalue().decode(self._encoding, self._errors)
def encodedtextwrapped(mode, filename, backtick=False):
if backtick:
res = (bytes("begin %03o %s\n" % (mode, filename), "ascii") +
encodedtext.replace(b' ', b'`') + b"\n`\nend\n")
else:
res = (bytes("begin %03o %s\n" % (mode, filename), "ascii") +
encodedtext + b"\n \nend\n")
return res
class UUTest(unittest.TestCase):
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_encode(self):
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1")
self.assertEqual(out.getvalue(), encodedtextwrapped(0o666, "t1"))
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1", 0o644)
self.assertEqual(out.getvalue(), encodedtextwrapped(0o644, "t1"))
inp = io.BytesIO(plaintext)
out = io.BytesIO()
uu.encode(inp, out, "t1", backtick=True)
self.assertEqual(out.getvalue(), encodedtextwrapped(0o666, "t1", True))
with self.assertRaises(TypeError):
uu.encode(inp, out, "t1", 0o644, True)
def test_decode(self):
for backtick in True, False:
inp = io.BytesIO(encodedtextwrapped(0o666, "t1", backtick=backtick))
out = io.BytesIO()
uu.decode(inp, out)
self.assertEqual(out.getvalue(), plaintext)
inp = io.BytesIO(
b"UUencoded files may contain many lines,\n" +
b"even some that have 'begin' in them.\n" +
encodedtextwrapped(0o666, "t1", backtick=backtick)
)
out = io.BytesIO()
uu.decode(inp, out)
self.assertEqual(out.getvalue(), plaintext)
def test_truncatedinput(self):
inp = io.BytesIO(b"begin 644 t1\n" + encodedtext)
out = io.BytesIO()
try:
uu.decode(inp, out)
self.fail("No exception raised")
except uu.Error as e:
self.assertEqual(str(e), "Truncated input file")
def test_missingbegin(self):
inp = io.BytesIO(b"")
out = io.BytesIO()
try:
uu.decode(inp, out)
self.fail("No exception raised")
except uu.Error as e:
self.assertEqual(str(e), "No valid begin line found in input file")
def test_garbage_padding(self):
# Issue #22406
encodedtext1 = (
b"begin 644 file\n"
# length 1; bits 001100 111111 111111 111111
b"\x21\x2C\x5F\x5F\x5F\n"
b"\x20\n"
b"end\n"
)
encodedtext2 = (
b"begin 644 file\n"
# length 1; bits 001100 111111 111111 111111
b"\x21\x2C\x5F\x5F\x5F\n"
b"\x60\n"
b"end\n"
)
plaintext = b"\x33" # 00110011
for encodedtext in encodedtext1, encodedtext2:
with self.subTest("uu.decode()"):
inp = io.BytesIO(encodedtext)
out = io.BytesIO()
uu.decode(inp, out, quiet=True)
self.assertEqual(out.getvalue(), plaintext)
with self.subTest("uu_codec"):
import codecs
decoded = codecs.decode(encodedtext, "uu_codec")
self.assertEqual(decoded, plaintext)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_newlines_escaped(self):
# Test newlines are escaped with uu.encode
inp = io.BytesIO(plaintext)
out = io.BytesIO()
filename = "test.txt\n\roverflow.txt"
safefilename = b"test.txt\\n\\roverflow.txt"
uu.encode(inp, out, filename)
self.assertIn(safefilename, out.getvalue())
class UUStdIOTest(unittest.TestCase):
def setUp(self):
self.stdin = sys.stdin
self.stdout = sys.stdout
def tearDown(self):
sys.stdin = self.stdin
sys.stdout = self.stdout
def test_encode(self):
sys.stdin = FakeIO(plaintext.decode("ascii"))
sys.stdout = FakeIO()
uu.encode("-", "-", "t1", 0o666)
self.assertEqual(sys.stdout.getvalue(),
encodedtextwrapped(0o666, "t1").decode("ascii"))
def test_decode(self):
sys.stdin = FakeIO(encodedtextwrapped(0o666, "t1").decode("ascii"))
sys.stdout = FakeIO()
uu.decode("-", "-")
stdout = sys.stdout
sys.stdout = self.stdout
sys.stdin = self.stdin
self.assertEqual(stdout.getvalue(), plaintext.decode("ascii"))
class UUFileTest(unittest.TestCase):
def setUp(self):
# uu.encode() supports only ASCII file names
self.tmpin = os_helper.TESTFN_ASCII + "i"
self.tmpout = os_helper.TESTFN_ASCII + "o"
self.addCleanup(os_helper.unlink, self.tmpin)
self.addCleanup(os_helper.unlink, self.tmpout)
def test_encode(self):
with open(self.tmpin, 'wb') as fin:
fin.write(plaintext)
with open(self.tmpin, 'rb') as fin:
with open(self.tmpout, 'wb') as fout:
uu.encode(fin, fout, self.tmpin, mode=0o644)
with open(self.tmpout, 'rb') as fout:
s = fout.read()
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
# in_file and out_file as filenames
uu.encode(self.tmpin, self.tmpout, self.tmpin, mode=0o644)
with open(self.tmpout, 'rb') as fout:
s = fout.read()
self.assertEqual(s, encodedtextwrapped(0o644, self.tmpin))
def test_decode(self):
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(0o644, self.tmpout))
with open(self.tmpin, 'rb') as f:
uu.decode(f)
with open(self.tmpout, 'rb') as f:
s = f.read()
self.assertEqual(s, plaintext)
# XXX is there an xp way to verify the mode?
def test_decode_filename(self):
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(0o644, self.tmpout))
uu.decode(self.tmpin)
with open(self.tmpout, 'rb') as f:
s = f.read()
self.assertEqual(s, plaintext)
def test_decodetwice(self):
# Verify that decode() will refuse to overwrite an existing file
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(0o644, self.tmpout))
with open(self.tmpin, 'rb') as f:
uu.decode(f)
with open(self.tmpin, 'rb') as f:
self.assertRaises(uu.Error, uu.decode, f)
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_decode_mode(self):
# Verify that decode() will set the given mode for the out_file
expected_mode = 0o444
with open(self.tmpin, 'wb') as f:
f.write(encodedtextwrapped(expected_mode, self.tmpout))
# make file writable again, so it can be removed (Windows only)
self.addCleanup(os.chmod, self.tmpout, expected_mode | stat.S_IWRITE)
with open(self.tmpin, 'rb') as f:
uu.decode(f)
self.assertEqual(
stat.S_IMODE(os.stat(self.tmpout).st_mode),
expected_mode
)
if __name__=="__main__":
unittest.main()

199
Lib/uu.py vendored
View File

@@ -1,199 +0,0 @@
#! /usr/bin/env python3
# Copyright 1994 by Lance Ellinghouse
# Cathedral City, California Republic, United States of America.
# All Rights Reserved
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Lance Ellinghouse
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# LANCE ELLINGHOUSE DISCLAIMS ALL WARRANTIES WITH REGARD TO
# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND
# FITNESS, IN NO EVENT SHALL LANCE ELLINGHOUSE CENTRUM BE LIABLE
# FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#
# Modified by Jack Jansen, CWI, July 1995:
# - Use binascii module to do the actual line-by-line conversion
# between ascii and binary. This results in a 1000-fold speedup. The C
# version is still 5 times faster, though.
# - Arguments more compliant with python standard
"""Implementation of the UUencode and UUdecode functions.
encode(in_file, out_file [,name, mode])
decode(in_file [, out_file, mode])
"""
import binascii
import os
import sys
__all__ = ["Error", "encode", "decode"]
class Error(Exception):
pass
def encode(in_file, out_file, name=None, mode=None):
"""Uuencode file"""
#
# If in_file is a pathname open it and change defaults
#
opened_files = []
try:
if in_file == '-':
in_file = sys.stdin.buffer
elif isinstance(in_file, str):
if name is None:
name = os.path.basename(in_file)
if mode is None:
try:
mode = os.stat(in_file).st_mode
except AttributeError:
pass
in_file = open(in_file, 'rb')
opened_files.append(in_file)
#
# Open out_file if it is a pathname
#
if out_file == '-':
out_file = sys.stdout.buffer
elif isinstance(out_file, str):
out_file = open(out_file, 'wb')
opened_files.append(out_file)
#
# Set defaults for name and mode
#
if name is None:
name = '-'
if mode is None:
mode = 0o666
#
# Write the data
#
out_file.write(('begin %o %s\n' % ((mode & 0o777), name)).encode("ascii"))
data = in_file.read(45)
while len(data) > 0:
out_file.write(binascii.b2a_uu(data))
data = in_file.read(45)
out_file.write(b' \nend\n')
finally:
for f in opened_files:
f.close()
def decode(in_file, out_file=None, mode=None, quiet=False):
"""Decode uuencoded file"""
#
# Open the input file, if needed.
#
opened_files = []
if in_file == '-':
in_file = sys.stdin.buffer
elif isinstance(in_file, str):
in_file = open(in_file, 'rb')
opened_files.append(in_file)
try:
#
# Read until a begin is encountered or we've exhausted the file
#
while True:
hdr = in_file.readline()
if not hdr:
raise Error('No valid begin line found in input file')
if not hdr.startswith(b'begin'):
continue
hdrfields = hdr.split(b' ', 2)
if len(hdrfields) == 3 and hdrfields[0] == b'begin':
try:
int(hdrfields[1], 8)
break
except ValueError:
pass
if out_file is None:
# If the filename isn't ASCII, what's up with that?!?
out_file = hdrfields[2].rstrip(b' \t\r\n\f').decode("ascii")
if os.path.exists(out_file):
raise Error('Cannot overwrite existing file: %s' % out_file)
if mode is None:
mode = int(hdrfields[1], 8)
#
# Open the output file
#
if out_file == '-':
out_file = sys.stdout.buffer
elif isinstance(out_file, str):
fp = open(out_file, 'wb')
try:
os.path.chmod(out_file, mode)
except AttributeError:
pass
out_file = fp
opened_files.append(out_file)
#
# Main decoding loop
#
s = in_file.readline()
while s and s.strip(b' \t\r\n\f') != b'end':
try:
data = binascii.a2b_uu(s)
except binascii.Error as v:
# Workaround for broken uuencoders by /Fredrik Lundh
nbytes = (((s[0]-32) & 63) * 4 + 5) // 3
data = binascii.a2b_uu(s[:nbytes])
if not quiet:
sys.stderr.write("Warning: %s\n" % v)
out_file.write(data)
s = in_file.readline()
if not s:
raise Error('Truncated input file')
finally:
for f in opened_files:
f.close()
def test():
"""uuencode/uudecode main program"""
import optparse
parser = optparse.OptionParser(usage='usage: %prog [-d] [-t] [input [output]]')
parser.add_option('-d', '--decode', dest='decode', help='Decode (instead of encode)?', default=False, action='store_true')
parser.add_option('-t', '--text', dest='text', help='data is text, encoded format unix-compatible text?', default=False, action='store_true')
(options, args) = parser.parse_args()
if len(args) > 2:
parser.error('incorrect number of arguments')
sys.exit(1)
# Use the binary streams underlying stdin/stdout
input = sys.stdin.buffer
output = sys.stdout.buffer
if len(args) > 0:
input = args[0]
if len(args) > 1:
output = args[1]
if options.decode:
if options.text:
if isinstance(output, str):
output = open(output, 'wb')
else:
print(sys.argv[0], ': cannot do -t to stdout')
sys.exit(1)
decode(input, output)
else:
if options.text:
if isinstance(input, str):
input = open(input, 'rb')
else:
print(sys.argv[0], ': cannot do -t from stdin')
sys.exit(1)
encode(input, output)
if __name__ == '__main__':
test()

View File

@@ -116,7 +116,7 @@ pub mod windows {
let h = h?;
// reset stat?
let file_type = unsafe { GetFileType(h) };
let file_type = unsafe { GetFileType(h as _) };
if file_type == FILE_TYPE_UNKNOWN {
return Err(std::io::Error::last_os_error());
}
@@ -138,10 +138,10 @@ pub mod windows {
let mut basic_info: FILE_BASIC_INFO = unsafe { std::mem::zeroed() };
let mut id_info: FILE_ID_INFO = unsafe { std::mem::zeroed() };
if unsafe { GetFileInformationByHandle(h, &mut info) } == 0
if unsafe { GetFileInformationByHandle(h as _, &mut info) } == 0
|| unsafe {
GetFileInformationByHandleEx(
h,
h as _,
FileBasicInfo,
&mut basic_info as *mut _ as *mut _,
std::mem::size_of_val(&basic_info) as u32,
@@ -153,7 +153,7 @@ pub mod windows {
let p_id_info = if unsafe {
GetFileInformationByHandleEx(
h,
h as _,
FileIdInfo,
&mut id_info as *mut _ as *mut _,
std::mem::size_of_val(&id_info) as u32,
@@ -320,7 +320,7 @@ pub mod windows {
.get_or_init(|| {
let library_name = OsString::from("api-ms-win-core-file-l2-1-4").to_wide_with_nul();
let module = unsafe { LoadLibraryW(library_name.as_ptr()) };
if module == 0 {
if module.is_null() {
return None;
}
let name = CString::new("GetFileInformationByName").unwrap();

View File

@@ -81,7 +81,7 @@ if sys.platform.startswith("win"):
0x00000100 | 0x00000001 | 0x00000020 | 0x00002000 | 0x00000010 | 0x00008000 | 0x00020000
# We really can't test if the results are correct, so it just checks for meaningful value
assert winver.major > 0
assert winver.major > 6
assert winver.minor >= 0
assert winver.build > 0
assert winver.platform == 2
@@ -91,8 +91,8 @@ if sys.platform.startswith("win"):
# XXX if platform_version is implemented correctly, this'll break on compatiblity mode or a build without manifest
# these fields can mismatch in CPython
# assert winver.major == winver.platform_version[0]
# assert winver.minor == winver.platform_version[1]
assert winver.major == winver.platform_version[0]
assert winver.minor == winver.platform_version[1]
# assert winver.build == winver.platform_version[2]
# test int_max_str_digits getter and setter

View File

@@ -425,12 +425,25 @@ impl<'a, 'b> FunctionCompiler<'a, 'b> {
(BinaryOperator::Subtract, JitValue::Int(a), JitValue::Int(b)) => {
JitValue::Int(self.compile_sub(a, b))
}
(BinaryOperator::Multiply, JitValue::Int(a), JitValue::Int(b)) => {
JitValue::Int(self.builder.ins().imul(a, b))
}
(BinaryOperator::FloorDivide, JitValue::Int(a), JitValue::Int(b)) => {
JitValue::Int(self.builder.ins().sdiv(a, b))
}
(BinaryOperator::Divide, JitValue::Int(a), JitValue::Int(b)) => {
// Convert to float for regular division
let a_float = self.builder.ins().fcvt_from_sint(types::F64, a);
let b_float = self.builder.ins().fcvt_from_sint(types::F64, b);
JitValue::Float(self.builder.ins().fdiv(a_float, b_float))
}
(BinaryOperator::Modulo, JitValue::Int(a), JitValue::Int(b)) => {
JitValue::Int(self.builder.ins().srem(a, b))
}
// Todo: This should return int when possible
(BinaryOperator::Power, JitValue::Int(a), JitValue::Int(b)) => {
JitValue::Float(self.compile_ipow(a, b))
}
(
BinaryOperator::Lshift | BinaryOperator::Rshift,
JitValue::Int(a),
@@ -562,4 +575,154 @@ impl<'a, 'b> FunctionCompiler<'a, 'b> {
.trapif(IntCC::Overflow, carry, TrapCode::IntegerOverflow);
out
}
fn compile_ipow(&mut self, a: Value, b: Value) -> Value {
// Convert base to float since result might not always be a Int
let float_base = self.builder.ins().fcvt_from_sint(types::F64, a);
// Create code blocks
let check_block1 = self.builder.create_block();
let check_block2 = self.builder.create_block();
let check_block3 = self.builder.create_block();
let handle_neg_exp = self.builder.create_block();
let loop_block = self.builder.create_block();
let continue_block = self.builder.create_block();
let exit_block = self.builder.create_block();
// Set code block params
// Set code block params
self.builder.append_block_param(check_block1, types::F64);
self.builder.append_block_param(check_block1, types::I64);
self.builder.append_block_param(check_block2, types::F64);
self.builder.append_block_param(check_block2, types::I64);
self.builder.append_block_param(check_block3, types::F64);
self.builder.append_block_param(check_block3, types::I64);
self.builder.append_block_param(handle_neg_exp, types::F64);
self.builder.append_block_param(handle_neg_exp, types::I64);
self.builder.append_block_param(loop_block, types::F64); //base
self.builder.append_block_param(loop_block, types::F64); //result
self.builder.append_block_param(loop_block, types::I64); //exponent
self.builder.append_block_param(continue_block, types::F64); //base
self.builder.append_block_param(continue_block, types::F64); //result
self.builder.append_block_param(continue_block, types::I64); //exponent
self.builder.append_block_param(exit_block, types::F64);
// Begin evaluating by jumping to first check block
self.builder.ins().jump(check_block1, &[float_base, b]);
// Check block one:
// Checks if input is O ** n where n > 0
// Jumps to exit_block as 0 if true
self.builder.switch_to_block(check_block1);
let paramsc1 = self.builder.block_params(check_block1);
let basec1 = paramsc1[0];
let expc1 = paramsc1[1];
let zero_f64 = self.builder.ins().f64const(0.0);
let zero_i64 = self.builder.ins().iconst(types::I64, 0);
let is_base_zero = self.builder.ins().fcmp(FloatCC::Equal, zero_f64, basec1);
let is_exp_positive = self
.builder
.ins()
.icmp(IntCC::SignedGreaterThan, expc1, zero_i64);
let is_zero_to_positive = self.builder.ins().band(is_base_zero, is_exp_positive);
self.builder
.ins()
.brnz(is_zero_to_positive, exit_block, &[zero_f64]);
self.builder.ins().jump(check_block2, &[basec1, expc1]);
// Check block two:
// Checks if exponent is negative
// Jumps to a special handle_neg_exponent block if true
self.builder.switch_to_block(check_block2);
let paramsc2 = self.builder.block_params(check_block2);
let basec2 = paramsc2[0];
let expc2 = paramsc2[1];
let zero_i64 = self.builder.ins().iconst(types::I64, 0);
let is_neg = self
.builder
.ins()
.icmp(IntCC::SignedLessThan, expc2, zero_i64);
self.builder
.ins()
.brnz(is_neg, handle_neg_exp, &[basec2, expc2]);
self.builder.ins().jump(check_block3, &[basec2, expc2]);
// Check block three:
// Checks if exponent is one
// jumps to exit block with the base of the exponents value
self.builder.switch_to_block(check_block3);
let paramsc3 = self.builder.block_params(check_block3);
let basec3 = paramsc3[0];
let expc3 = paramsc3[1];
let resc3 = self.builder.ins().f64const(1.0);
let one_i64 = self.builder.ins().iconst(types::I64, 1);
let is_one = self.builder.ins().icmp(IntCC::Equal, expc3, one_i64);
self.builder.ins().brnz(is_one, exit_block, &[basec3]);
self.builder.ins().jump(loop_block, &[basec3, resc3, expc3]);
// Handles negative Exponents
// calculates x^(-n) = (1/x)^n
// then proceeds to the loop to evaluate
self.builder.switch_to_block(handle_neg_exp);
let paramshn = self.builder.block_params(handle_neg_exp);
let basehn = paramshn[0];
let exphn = paramshn[1];
let one_f64 = self.builder.ins().f64const(1.0);
let base_inverse = self.builder.ins().fdiv(one_f64, basehn);
let pos_exp = self.builder.ins().ineg(exphn);
self.builder
.ins()
.jump(loop_block, &[base_inverse, one_f64, pos_exp]);
// Main loop block
// checks loop condition (exp > 0)
// Jumps to continue block if true, exit block if false
self.builder.switch_to_block(loop_block);
let paramslb = self.builder.block_params(loop_block);
let baselb = paramslb[0];
let reslb = paramslb[1];
let explb = paramslb[2];
let zero = self.builder.ins().iconst(types::I64, 0);
let is_zero = self.builder.ins().icmp(IntCC::Equal, explb, zero);
self.builder.ins().brnz(is_zero, exit_block, &[reslb]);
self.builder
.ins()
.jump(continue_block, &[baselb, reslb, explb]);
// Continue block
// Main math logic
// Always jumps back to loob_block
self.builder.switch_to_block(continue_block);
let paramscb = self.builder.block_params(continue_block);
let basecb = paramscb[0];
let rescb = paramscb[1];
let expcb = paramscb[2];
let is_odd = self.builder.ins().band_imm(expcb, 1);
let is_odd = self.builder.ins().icmp_imm(IntCC::Equal, is_odd, 1);
let mul_result = self.builder.ins().fmul(rescb, basecb);
let new_result = self.builder.ins().select(is_odd, mul_result, rescb);
let squared_base = self.builder.ins().fmul(basecb, basecb);
let new_exp = self.builder.ins().sshr_imm(expcb, 1);
self.builder
.ins()
.jump(loop_block, &[squared_base, new_result, new_exp]);
self.builder.switch_to_block(exit_block);
let result = self.builder.block_params(exit_block)[0];
self.builder.seal_block(check_block1);
self.builder.seal_block(check_block2);
self.builder.seal_block(check_block3);
self.builder.seal_block(handle_neg_exp);
self.builder.seal_block(loop_block);
self.builder.seal_block(continue_block);
self.builder.seal_block(exit_block);
result
}
}

View File

@@ -1,3 +1,5 @@
use core::f64;
#[test]
fn test_add() {
let add = jit_function! { add(a:i64, b:i64) -> i64 => r##"
@@ -23,6 +25,51 @@ fn test_sub() {
assert_eq!(sub(-3, -10), Ok(7));
}
#[test]
fn test_mul() {
let mul = jit_function! { mul(a:i64, b:i64) -> i64 => r##"
def mul(a: int, b: int):
return a * b
"## };
assert_eq!(mul(5, 10), Ok(50));
assert_eq!(mul(0, 5), Ok(0));
assert_eq!(mul(5, 0), Ok(0));
assert_eq!(mul(0, 0), Ok(0));
assert_eq!(mul(-5, 10), Ok(-50));
assert_eq!(mul(5, -10), Ok(-50));
assert_eq!(mul(-5, -10), Ok(50));
assert_eq!(mul(999999, 999999), Ok(999998000001));
assert_eq!(mul(i64::MAX, 1), Ok(i64::MAX));
assert_eq!(mul(1, i64::MAX), Ok(i64::MAX));
}
#[test]
fn test_div() {
let div = jit_function! { div(a:i64, b:i64) -> f64 => r##"
def div(a: int, b: int):
return a / b
"## };
assert_eq!(div(0, 1), Ok(0.0));
assert_eq!(div(5, 1), Ok(5.0));
assert_eq!(div(5, 10), Ok(0.5));
assert_eq!(div(5, 2), Ok(2.5));
assert_eq!(div(12, 10), Ok(1.2));
assert_eq!(div(7, 10), Ok(0.7));
assert_eq!(div(-3, -1), Ok(3.0));
assert_eq!(div(-3, 1), Ok(-3.0));
assert_eq!(div(1, 1000), Ok(0.001));
assert_eq!(div(1, 100000), Ok(0.00001));
assert_eq!(div(2, 3), Ok(0.6666666666666666));
assert_eq!(div(1, 3), Ok(0.3333333333333333));
assert_eq!(div(i64::MAX, 2), Ok(4611686018427387904.0));
assert_eq!(div(i64::MIN, 2), Ok(-4611686018427387904.0));
assert_eq!(div(i64::MIN, -1), Ok(9223372036854775808.0)); // Overflow case
assert_eq!(div(i64::MIN, i64::MAX), Ok(-1.0));
}
#[test]
fn test_floor_div() {
let floor_div = jit_function! { floor_div(a:i64, b:i64) -> i64 => r##"
@@ -35,7 +82,28 @@ fn test_floor_div() {
assert_eq!(floor_div(12, 10), Ok(1));
assert_eq!(floor_div(7, 10), Ok(0));
assert_eq!(floor_div(-3, -1), Ok(3));
assert_eq!(floor_div(-3, 1), Ok(-3));
}
#[test]
fn test_exp() {
let exp = jit_function! { exp(a: i64, b: i64) -> f64 => r##"
def exp(a: int, b: int):
return a ** b
"## };
assert_eq!(exp(2, 3), Ok(8.0));
assert_eq!(exp(3, 2), Ok(9.0));
assert_eq!(exp(5, 0), Ok(1.0));
assert_eq!(exp(0, 0), Ok(1.0));
assert_eq!(exp(-5, 0), Ok(1.0));
assert_eq!(exp(0, 1), Ok(0.0));
assert_eq!(exp(0, 5), Ok(0.0));
assert_eq!(exp(-2, 2), Ok(4.0));
assert_eq!(exp(-3, 4), Ok(81.0));
assert_eq!(exp(-2, 3), Ok(-8.0));
assert_eq!(exp(-3, 3), Ok(-27.0));
assert_eq!(exp(1000, 2), Ok(1000000.0));
}
#[test]

View File

@@ -117,12 +117,6 @@ paste = { workspace = true }
schannel = { workspace = true }
widestring = { workspace = true }
[target.'cfg(windows)'.dependencies.winapi]
version = "0.3.9"
features = [
"winsock2", "ifdef", "netioapi", "ws2tcpip",
]
[target.'cfg(windows)'.dependencies.windows-sys]
workspace = true
features = [
@@ -131,6 +125,7 @@ features = [
"Win32_NetworkManagement_Ndis",
"Win32_Security_Cryptography",
"Win32_System_Environment",
"Win32_System_IO"
]
[target.'cfg(target_os = "macos")'.dependencies]

View File

@@ -24,13 +24,18 @@ mod _overlapped {
use windows_sys::Win32::{
Foundation::{
ERROR_IO_PENDING, ERROR_NETNAME_DELETED, ERROR_OPERATION_ABORTED, ERROR_PIPE_BUSY,
ERROR_PORT_UNREACHABLE, ERROR_SEM_TIMEOUT, INVALID_HANDLE_VALUE,
ERROR_PORT_UNREACHABLE, ERROR_SEM_TIMEOUT,
},
Networking::WinSock::{
SO_UPDATE_ACCEPT_CONTEXT, SO_UPDATE_CONNECT_CONTEXT, TF_REUSE_SOCKET,
},
System::Threading::INFINITE,
};
#[pyattr]
const INVALID_HANDLE_VALUE: isize =
unsafe { std::mem::transmute(windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE) };
#[pyattr]
const NULL: isize = 0;
@@ -126,7 +131,7 @@ mod _overlapped {
fn mark_as_completed(ov: &mut OVERLAPPED) {
ov.Internal = 0;
if ov.hEvent != 0 {
if !ov.hEvent.is_null() {
unsafe { windows_sys::Win32::System::Threading::SetEvent(ov.hEvent) };
}
}
@@ -164,7 +169,7 @@ mod _overlapped {
fn WSARecv_inner(
inner: &mut OverlappedInner,
handle: HANDLE,
handle: isize,
buf: &[u8],
mut flags: u32,
vm: &VirtualMachine,
@@ -209,7 +214,7 @@ mod _overlapped {
#[pymethod]
fn WSARecv(
zelf: &Py<Self>,
handle: HANDLE,
handle: isize,
size: u32,
flags: u32,
vm: &VirtualMachine,
@@ -224,9 +229,9 @@ mod _overlapped {
let buf = vec![0u8; std::cmp::max(size, 1) as usize];
let buf = vm.ctx.new_bytes(buf);
inner.handle = handle;
inner.handle = handle as _;
let r = Self::WSARecv_inner(&mut inner, handle, buf.as_bytes(), flags, vm);
let r = Self::WSARecv_inner(&mut inner, handle as _, buf.as_bytes(), flags, vm);
inner.data = OverlappedData::Read(buf);
r
}
@@ -256,7 +261,7 @@ mod _overlapped {
}
impl Constructor for Overlapped {
type Args = (HANDLE,);
type Args = (isize,);
fn py_new(cls: PyTypeRef, (mut event,): Self::Args, vm: &VirtualMachine) -> PyResult {
if event == INVALID_HANDLE_VALUE {
@@ -266,7 +271,7 @@ mod _overlapped {
Foundation::TRUE,
Foundation::FALSE,
std::ptr::null(),
)
) as isize
};
if event == NULL {
return Err(errno_err(vm));
@@ -275,11 +280,11 @@ mod _overlapped {
let mut overlapped: OVERLAPPED = unsafe { std::mem::zeroed() };
if event != NULL {
overlapped.hEvent = event;
overlapped.hEvent = event as _;
}
let inner = OverlappedInner {
overlapped,
handle: NULL,
handle: NULL as _,
error: 0,
data: OverlappedData::None,
};
@@ -292,29 +297,34 @@ mod _overlapped {
#[pyfunction]
fn CreateIoCompletionPort(
handle: HANDLE,
port: HANDLE,
handle: isize,
port: isize,
key: usize,
concurrency: u32,
vm: &VirtualMachine,
) -> PyResult<HANDLE> {
) -> PyResult<isize> {
let r = unsafe {
windows_sys::Win32::System::IO::CreateIoCompletionPort(handle, port, key, concurrency)
windows_sys::Win32::System::IO::CreateIoCompletionPort(
handle as _,
port as _,
key,
concurrency,
) as isize
};
if r == 0 {
if r as usize == 0 {
return Err(errno_err(vm));
}
Ok(r)
}
#[pyfunction]
fn GetQueuedCompletionStatus(port: HANDLE, msecs: u32, vm: &VirtualMachine) -> PyResult {
fn GetQueuedCompletionStatus(port: isize, msecs: u32, vm: &VirtualMachine) -> PyResult {
let mut bytes_transferred = 0;
let mut completion_key = 0;
let mut overlapped: *mut OVERLAPPED = std::ptr::null_mut();
let ret = unsafe {
windows_sys::Win32::System::IO::GetQueuedCompletionStatus(
port,
port as _,
&mut bytes_transferred,
&mut completion_key,
&mut overlapped,

View File

@@ -35,18 +35,11 @@ mod _socket {
use libc as c;
#[cfg(windows)]
mod c {
pub use winapi::shared::netioapi::{if_indextoname, if_nametoindex};
pub use winapi::shared::ws2def::{
pub use windows_sys::Win32::NetworkManagement::IpHelper::{if_indextoname, if_nametoindex};
pub use windows_sys::Win32::Networking::WinSock::{
INADDR_ANY, INADDR_BROADCAST, INADDR_LOOPBACK, INADDR_NONE,
};
pub use winapi::um::winsock2::{
SO_EXCLUSIVEADDRUSE, getprotobyname, getservbyname, getservbyport, getsockopt,
setsockopt,
};
pub use winapi::um::ws2tcpip::{
EAI_AGAIN, EAI_BADFLAGS, EAI_FAIL, EAI_FAMILY, EAI_MEMORY, EAI_NODATA, EAI_NONAME,
EAI_SERVICE, EAI_SOCKTYPE,
};
pub use windows_sys::Win32::Networking::WinSock::{
AF_APPLETALK, AF_DECnet, AF_IPX, AF_LINK, AI_ADDRCONFIG, AI_ALL, AI_CANONNAME,
AI_NUMERICSERV, AI_V4MAPPED, IP_ADD_MEMBERSHIP, IP_DROP_MEMBERSHIP, IP_HDRINCL,
@@ -68,6 +61,17 @@ mod _socket {
SOL_SOCKET, SOMAXCONN, TCP_NODELAY, WSAEBADF, WSAECONNRESET, WSAENOTSOCK,
WSAEWOULDBLOCK,
};
pub use windows_sys::Win32::Networking::WinSock::{
SO_REUSEADDR as SO_EXCLUSIVEADDRUSE, getprotobyname, getservbyname, getservbyport,
getsockopt, setsockopt,
};
pub use windows_sys::Win32::Networking::WinSock::{
WSA_NOT_ENOUGH_MEMORY as EAI_MEMORY, WSAEAFNOSUPPORT as EAI_FAMILY,
WSAEINVAL as EAI_BADFLAGS, WSAESOCKTNOSUPPORT as EAI_SOCKTYPE,
WSAHOST_NOT_FOUND as EAI_NODATA, WSAHOST_NOT_FOUND as EAI_NONAME,
WSANO_RECOVERY as EAI_FAIL, WSATRY_AGAIN as EAI_AGAIN,
WSATYPE_NOT_FOUND as EAI_SERVICE,
};
pub const IF_NAMESIZE: usize =
windows_sys::Win32::NetworkManagement::Ndis::IF_MAX_STRING_SIZE as _;
pub const AF_UNSPEC: i32 = windows_sys::Win32::Networking::WinSock::AF_UNSPEC as _;
@@ -755,7 +759,7 @@ mod _socket {
}
#[cfg(windows)]
use winapi::shared::netioapi;
use windows_sys::Win32::NetworkManagement::IpHelper;
fn get_raw_sock(obj: PyObjectRef, vm: &VirtualMachine) -> PyResult<RawSocket> {
#[cfg(unix)]
@@ -1755,7 +1759,7 @@ mod _socket {
.map(|s| s.to_cstring(vm))
.transpose()?;
let cstr_proto = cstr_opt_as_ptr(&cstr_proto);
let serv = unsafe { c::getservbyname(cstr_name.as_ptr(), cstr_proto) };
let serv = unsafe { c::getservbyname(cstr_name.as_ptr() as _, cstr_proto as _) };
if serv.is_null() {
return Err(vm.new_os_error("service/proto not found".to_owned()));
}
@@ -1777,11 +1781,11 @@ mod _socket {
.map(|s| s.to_cstring(vm))
.transpose()?;
let cstr_proto = cstr_opt_as_ptr(&cstr_proto);
let serv = unsafe { c::getservbyport(port.to_be() as _, cstr_proto) };
let serv = unsafe { c::getservbyport(port.to_be() as _, cstr_proto as _) };
if serv.is_null() {
return Err(vm.new_os_error("port/proto not found".to_owned()));
}
let s = unsafe { ffi::CStr::from_ptr((*serv).s_name) };
let s = unsafe { ffi::CStr::from_ptr((*serv).s_name as _) };
Ok(s.to_string_lossy().into_owned())
}
@@ -2033,7 +2037,7 @@ mod _socket {
#[pyfunction]
fn getprotobyname(name: PyStrRef, vm: &VirtualMachine) -> PyResult {
let cstr = name.to_cstring(vm)?;
let proto = unsafe { c::getprotobyname(cstr.as_ptr()) };
let proto = unsafe { c::getprotobyname(cstr.as_ptr() as _) };
if proto.is_null() {
return Err(vm.new_os_error("protocol not found".to_owned()));
}
@@ -2111,14 +2115,14 @@ mod _socket {
#[cfg(all(unix, not(target_os = "redox")))]
type IfIndex = c::c_uint;
#[cfg(windows)]
type IfIndex = winapi::shared::ifdef::NET_IFINDEX;
type IfIndex = u32; // NET_IFINDEX but windows-sys 0.59 doesn't have it
#[cfg(not(target_os = "redox"))]
#[pyfunction]
fn if_nametoindex(name: FsPath, vm: &VirtualMachine) -> PyResult<IfIndex> {
let name = name.to_cstring(vm)?;
let ret = unsafe { c::if_nametoindex(name.as_ptr()) };
let ret = unsafe { c::if_nametoindex(name.as_ptr() as _) };
if ret == 0 {
Err(vm.new_os_error("no interface with this name".to_owned()))
} else {
@@ -2134,7 +2138,7 @@ mod _socket {
if ret.is_null() {
Err(crate::vm::stdlib::os::errno_err(vm))
} else {
let buf = unsafe { ffi::CStr::from_ptr(buf.as_ptr()) };
let buf = unsafe { ffi::CStr::from_ptr(buf.as_ptr() as _) };
Ok(buf.to_string_lossy().into_owned())
}
}
@@ -2170,6 +2174,7 @@ mod _socket {
#[cfg(windows)]
{
use std::ptr;
use windows_sys::Win32::NetworkManagement::Ndis::NET_LUID_LH;
let table = MibTable::get_raw().map_err(|err| err.into_pyexception(vm))?;
let list = table.as_slice().iter().map(|entry| {
@@ -2181,12 +2186,10 @@ mod _socket {
let list = list.collect::<PyResult<_>>()?;
return Ok(list);
fn get_name(
luid: &winapi::shared::ifdef::NET_LUID,
) -> io::Result<widestring::WideCString> {
fn get_name(luid: &NET_LUID_LH) -> io::Result<widestring::WideCString> {
let mut buf = [0; c::IF_NAMESIZE + 1];
let ret = unsafe {
netioapi::ConvertInterfaceLuidToNameW(luid, buf.as_mut_ptr(), buf.len())
IpHelper::ConvertInterfaceLuidToNameW(luid, buf.as_mut_ptr(), buf.len())
};
if ret == 0 {
Ok(widestring::WideCString::from_ustr_truncate(
@@ -2197,12 +2200,12 @@ mod _socket {
}
}
struct MibTable {
ptr: ptr::NonNull<netioapi::MIB_IF_TABLE2>,
ptr: ptr::NonNull<IpHelper::MIB_IF_TABLE2>,
}
impl MibTable {
fn get_raw() -> io::Result<Self> {
let mut ptr = ptr::null_mut();
let ret = unsafe { netioapi::GetIfTable2Ex(netioapi::MibIfTableRaw, &mut ptr) };
let ret = unsafe { IpHelper::GetIfTable2Ex(IpHelper::MibIfTableRaw, &mut ptr) };
if ret == 0 {
let ptr = unsafe { ptr::NonNull::new_unchecked(ptr) };
Ok(Self { ptr })
@@ -2212,17 +2215,17 @@ mod _socket {
}
}
impl MibTable {
fn as_slice(&self) -> &[netioapi::MIB_IF_ROW2] {
fn as_slice(&self) -> &[IpHelper::MIB_IF_ROW2] {
unsafe {
let p = self.ptr.as_ptr();
let ptr = &raw const (*p).Table as *const netioapi::MIB_IF_ROW2;
let ptr = &raw const (*p).Table as *const IpHelper::MIB_IF_ROW2;
std::slice::from_raw_parts(ptr, (*p).NumEntries as usize)
}
}
}
impl Drop for MibTable {
fn drop(&mut self) {
unsafe { netioapi::FreeMibTable(self.ptr.as_ptr() as *mut _) }
unsafe { IpHelper::FreeMibTable(self.ptr.as_ptr() as *mut _) };
}
}
}

View File

@@ -144,6 +144,7 @@ features = [
"Win32_System_SystemInformation",
"Win32_System_SystemServices",
"Win32_System_Threading",
"Win32_System_WindowsProgramming",
"Win32_UI_Shell",
"Win32_UI_WindowsAndMessaging",
]

View File

@@ -150,7 +150,7 @@ pub(crate) mod module {
}
let h = unsafe { Threading::OpenProcess(Threading::PROCESS_ALL_ACCESS, 0, pid) };
if h == 0 {
if h.is_null() {
return Err(errno_err(vm));
}
let ret = unsafe { Threading::TerminateProcess(h, sig) };
@@ -172,7 +172,7 @@ pub(crate) mod module {
_ => return Err(vm.new_value_error("bad file descriptor".to_owned())),
};
let h = unsafe { Console::GetStdHandle(stdhandle) };
if h == 0 {
if h.is_null() {
return Err(vm.new_os_error("handle cannot be retrieved".to_owned()));
}
if h == INVALID_HANDLE_VALUE {

View File

@@ -22,12 +22,23 @@ mod sys {
vm::{Settings, VirtualMachine},
};
use num_traits::ToPrimitive;
#[cfg(windows)]
use std::os::windows::ffi::OsStrExt;
use std::{
env::{self, VarError},
path,
sync::atomic::Ordering,
};
#[cfg(windows)]
use windows_sys::Win32::{
Foundation::MAX_PATH,
Storage::FileSystem::{
GetFileVersionInfoSizeW, GetFileVersionInfoW, VS_FIXEDFILEINFO, VerQueryValueW,
},
System::LibraryLoader::{GetModuleFileNameW, GetModuleHandleW},
};
// not the same as CPython (e.g. rust's x86_x64-unknown-linux-gnu is just x86_64-linux-gnu)
// but hopefully that's just an implementation detail? TODO: copy CPython's multiarch exactly,
// https://github.com/python/cpython/blob/3.8/configure.ac#L725
@@ -485,6 +496,78 @@ mod sys {
vm.trace_func.borrow().clone()
}
#[cfg(windows)]
fn get_kernel32_version() -> std::io::Result<(u32, u32, u32)> {
unsafe {
// Create a wide string for "kernel32.dll"
let module_name: Vec<u16> = std::ffi::OsStr::new("kernel32.dll")
.encode_wide()
.chain(Some(0))
.collect();
let h_kernel32 = GetModuleHandleW(module_name.as_ptr());
if h_kernel32.is_null() {
return Err(std::io::Error::last_os_error());
}
// Prepare a buffer for the module file path
let mut kernel32_path = [0u16; MAX_PATH as usize];
let len = GetModuleFileNameW(
h_kernel32,
kernel32_path.as_mut_ptr(),
kernel32_path.len() as u32,
);
if len == 0 {
return Err(std::io::Error::last_os_error());
}
// Get the size of the version information block
let verblock_size =
GetFileVersionInfoSizeW(kernel32_path.as_ptr(), std::ptr::null_mut());
if verblock_size == 0 {
return Err(std::io::Error::last_os_error());
}
// Allocate a buffer to hold the version information
let mut verblock = vec![0u8; verblock_size as usize];
if GetFileVersionInfoW(
kernel32_path.as_ptr(),
0,
verblock_size,
verblock.as_mut_ptr() as *mut _,
) == 0
{
return Err(std::io::Error::last_os_error());
}
// Prepare an empty sub-block string (L"") as required by VerQueryValueW
let sub_block: Vec<u16> = std::ffi::OsStr::new("")
.encode_wide()
.chain(Some(0))
.collect();
let mut ffi_ptr: *mut VS_FIXEDFILEINFO = std::ptr::null_mut();
let mut ffi_len: u32 = 0;
if VerQueryValueW(
verblock.as_ptr() as *const _,
sub_block.as_ptr(),
&mut ffi_ptr as *mut *mut VS_FIXEDFILEINFO as *mut *mut _,
&mut ffi_len as *mut u32,
) == 0
|| ffi_ptr.is_null()
{
return Err(std::io::Error::last_os_error());
}
// Extract the version numbers from the VS_FIXEDFILEINFO structure.
let ffi = *ffi_ptr;
let real_major = (ffi.dwProductVersionMS >> 16) & 0xFFFF;
let real_minor = ffi.dwProductVersionMS & 0xFFFF;
let real_build = (ffi.dwProductVersionLS >> 16) & 0xFFFF;
Ok((real_major, real_minor, real_build))
}
}
#[cfg(windows)]
#[pyfunction]
fn getwindowsversion(vm: &VirtualMachine) -> PyResult<crate::builtins::tuple::PyTupleRef> {
@@ -519,21 +602,18 @@ mod sys {
sp.into_string()
.map_err(|_| vm.new_os_error("service pack is not ASCII".to_owned()))?
};
let real_version = get_kernel32_version().map_err(|e| vm.new_os_error(e.to_string()))?;
Ok(WindowsVersion {
major: version.dwMajorVersion,
minor: version.dwMinorVersion,
build: version.dwBuildNumber,
major: real_version.0,
minor: real_version.1,
build: real_version.2,
platform: version.dwPlatformId,
service_pack,
service_pack_major: version.wServicePackMajor,
service_pack_minor: version.wServicePackMinor,
suite_mask: version.wSuiteMask,
product_type: version.wProductType,
platform_version: (
version.dwMajorVersion,
version.dwMinorVersion,
version.dwBuildNumber,
), // TODO Provide accurate version, like CPython impl
platform_version: (real_version.0, real_version.1, real_version.2), // TODO Provide accurate version, like CPython impl
}
.into_struct_sequence(vm))
}

View File

@@ -79,7 +79,7 @@ mod _winapi {
#[pyfunction]
fn CloseHandle(handle: HANDLE) -> WindowsSysResult<BOOL> {
WindowsSysResult(unsafe { windows_sys::Win32::Foundation::CloseHandle(handle.0) })
WindowsSysResult(unsafe { windows_sys::Win32::Foundation::CloseHandle(handle.0 as _) })
}
#[pyfunction]
@@ -99,8 +99,8 @@ mod _winapi {
let mut read = std::mem::MaybeUninit::<isize>::uninit();
let mut write = std::mem::MaybeUninit::<isize>::uninit();
WindowsSysResult(windows_sys::Win32::System::Pipes::CreatePipe(
read.as_mut_ptr(),
write.as_mut_ptr(),
read.as_mut_ptr() as _,
write.as_mut_ptr() as _,
std::ptr::null(),
size,
))
@@ -122,10 +122,10 @@ mod _winapi {
let target = unsafe {
let mut target = std::mem::MaybeUninit::<isize>::uninit();
WindowsSysResult(windows_sys::Win32::Foundation::DuplicateHandle(
src_process.0,
src.0,
target_process.0,
target.as_mut_ptr(),
src_process.0 as _,
src.0 as _,
target_process.0 as _,
target.as_mut_ptr() as _,
access,
inherit,
options.unwrap_or(0),
@@ -151,7 +151,7 @@ mod _winapi {
h: HANDLE,
vm: &VirtualMachine,
) -> PyResult<windows_sys::Win32::Storage::FileSystem::FILE_TYPE> {
let file_type = unsafe { windows_sys::Win32::Storage::FileSystem::GetFileType(h.0) };
let file_type = unsafe { windows_sys::Win32::Storage::FileSystem::GetFileType(h.0 as _) };
if file_type == 0 && unsafe { windows_sys::Win32::Foundation::GetLastError() } != 0 {
Err(errno_err(vm))
} else {
@@ -274,25 +274,21 @@ mod _winapi {
};
Ok((
HANDLE(procinfo.hProcess),
HANDLE(procinfo.hThread),
HANDLE(procinfo.hProcess as _),
HANDLE(procinfo.hThread as _),
procinfo.dwProcessId,
procinfo.dwThreadId,
))
}
#[pyfunction]
fn OpenProcess(
desired_access: u32,
inherit_handle: bool,
process_id: u32,
) -> windows_sys::Win32::Foundation::HANDLE {
fn OpenProcess(desired_access: u32, inherit_handle: bool, process_id: u32) -> isize {
unsafe {
windows_sys::Win32::System::Threading::OpenProcess(
desired_access,
BOOL::from(inherit_handle),
process_id,
)
) as _
}
}
@@ -438,7 +434,8 @@ mod _winapi {
#[pyfunction]
fn WaitForSingleObject(h: HANDLE, ms: u32, vm: &VirtualMachine) -> PyResult<u32> {
let ret = unsafe { windows_sys::Win32::System::Threading::WaitForSingleObject(h.0, ms) };
let ret =
unsafe { windows_sys::Win32::System::Threading::WaitForSingleObject(h.0 as _, ms) };
if ret == windows_sys::Win32::Foundation::WAIT_FAILED {
Err(errno_err(vm))
} else {
@@ -451,7 +448,7 @@ mod _winapi {
unsafe {
let mut ec = std::mem::MaybeUninit::uninit();
WindowsSysResult(windows_sys::Win32::System::Threading::GetExitCodeProcess(
h.0,
h.0 as _,
ec.as_mut_ptr(),
))
.to_pyresult(vm)?;
@@ -462,7 +459,7 @@ mod _winapi {
#[pyfunction]
fn TerminateProcess(h: HANDLE, exit_code: u32) -> WindowsSysResult<BOOL> {
WindowsSysResult(unsafe {
windows_sys::Win32::System::Threading::TerminateProcess(h.0, exit_code)
windows_sys::Win32::System::Threading::TerminateProcess(h.0 as _, exit_code)
})
}
@@ -507,11 +504,13 @@ mod _winapi {
// if handle.is_invalid() {
// return Err(errno_err(vm));
// }
Ok(handle)
Ok(handle as _)
}
#[pyfunction]
fn ReleaseMutex(handle: isize) -> WindowsSysResult<BOOL> {
WindowsSysResult(unsafe { windows_sys::Win32::System::Threading::ReleaseMutex(handle) })
WindowsSysResult(unsafe {
windows_sys::Win32::System::Threading::ReleaseMutex(handle as _)
})
}
}

View File

@@ -23,7 +23,7 @@ impl WindowsSysResultValue for RAW_HANDLE {
*self == INVALID_HANDLE_VALUE
}
fn into_ok(self) -> Self::Ok {
HANDLE(self)
HANDLE(self as _)
}
}

View File

@@ -12,19 +12,17 @@
"xterm": "^3.8.0"
},
"devDependencies": {
"@wasm-tool/wasm-pack-plugin": "^1.1.0",
"clean-webpack-plugin": "^3.0.0",
"css-loader": "^3.4.1",
"html-webpack-plugin": "^3.2.0",
"mini-css-extract-plugin": "^0.9.0",
"raw-loader": "^4.0.0",
"serve": "^11.0.2",
"webpack": "^4.16.3",
"webpack-cli": "^3.1.0",
"webpack-dev-server": "^3.1.5"
"@wasm-tool/wasm-pack-plugin": "^1.7.0",
"css-loader": "^7.1.2",
"html-webpack-plugin": "^5.6.3",
"lezer-loader": "^0.3.0",
"mini-css-extract-plugin": "^2.9.2",
"webpack": "^5.97.1",
"webpack-cli": "^6.0.1",
"webpack-dev-server": "^5.2.0"
},
"scripts": {
"dev": "webpack-dev-server -d",
"dev": "webpack serve",
"build": "webpack",
"dist": "webpack --mode production",
"test": "webpack --mode production && cd ../tests && pytest"

View File

@@ -1,7 +1,6 @@
const HtmlWebpackPlugin = require('html-webpack-plugin');
const MiniCssExtractPlugin = require('mini-css-extract-plugin');
const WasmPackPlugin = require('@wasm-tool/wasm-pack-plugin');
const { CleanWebpackPlugin } = require('clean-webpack-plugin');
const path = require('path');
const fs = require('fs');
@@ -12,6 +11,7 @@ module.exports = (env = {}) => {
output: {
path: path.join(__dirname, 'dist'),
filename: 'index.js',
clean: true,
},
mode: 'development',
resolve: {
@@ -30,15 +30,14 @@ module.exports = (env = {}) => {
},
{
test: /\.(woff(2)?|ttf)$/,
use: {
loader: 'file-loader',
options: { name: 'fonts/[name].[ext]' },
type: 'asset/resource',
generator: {
filename: 'fonts/[name].[ext]',
},
},
],
},
plugins: [
new CleanWebpackPlugin(),
new HtmlWebpackPlugin({
filename: 'index.html',
template: 'src/index.ejs',
@@ -58,6 +57,9 @@ module.exports = (env = {}) => {
filename: 'styles.css',
}),
],
experiments: {
asyncWebAssembly: true,
},
};
if (!env.noWasmPack) {
config.plugins.push(