Compare commits

..

1 Commits

Author SHA1 Message Date
dependabot[bot]
855c583bf7 Bump the random group across 1 directory with 3 updates
Bumps the random group with 3 updates in the / directory: [getrandom](https://github.com/rust-random/getrandom), [mt19937](https://github.com/RustPython/mt19937) and [rand](https://github.com/rust-random/rand).


Updates `getrandom` from 0.3.4 to 0.4.2
- [Changelog](https://github.com/rust-random/getrandom/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-random/getrandom/compare/v0.3.4...v0.4.2)

Updates `mt19937` from 3.2.0 to 3.3.0
- [Commits](https://github.com/RustPython/mt19937/commits)

Updates `rand` from 0.9.4 to 0.10.1
- [Release notes](https://github.com/rust-random/rand/releases)
- [Changelog](https://github.com/rust-random/rand/blob/master/CHANGELOG.md)
- [Commits](https://github.com/rust-random/rand/compare/0.9.4...0.10.1)

---
updated-dependencies:
- dependency-name: getrandom
  dependency-version: 0.4.2
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: random
- dependency-name: mt19937
  dependency-version: 3.3.0
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: random
- dependency-name: rand
  dependency-version: 0.10.1
  dependency-type: direct:production
  update-type: version-update:semver-minor
  dependency-group: random
...

Signed-off-by: dependabot[bot] <support@github.com>
2026-05-28 03:25:46 +00:00
37 changed files with 290 additions and 6902 deletions

View File

@@ -1,10 +0,0 @@
<!--
Thanks for your contribution!
-->
- [ ] Closes #xxxx <!-- Replace xxxx with the GitHub issue number -->
- [ ] This PR follows our [AI policy](https://github.com/RustPython/.github/blob/main/AI_POLICY.md)
## Summary
<!-- What's the purpose of the change? What does it do, and why? -->

View File

@@ -141,7 +141,7 @@ jobs:
if: runner.os == 'Linux'
- name: Test vendored OpenSSL build
run: cargo build --no-default-features --features ssl-openssl-vendor
run: cargo build --no-default-features --features ssl-vendor
if: runner.os == 'Linux'
# - name: Install tk-dev for tkinter build

40
Cargo.lock generated
View File

@@ -481,6 +481,17 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chacha20"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601"
dependencies = [
"cfg-if",
"cpufeatures 0.3.0",
"rand_core 0.10.1",
]
[[package]]
name = "chrono"
version = "0.4.44"
@@ -1438,11 +1449,9 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi 5.3.0",
"wasip2",
"wasm-bindgen",
]
[[package]]
@@ -1452,10 +1461,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi 6.0.0",
"rand_core 0.10.1",
"wasip2",
"wasip3",
"wasm-bindgen",
]
[[package]]
@@ -2373,11 +2385,11 @@ dependencies = [
[[package]]
name = "mt19937"
version = "3.2.0"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56bc7ea7924ea1a79a9e817d0483e39295424cf2b1276cf2b968f9a6c9b63b54"
checksum = "25b4ab1a6f4b7820876af86b84adf545d53a52f59c5374856225aad9562d903e"
dependencies = [
"rand_core 0.9.5",
"rand_core 0.10.1",
]
[[package]]
@@ -3109,6 +3121,17 @@ dependencies = [
"rand_core 0.9.5",
]
[[package]]
name = "rand"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207"
dependencies = [
"chacha20",
"getrandom 0.4.2",
"rand_core 0.10.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
@@ -3514,7 +3537,7 @@ version = "0.5.0"
dependencies = [
"ascii",
"bitflags 2.11.1",
"getrandom 0.3.4",
"getrandom 0.4.2",
"itertools 0.14.0",
"libc",
"lock_api",
@@ -3601,7 +3624,7 @@ name = "rustpython-host_env"
version = "0.5.0"
dependencies = [
"bitflags 2.11.1",
"getrandom 0.3.4",
"getrandom 0.4.2",
"junction",
"libc",
"libffi",
@@ -3645,7 +3668,7 @@ dependencies = [
"is-macro",
"lexical-parse-float",
"num-traits",
"rand 0.9.4",
"rand 0.10.1",
"rustpython-wtf8",
]
@@ -3739,7 +3762,6 @@ dependencies = [
"num_enum",
"optional",
"rustpython-wtf8",
"unic-ucd-category",
]
[[package]]

View File

@@ -27,7 +27,7 @@ ssl-rustls = ["ssl", "rustpython-stdlib/ssl-rustls"]
ssl-rustls-aws-lc = ["ssl-rustls", "dep:rustls", "rustls/aws_lc_rs"]
ssl-rustls-aws-lc-fips = ["ssl-rustls-aws-lc", "rustls/fips"]
ssl-openssl = ["ssl", "rustpython-stdlib/ssl-openssl"]
ssl-openssl-vendor = ["ssl-openssl", "rustpython-stdlib/ssl-openssl-vendor"]
ssl-vendor = ["ssl-openssl", "rustpython-stdlib/ssl-vendor"]
tkinter = ["rustpython-stdlib/tkinter"]
[build-dependencies]
@@ -220,7 +220,7 @@ flate2 = { version = "1.1.9", default-features = false }
# Bump only when the openssl crate bumps it
foreign-types-shared = "0.1"
gethostname = "1.0.2"
getrandom = { version = "0.3", features = ["std"] }
getrandom = { version = "0.4", features = ["std"] }
glob = "0.3"
half = "2"
hex = "0.4.3"
@@ -251,7 +251,7 @@ malachite-base = "0.9.1"
md-5 = "0.10.1"
memchr = "2.8.0"
memmap2 = "0.9.10"
mt19937 = "<=3.2" # upgrade it once rand is upgraded
mt19937 = "<=3.3" # upgrade it once rand is upgraded
num-complex = "0.4.6"
num-integer = "0.1.46"
num-traits = "0.2"
@@ -273,7 +273,7 @@ pymath = { version = "0.2.0", features = ["mul_add", "malachite-bigint", "comple
pyo3 = "0.28"
quote = "1.0.45"
radium = "1.1.1"
rand = "0.9"
rand = "0.10"
rand_core = { version = "0.9", features = ["os_rng"] }
rapidhash = "4.4.1"
result-like = "0.5.0"

View File

@@ -1,28 +1,4 @@
# Contributing to RustPython
Contributions are more than welcome, and in many cases we are happy to guide
contributors through PRs or on [**Discord**](https://discord.gg/vru8NypEhv).
## Finding ways to help
We label issues that would be good for a first time contributor as [`good first issue`](https://github.com/RustPython/RustPython/issues?q=label%3A%22good+first+issue%22+is%3Aissue+is%3Aopen+).
Also checkout the [issue tracker](https://github.com/RustPython/RustPython/issues) for all open issues.
You can enhance CPython compatibility by increasing our unittest coverage, you can see [This pinned issue](https://github.com/RustPython/RustPython/issues/6839) to see which libs and tests need be updated to our current supported python version.
Another approach is to checkout the source code: builtin functions and object
methods are often the simplest and easiest way to contribute.
You can also simply run `python -I scripts/whats_left.py` to assist in finding any unimplemented method.
## Use of AI
We **require all use of AI in contributions to follow our
[AI Policy](https://github.com/RustPython/.github/blob/main/AI_POLICY.md)**.
If your contribution does not follow the policy, it will be closed.
## RustPython Development Guide and Tips
# RustPython Development Guide and Tips
RustPython attracts developers with interest and experience in Rust, Python,
or WebAssembly. Whether you are familiar with Rust, Python, or

615
Lib/profile.py vendored
View File

@@ -1,615 +0,0 @@
#
# Class for profiling python code. rev 1.0 6/2/94
#
# Written by James Roskind
# Based on prior profile module by Sjoerd Mullender...
# which was hacked somewhat by: Guido van Rossum
"""Class for profiling Python code."""
# Copyright Disney Enterprises, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
import importlib.machinery
import io
import sys
import time
import marshal
__all__ = ["run", "runctx", "Profile"]
# Sample timer for use with
#i_count = 0
#def integer_timer():
# global i_count
# i_count = i_count + 1
# return i_count
#itimes = integer_timer # replace with C coded timer returning integers
class _Utils:
"""Support class for utility functions which are shared by
profile.py and cProfile.py modules.
Not supposed to be used directly.
"""
def __init__(self, profiler):
self.profiler = profiler
def run(self, statement, filename, sort):
prof = self.profiler()
try:
prof.run(statement)
except SystemExit:
pass
finally:
self._show(prof, filename, sort)
def runctx(self, statement, globals, locals, filename, sort):
prof = self.profiler()
try:
prof.runctx(statement, globals, locals)
except SystemExit:
pass
finally:
self._show(prof, filename, sort)
def _show(self, prof, filename, sort):
if filename is not None:
prof.dump_stats(filename)
else:
prof.print_stats(sort)
#**************************************************************************
# The following are the static member functions for the profiler class
# Note that an instance of Profile() is *not* needed to call them.
#**************************************************************************
def run(statement, filename=None, sort=-1):
"""Run statement under profiler optionally saving results in filename
This function takes a single argument that can be passed to the
"exec" statement, and an optional file name. In all cases this
routine attempts to "exec" its first argument and gather profiling
statistics from the execution. If no file name is present, then this
function automatically prints a simple profiling report, sorted by the
standard name string (file/line/function-name) that is presented in
each line.
"""
return _Utils(Profile).run(statement, filename, sort)
def runctx(statement, globals, locals, filename=None, sort=-1):
"""Run statement under profiler, supplying your own globals and locals,
optionally saving results in filename.
statement and filename have the same semantics as profile.run
"""
return _Utils(Profile).runctx(statement, globals, locals, filename, sort)
class Profile:
"""Profiler class.
self.cur is always a tuple. Each such tuple corresponds to a stack
frame that is currently active (self.cur[-2]). The following are the
definitions of its members. We use this external "parallel stack" to
avoid contaminating the program that we are profiling. (old profiler
used to write into the frames local dictionary!!) Derived classes
can change the definition of some entries, as long as they leave
[-2:] intact (frame and previous tuple). In case an internal error is
detected, the -3 element is used as the function name.
[ 0] = Time that needs to be charged to the parent frame's function.
It is used so that a function call will not have to access the
timing data for the parent frame.
[ 1] = Total time spent in this frame's function, excluding time in
subfunctions (this latter is tallied in cur[2]).
[ 2] = Total time spent in subfunctions, excluding time executing the
frame's function (this latter is tallied in cur[1]).
[-3] = Name of the function that corresponds to this frame.
[-2] = Actual frame that we correspond to (used to sync exception handling).
[-1] = Our parent 6-tuple (corresponds to frame.f_back).
Timing data for each function is stored as a 5-tuple in the dictionary
self.timings[]. The index is always the name stored in self.cur[-3].
The following are the definitions of the members:
[0] = The number of times this function was called, not counting direct
or indirect recursion,
[1] = Number of times this function appears on the stack, minus one
[2] = Total time spent internal to this function
[3] = Cumulative time that this function was present on the stack. In
non-recursive functions, this is the total execution time from start
to finish of each invocation of a function, including time spent in
all subfunctions.
[4] = A dictionary indicating for each function name, the number of times
it was called by us.
"""
bias = 0 # calibration constant
def __init__(self, timer=None, bias=None):
self.timings = {}
self.cur = None
self.cmd = ""
self.c_func_name = ""
if bias is None:
bias = self.bias
self.bias = bias # Materialize in local dict for lookup speed.
if not timer:
self.timer = self.get_time = time.process_time
self.dispatcher = self.trace_dispatch_i
else:
self.timer = timer
t = self.timer() # test out timer function
try:
length = len(t)
except TypeError:
self.get_time = timer
self.dispatcher = self.trace_dispatch_i
else:
if length == 2:
self.dispatcher = self.trace_dispatch
else:
self.dispatcher = self.trace_dispatch_l
# This get_time() implementation needs to be defined
# here to capture the passed-in timer in the parameter
# list (for performance). Note that we can't assume
# the timer() result contains two values in all
# cases.
def get_time_timer(timer=timer, sum=sum):
return sum(timer())
self.get_time = get_time_timer
self.t = self.get_time()
self.simulate_call('profiler')
# Heavily optimized dispatch routine for time.process_time() timer
def trace_dispatch(self, frame, event, arg):
timer = self.timer
t = timer()
t = t[0] + t[1] - self.t - self.bias
if event == "c_call":
self.c_func_name = arg.__name__
if self.dispatch[event](self, frame,t):
t = timer()
self.t = t[0] + t[1]
else:
r = timer()
self.t = r[0] + r[1] - t # put back unrecorded delta
# Dispatch routine for best timer program (return = scalar, fastest if
# an integer but float works too -- and time.process_time() relies on that).
def trace_dispatch_i(self, frame, event, arg):
timer = self.timer
t = timer() - self.t - self.bias
if event == "c_call":
self.c_func_name = arg.__name__
if self.dispatch[event](self, frame, t):
self.t = timer()
else:
self.t = timer() - t # put back unrecorded delta
# Dispatch routine for macintosh (timer returns time in ticks of
# 1/60th second)
def trace_dispatch_mac(self, frame, event, arg):
timer = self.timer
t = timer()/60.0 - self.t - self.bias
if event == "c_call":
self.c_func_name = arg.__name__
if self.dispatch[event](self, frame, t):
self.t = timer()/60.0
else:
self.t = timer()/60.0 - t # put back unrecorded delta
# SLOW generic dispatch routine for timer returning lists of numbers
def trace_dispatch_l(self, frame, event, arg):
get_time = self.get_time
t = get_time() - self.t - self.bias
if event == "c_call":
self.c_func_name = arg.__name__
if self.dispatch[event](self, frame, t):
self.t = get_time()
else:
self.t = get_time() - t # put back unrecorded delta
# In the event handlers, the first 3 elements of self.cur are unpacked
# into vrbls w/ 3-letter names. The last two characters are meant to be
# mnemonic:
# _pt self.cur[0] "parent time" time to be charged to parent frame
# _it self.cur[1] "internal time" time spent directly in the function
# _et self.cur[2] "external time" time spent in subfunctions
def trace_dispatch_exception(self, frame, t):
rpt, rit, ret, rfn, rframe, rcur = self.cur
if (rframe is not frame) and rcur:
return self.trace_dispatch_return(rframe, t)
self.cur = rpt, rit+t, ret, rfn, rframe, rcur
return 1
def trace_dispatch_call(self, frame, t):
if self.cur and frame.f_back is not self.cur[-2]:
rpt, rit, ret, rfn, rframe, rcur = self.cur
if not isinstance(rframe, Profile.fake_frame):
assert rframe.f_back is frame.f_back, ("Bad call", rfn,
rframe, rframe.f_back,
frame, frame.f_back)
self.trace_dispatch_return(rframe, 0)
assert (self.cur is None or \
frame.f_back is self.cur[-2]), ("Bad call",
self.cur[-3])
fcode = frame.f_code
fn = (fcode.co_filename, fcode.co_firstlineno, fcode.co_name)
self.cur = (t, 0, 0, fn, frame, self.cur)
timings = self.timings
if fn in timings:
cc, ns, tt, ct, callers = timings[fn]
timings[fn] = cc, ns + 1, tt, ct, callers
else:
timings[fn] = 0, 0, 0, 0, {}
return 1
def trace_dispatch_c_call (self, frame, t):
fn = ("", 0, self.c_func_name)
self.cur = (t, 0, 0, fn, frame, self.cur)
timings = self.timings
if fn in timings:
cc, ns, tt, ct, callers = timings[fn]
timings[fn] = cc, ns+1, tt, ct, callers
else:
timings[fn] = 0, 0, 0, 0, {}
return 1
def trace_dispatch_return(self, frame, t):
if frame is not self.cur[-2]:
assert frame is self.cur[-2].f_back, ("Bad return", self.cur[-3])
self.trace_dispatch_return(self.cur[-2], 0)
# Prefix "r" means part of the Returning or exiting frame.
# Prefix "p" means part of the Previous or Parent or older frame.
rpt, rit, ret, rfn, frame, rcur = self.cur
rit = rit + t
frame_total = rit + ret
ppt, pit, pet, pfn, pframe, pcur = rcur
self.cur = ppt, pit + rpt, pet + frame_total, pfn, pframe, pcur
timings = self.timings
cc, ns, tt, ct, callers = timings[rfn]
if not ns:
# This is the only occurrence of the function on the stack.
# Else this is a (directly or indirectly) recursive call, and
# its cumulative time will get updated when the topmost call to
# it returns.
ct = ct + frame_total
cc = cc + 1
if pfn in callers:
callers[pfn] = callers[pfn] + 1 # hack: gather more
# stats such as the amount of time added to ct courtesy
# of this specific call, and the contribution to cc
# courtesy of this call.
else:
callers[pfn] = 1
timings[rfn] = cc, ns - 1, tt + rit, ct, callers
return 1
dispatch = {
"call": trace_dispatch_call,
"exception": trace_dispatch_exception,
"return": trace_dispatch_return,
"c_call": trace_dispatch_c_call,
"c_exception": trace_dispatch_return, # the C function returned
"c_return": trace_dispatch_return,
}
# The next few functions play with self.cmd. By carefully preloading
# our parallel stack, we can force the profiled result to include
# an arbitrary string as the name of the calling function.
# We use self.cmd as that string, and the resulting stats look
# very nice :-).
def set_cmd(self, cmd):
if self.cur[-1]: return # already set
self.cmd = cmd
self.simulate_call(cmd)
class fake_code:
def __init__(self, filename, line, name):
self.co_filename = filename
self.co_line = line
self.co_name = name
self.co_firstlineno = 0
def __repr__(self):
return repr((self.co_filename, self.co_line, self.co_name))
class fake_frame:
def __init__(self, code, prior):
self.f_code = code
self.f_back = prior
def simulate_call(self, name):
code = self.fake_code('profile', 0, name)
if self.cur:
pframe = self.cur[-2]
else:
pframe = None
frame = self.fake_frame(code, pframe)
self.dispatch['call'](self, frame, 0)
# collect stats from pending stack, including getting final
# timings for self.cmd frame.
def simulate_cmd_complete(self):
get_time = self.get_time
t = get_time() - self.t
while self.cur[-1]:
# We *can* cause assertion errors here if
# dispatch_trace_return checks for a frame match!
self.dispatch['return'](self, self.cur[-2], t)
t = 0
self.t = get_time() - t
def print_stats(self, sort=-1):
import pstats
if not isinstance(sort, tuple):
sort = (sort,)
pstats.Stats(self).strip_dirs().sort_stats(*sort).print_stats()
def dump_stats(self, file):
with open(file, 'wb') as f:
self.create_stats()
marshal.dump(self.stats, f)
def create_stats(self):
self.simulate_cmd_complete()
self.snapshot_stats()
def snapshot_stats(self):
self.stats = {}
for func, (cc, ns, tt, ct, callers) in self.timings.items():
callers = callers.copy()
nc = 0
for callcnt in callers.values():
nc += callcnt
self.stats[func] = cc, nc, tt, ct, callers
# The following two methods can be called by clients to use
# a profiler to profile a statement, given as a string.
def run(self, cmd):
import __main__
dict = __main__.__dict__
return self.runctx(cmd, dict, dict)
def runctx(self, cmd, globals, locals):
self.set_cmd(cmd)
sys.setprofile(self.dispatcher)
try:
exec(cmd, globals, locals)
finally:
sys.setprofile(None)
return self
# This method is more useful to profile a single function call.
def runcall(self, func, /, *args, **kw):
self.set_cmd(repr(func))
sys.setprofile(self.dispatcher)
try:
return func(*args, **kw)
finally:
sys.setprofile(None)
#******************************************************************
# The following calculates the overhead for using a profiler. The
# problem is that it takes a fair amount of time for the profiler
# to stop the stopwatch (from the time it receives an event).
# Similarly, there is a delay from the time that the profiler
# re-starts the stopwatch before the user's code really gets to
# continue. The following code tries to measure the difference on
# a per-event basis.
#
# Note that this difference is only significant if there are a lot of
# events, and relatively little user code per event. For example,
# code with small functions will typically benefit from having the
# profiler calibrated for the current platform. This *could* be
# done on the fly during init() time, but it is not worth the
# effort. Also note that if too large a value specified, then
# execution time on some functions will actually appear as a
# negative number. It is *normal* for some functions (with very
# low call counts) to have such negative stats, even if the
# calibration figure is "correct."
#
# One alternative to profile-time calibration adjustments (i.e.,
# adding in the magic little delta during each event) is to track
# more carefully the number of events (and cumulatively, the number
# of events during sub functions) that are seen. If this were
# done, then the arithmetic could be done after the fact (i.e., at
# display time). Currently, we track only call/return events.
# These values can be deduced by examining the callees and callers
# vectors for each functions. Hence we *can* almost correct the
# internal time figure at print time (note that we currently don't
# track exception event processing counts). Unfortunately, there
# is currently no similar information for cumulative sub-function
# time. It would not be hard to "get all this info" at profiler
# time. Specifically, we would have to extend the tuples to keep
# counts of this in each frame, and then extend the defs of timing
# tuples to include the significant two figures. I'm a bit fearful
# that this additional feature will slow the heavily optimized
# event/time ratio (i.e., the profiler would run slower, fur a very
# low "value added" feature.)
#**************************************************************
def calibrate(self, m, verbose=0):
if self.__class__ is not Profile:
raise TypeError("Subclasses must override .calibrate().")
saved_bias = self.bias
self.bias = 0
try:
return self._calibrate_inner(m, verbose)
finally:
self.bias = saved_bias
def _calibrate_inner(self, m, verbose):
get_time = self.get_time
# Set up a test case to be run with and without profiling. Include
# lots of calls, because we're trying to quantify stopwatch overhead.
# Do not raise any exceptions, though, because we want to know
# exactly how many profile events are generated (one call event, +
# one return event, per Python-level call).
def f1(n):
for i in range(n):
x = 1
def f(m, f1=f1):
for i in range(m):
f1(100)
f(m) # warm up the cache
# elapsed_noprofile <- time f(m) takes without profiling.
t0 = get_time()
f(m)
t1 = get_time()
elapsed_noprofile = t1 - t0
if verbose:
print("elapsed time without profiling =", elapsed_noprofile)
# elapsed_profile <- time f(m) takes with profiling. The difference
# is profiling overhead, only some of which the profiler subtracts
# out on its own.
p = Profile()
t0 = get_time()
p.runctx('f(m)', globals(), locals())
t1 = get_time()
elapsed_profile = t1 - t0
if verbose:
print("elapsed time with profiling =", elapsed_profile)
# reported_time <- "CPU seconds" the profiler charged to f and f1.
total_calls = 0.0
reported_time = 0.0
for (filename, line, funcname), (cc, ns, tt, ct, callers) in \
p.timings.items():
if funcname in ("f", "f1"):
total_calls += cc
reported_time += tt
if verbose:
print("'CPU seconds' profiler reported =", reported_time)
print("total # calls =", total_calls)
if total_calls != m + 1:
raise ValueError("internal error: total calls = %d" % total_calls)
# reported_time - elapsed_noprofile = overhead the profiler wasn't
# able to measure. Divide by twice the number of calls (since there
# are two profiler events per call in this test) to get the hidden
# overhead per event.
mean = (reported_time - elapsed_noprofile) / 2.0 / total_calls
if verbose:
print("mean stopwatch overhead per profile event =", mean)
return mean
#****************************************************************************
def main():
import os
from optparse import OptionParser
usage = "profile.py [-o output_file_path] [-s sort] [-m module | scriptfile] [arg] ..."
parser = OptionParser(usage=usage)
parser.allow_interspersed_args = False
parser.add_option('-o', '--outfile', dest="outfile",
help="Save stats to <outfile>", default=None)
parser.add_option('-m', dest="module", action="store_true",
help="Profile a library module.", default=False)
parser.add_option('-s', '--sort', dest="sort",
help="Sort order when printing to stdout, based on pstats.Stats class",
default=-1)
if not sys.argv[1:]:
parser.print_usage()
sys.exit(2)
(options, args) = parser.parse_args()
sys.argv[:] = args
# The script that we're profiling may chdir, so capture the absolute path
# to the output file at startup.
if options.outfile is not None:
options.outfile = os.path.abspath(options.outfile)
if len(args) > 0:
if options.module:
import runpy
code = "run_module(modname, run_name='__main__')"
globs = {
'run_module': runpy.run_module,
'modname': args[0]
}
else:
progname = args[0]
sys.path.insert(0, os.path.dirname(progname))
with io.open_code(progname) as fp:
code = compile(fp.read(), progname, 'exec')
spec = importlib.machinery.ModuleSpec(name='__main__', loader=None,
origin=progname)
globs = {
'__spec__': spec,
'__file__': spec.origin,
'__name__': spec.name,
'__package__': None,
'__cached__': None,
}
try:
runctx(code, globs, None, options.outfile, options.sort)
except BrokenPipeError as exc:
# Prevent "Exception ignored" during interpreter shutdown.
sys.stdout = None
sys.exit(exc.errno)
else:
parser.print_usage()
return parser
# When invoked as main program, invoke the profiler on a script
if __name__ == '__main__':
main()

777
Lib/pstats.py vendored
View File

@@ -1,777 +0,0 @@
"""Class for printing reports on profiled python code."""
# Written by James Roskind
# Based on prior profile module by Sjoerd Mullender...
# which was hacked somewhat by: Guido van Rossum
# Copyright Disney Enterprises, Inc. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
# either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
import sys
import os
import time
import marshal
import re
from enum import StrEnum, _simple_enum
from functools import cmp_to_key
from dataclasses import dataclass
__all__ = ["Stats", "SortKey", "FunctionProfile", "StatsProfile"]
@_simple_enum(StrEnum)
class SortKey:
CALLS = 'calls', 'ncalls'
CUMULATIVE = 'cumulative', 'cumtime'
FILENAME = 'filename', 'module'
LINE = 'line'
NAME = 'name'
NFL = 'nfl'
PCALLS = 'pcalls'
STDNAME = 'stdname'
TIME = 'time', 'tottime'
def __new__(cls, *values):
value = values[0]
obj = str.__new__(cls, value)
obj._value_ = value
for other_value in values[1:]:
cls._value2member_map_[other_value] = obj
obj._all_values = values
return obj
@dataclass(unsafe_hash=True)
class FunctionProfile:
ncalls: str
tottime: float
percall_tottime: float
cumtime: float
percall_cumtime: float
file_name: str
line_number: int
@dataclass(unsafe_hash=True)
class StatsProfile:
'''Class for keeping track of an item in inventory.'''
total_tt: float
func_profiles: dict[str, FunctionProfile]
class Stats:
"""This class is used for creating reports from data generated by the
Profile class. It is a "friend" of that class, and imports data either
by direct access to members of Profile class, or by reading in a dictionary
that was emitted (via marshal) from the Profile class.
The big change from the previous Profiler (in terms of raw functionality)
is that an "add()" method has been provided to combine Stats from
several distinct profile runs. Both the constructor and the add()
method now take arbitrarily many file names as arguments.
All the print methods now take an argument that indicates how many lines
to print. If the arg is a floating-point number between 0 and 1.0, then
it is taken as a decimal percentage of the available lines to be printed
(e.g., .1 means print 10% of all available lines). If it is an integer,
it is taken to mean the number of lines of data that you wish to have
printed.
The sort_stats() method now processes some additional options (i.e., in
addition to the old -1, 0, 1, or 2 that are respectively interpreted as
'stdname', 'calls', 'time', and 'cumulative'). It takes either an
arbitrary number of quoted strings or SortKey enum to select the sort
order.
For example sort_stats('time', 'name') or sort_stats(SortKey.TIME,
SortKey.NAME) sorts on the major key of 'internal function time', and on
the minor key of 'the name of the function'. Look at the two tables in
sort_stats() and get_sort_arg_defs(self) for more examples.
All methods return self, so you can string together commands like:
Stats('foo', 'goo').strip_dirs().sort_stats('calls').\
print_stats(5).print_callers(5)
"""
def __init__(self, *args, stream=None):
self.stream = stream or sys.stdout
if not len(args):
arg = None
else:
arg = args[0]
args = args[1:]
self.init(arg)
self.add(*args)
def init(self, arg):
self.all_callees = None # calc only if needed
self.files = []
self.fcn_list = None
self.total_tt = 0
self.total_calls = 0
self.prim_calls = 0
self.max_name_len = 0
self.top_level = set()
self.stats = {}
self.sort_arg_dict = {}
self.load_stats(arg)
try:
self.get_top_level_stats()
except Exception:
print("Invalid timing data %s" %
(self.files[-1] if self.files else ''), file=self.stream)
raise
def load_stats(self, arg):
if arg is None:
self.stats = {}
return
elif isinstance(arg, str):
with open(arg, 'rb') as f:
self.stats = marshal.load(f)
try:
file_stats = os.stat(arg)
arg = time.ctime(file_stats.st_mtime) + " " + arg
except: # in case this is not unix
pass
self.files = [arg]
elif hasattr(arg, 'create_stats'):
arg.create_stats()
self.stats = arg.stats
arg.stats = {}
if not self.stats:
raise TypeError("Cannot create or construct a %r object from %r"
% (self.__class__, arg))
return
def get_top_level_stats(self):
for func, (cc, nc, tt, ct, callers) in self.stats.items():
self.total_calls += nc
self.prim_calls += cc
self.total_tt += tt
if ("jprofile", 0, "profiler") in callers:
self.top_level.add(func)
if len(func_std_string(func)) > self.max_name_len:
self.max_name_len = len(func_std_string(func))
def add(self, *arg_list):
if not arg_list:
return self
for item in reversed(arg_list):
if type(self) != type(item):
item = Stats(item)
self.files += item.files
self.total_calls += item.total_calls
self.prim_calls += item.prim_calls
self.total_tt += item.total_tt
for func in item.top_level:
self.top_level.add(func)
if self.max_name_len < item.max_name_len:
self.max_name_len = item.max_name_len
self.fcn_list = None
for func, stat in item.stats.items():
if func in self.stats:
old_func_stat = self.stats[func]
else:
old_func_stat = (0, 0, 0, 0, {},)
self.stats[func] = add_func_stats(old_func_stat, stat)
return self
def dump_stats(self, filename):
"""Write the profile data to a file we know how to load back."""
with open(filename, 'wb') as f:
marshal.dump(self.stats, f)
# list the tuple indices and directions for sorting,
# along with some printable description
sort_arg_dict_default = {
"calls" : (((1,-1), ), "call count"),
"ncalls" : (((1,-1), ), "call count"),
"cumtime" : (((3,-1), ), "cumulative time"),
"cumulative": (((3,-1), ), "cumulative time"),
"filename" : (((4, 1), ), "file name"),
"line" : (((5, 1), ), "line number"),
"module" : (((4, 1), ), "file name"),
"name" : (((6, 1), ), "function name"),
"nfl" : (((6, 1),(4, 1),(5, 1),), "name/file/line"),
"pcalls" : (((0,-1), ), "primitive call count"),
"stdname" : (((7, 1), ), "standard name"),
"time" : (((2,-1), ), "internal time"),
"tottime" : (((2,-1), ), "internal time"),
}
def get_sort_arg_defs(self):
"""Expand all abbreviations that are unique."""
if not self.sort_arg_dict:
self.sort_arg_dict = dict = {}
bad_list = {}
for word, tup in self.sort_arg_dict_default.items():
fragment = word
while fragment:
if fragment in dict:
bad_list[fragment] = 0
break
dict[fragment] = tup
fragment = fragment[:-1]
for word in bad_list:
del dict[word]
return self.sort_arg_dict
def sort_stats(self, *field):
if not field:
self.fcn_list = 0
return self
if len(field) == 1 and isinstance(field[0], int):
# Be compatible with old profiler
field = [ {-1: "stdname",
0: "calls",
1: "time",
2: "cumulative"}[field[0]] ]
elif len(field) >= 2:
for arg in field[1:]:
if type(arg) != type(field[0]):
raise TypeError("Can't have mixed argument type")
sort_arg_defs = self.get_sort_arg_defs()
sort_tuple = ()
self.sort_type = ""
connector = ""
for word in field:
if isinstance(word, SortKey):
word = word.value
sort_tuple = sort_tuple + sort_arg_defs[word][0]
self.sort_type += connector + sort_arg_defs[word][1]
connector = ", "
stats_list = []
for func, (cc, nc, tt, ct, callers) in self.stats.items():
stats_list.append((cc, nc, tt, ct) + func +
(func_std_string(func), func))
stats_list.sort(key=cmp_to_key(TupleComp(sort_tuple).compare))
self.fcn_list = fcn_list = []
for tuple in stats_list:
fcn_list.append(tuple[-1])
return self
def reverse_order(self):
if self.fcn_list:
self.fcn_list.reverse()
return self
def strip_dirs(self):
oldstats = self.stats
self.stats = newstats = {}
max_name_len = 0
for func, (cc, nc, tt, ct, callers) in oldstats.items():
newfunc = func_strip_path(func)
if len(func_std_string(newfunc)) > max_name_len:
max_name_len = len(func_std_string(newfunc))
newcallers = {}
for func2, caller in callers.items():
newcallers[func_strip_path(func2)] = caller
if newfunc in newstats:
newstats[newfunc] = add_func_stats(
newstats[newfunc],
(cc, nc, tt, ct, newcallers))
else:
newstats[newfunc] = (cc, nc, tt, ct, newcallers)
old_top = self.top_level
self.top_level = new_top = set()
for func in old_top:
new_top.add(func_strip_path(func))
self.max_name_len = max_name_len
self.fcn_list = None
self.all_callees = None
return self
def calc_callees(self):
if self.all_callees:
return
self.all_callees = all_callees = {}
for func, (cc, nc, tt, ct, callers) in self.stats.items():
if not func in all_callees:
all_callees[func] = {}
for func2, caller in callers.items():
if not func2 in all_callees:
all_callees[func2] = {}
all_callees[func2][func] = caller
return
#******************************************************************
# The following functions support actual printing of reports
#******************************************************************
# Optional "amount" is either a line count, or a percentage of lines.
def eval_print_amount(self, sel, list, msg):
new_list = list
if isinstance(sel, str):
try:
rex = re.compile(sel)
except re.PatternError:
msg += " <Invalid regular expression %r>\n" % sel
return new_list, msg
new_list = []
for func in list:
if rex.search(func_std_string(func)):
new_list.append(func)
else:
count = len(list)
if isinstance(sel, float) and 0.0 <= sel < 1.0:
count = int(count * sel + .5)
new_list = list[:count]
elif isinstance(sel, int) and 0 <= sel < count:
count = sel
new_list = list[:count]
if len(list) != len(new_list):
msg += " List reduced from %r to %r due to restriction <%r>\n" % (
len(list), len(new_list), sel)
return new_list, msg
def get_stats_profile(self):
"""This method returns an instance of StatsProfile, which contains a mapping
of function names to instances of FunctionProfile. Each FunctionProfile
instance holds information related to the function's profile such as how
long the function took to run, how many times it was called, etc...
"""
func_list = self.fcn_list[:] if self.fcn_list else list(self.stats.keys())
if not func_list:
return StatsProfile(0, {})
total_tt = float(f8(self.total_tt))
func_profiles = {}
stats_profile = StatsProfile(total_tt, func_profiles)
for func in func_list:
cc, nc, tt, ct, callers = self.stats[func]
file_name, line_number, func_name = func
ncalls = str(nc) if nc == cc else (str(nc) + '/' + str(cc))
tottime = float(f8(tt))
percall_tottime = -1 if nc == 0 else float(f8(tt/nc))
cumtime = float(f8(ct))
percall_cumtime = -1 if cc == 0 else float(f8(ct/cc))
func_profile = FunctionProfile(
ncalls,
tottime, # time spent in this function alone
percall_tottime,
cumtime, # time spent in the function plus all functions that this function called,
percall_cumtime,
file_name,
line_number
)
func_profiles[func_name] = func_profile
return stats_profile
def get_print_list(self, sel_list):
width = self.max_name_len
if self.fcn_list:
stat_list = self.fcn_list[:]
msg = " Ordered by: " + self.sort_type + '\n'
else:
stat_list = list(self.stats.keys())
msg = " Random listing order was used\n"
for selection in sel_list:
stat_list, msg = self.eval_print_amount(selection, stat_list, msg)
count = len(stat_list)
if not stat_list:
return 0, stat_list
print(msg, file=self.stream)
if count < len(self.stats):
width = 0
for func in stat_list:
if len(func_std_string(func)) > width:
width = len(func_std_string(func))
return width+2, stat_list
def print_stats(self, *amount):
for filename in self.files:
print(filename, file=self.stream)
if self.files:
print(file=self.stream)
indent = ' ' * 8
for func in self.top_level:
print(indent, func_get_function_name(func), file=self.stream)
print(indent, self.total_calls, "function calls", end=' ', file=self.stream)
if self.total_calls != self.prim_calls:
print("(%d primitive calls)" % self.prim_calls, end=' ', file=self.stream)
print("in %.3f seconds" % self.total_tt, file=self.stream)
print(file=self.stream)
width, list = self.get_print_list(amount)
if list:
self.print_title()
for func in list:
self.print_line(func)
print(file=self.stream)
print(file=self.stream)
return self
def print_callees(self, *amount):
width, list = self.get_print_list(amount)
if list:
self.calc_callees()
self.print_call_heading(width, "called...")
for func in list:
if func in self.all_callees:
self.print_call_line(width, func, self.all_callees[func])
else:
self.print_call_line(width, func, {})
print(file=self.stream)
print(file=self.stream)
return self
def print_callers(self, *amount):
width, list = self.get_print_list(amount)
if list:
self.print_call_heading(width, "was called by...")
for func in list:
cc, nc, tt, ct, callers = self.stats[func]
self.print_call_line(width, func, callers, "<-")
print(file=self.stream)
print(file=self.stream)
return self
def print_call_heading(self, name_size, column_title):
print("Function ".ljust(name_size) + column_title, file=self.stream)
# print sub-header only if we have new-style callers
subheader = False
for cc, nc, tt, ct, callers in self.stats.values():
if callers:
value = next(iter(callers.values()))
subheader = isinstance(value, tuple)
break
if subheader:
print(" "*name_size + " ncalls tottime cumtime", file=self.stream)
def print_call_line(self, name_size, source, call_dict, arrow="->"):
print(func_std_string(source).ljust(name_size) + arrow, end=' ', file=self.stream)
if not call_dict:
print(file=self.stream)
return
clist = sorted(call_dict.keys())
indent = ""
for func in clist:
name = func_std_string(func)
value = call_dict[func]
if isinstance(value, tuple):
nc, cc, tt, ct = value
if nc != cc:
substats = '%d/%d' % (nc, cc)
else:
substats = '%d' % (nc,)
substats = '%s %s %s %s' % (substats.rjust(7+2*len(indent)),
f8(tt), f8(ct), name)
left_width = name_size + 1
else:
substats = '%s(%r) %s' % (name, value, f8(self.stats[func][3]))
left_width = name_size + 3
print(indent*left_width + substats, file=self.stream)
indent = " "
def print_title(self):
print(' ncalls tottime percall cumtime percall', end=' ', file=self.stream)
print('filename:lineno(function)', file=self.stream)
def print_line(self, func): # hack: should print percentages
cc, nc, tt, ct, callers = self.stats[func]
c = str(nc)
if nc != cc:
c = c + '/' + str(cc)
print(c.rjust(9), end=' ', file=self.stream)
print(f8(tt), end=' ', file=self.stream)
if nc == 0:
print(' '*8, end=' ', file=self.stream)
else:
print(f8(tt/nc), end=' ', file=self.stream)
print(f8(ct), end=' ', file=self.stream)
if cc == 0:
print(' '*8, end=' ', file=self.stream)
else:
print(f8(ct/cc), end=' ', file=self.stream)
print(func_std_string(func), file=self.stream)
class TupleComp:
"""This class provides a generic function for comparing any two tuples.
Each instance records a list of tuple-indices (from most significant
to least significant), and sort direction (ascending or descending) for
each tuple-index. The compare functions can then be used as the function
argument to the system sort() function when a list of tuples need to be
sorted in the instances order."""
def __init__(self, comp_select_list):
self.comp_select_list = comp_select_list
def compare (self, left, right):
for index, direction in self.comp_select_list:
l = left[index]
r = right[index]
if l < r:
return -direction
if l > r:
return direction
return 0
#**************************************************************************
# func_name is a triple (file:string, line:int, name:string)
def func_strip_path(func_name):
filename, line, name = func_name
return os.path.basename(filename), line, name
def func_get_function_name(func):
return func[2]
def func_std_string(func_name): # match what old profile produced
if func_name[:2] == ('~', 0):
# special case for built-in functions
name = func_name[2]
if name.startswith('<') and name.endswith('>'):
return '{%s}' % name[1:-1]
else:
return name
else:
return "%s:%d(%s)" % func_name
#**************************************************************************
# The following functions combine statistics for pairs functions.
# The bulk of the processing involves correctly handling "call" lists,
# such as callers and callees.
#**************************************************************************
def add_func_stats(target, source):
"""Add together all the stats for two profile entries."""
cc, nc, tt, ct, callers = source
t_cc, t_nc, t_tt, t_ct, t_callers = target
return (cc+t_cc, nc+t_nc, tt+t_tt, ct+t_ct,
add_callers(t_callers, callers))
def add_callers(target, source):
"""Combine two caller lists in a single list."""
new_callers = {}
for func, caller in target.items():
new_callers[func] = caller
for func, caller in source.items():
if func in new_callers:
if isinstance(caller, tuple):
# format used by cProfile
new_callers[func] = tuple(i + j for i, j in zip(caller, new_callers[func]))
else:
# format used by profile
new_callers[func] += caller
else:
new_callers[func] = caller
return new_callers
def count_calls(callers):
"""Sum the caller statistics to get total number of calls received."""
nc = 0
for calls in callers.values():
nc += calls
return nc
#**************************************************************************
# The following functions support printing of reports
#**************************************************************************
def f8(x):
return "%8.3f" % x
#**************************************************************************
# Statistics browser added by ESR, April 2001
#**************************************************************************
if __name__ == '__main__':
import cmd
try:
import readline # noqa: F401
except ImportError:
pass
class ProfileBrowser(cmd.Cmd):
def __init__(self, profile=None):
cmd.Cmd.__init__(self)
self.prompt = "% "
self.stats = None
self.stream = sys.stdout
if profile is not None:
self.do_read(profile)
def generic(self, fn, line):
args = line.split()
processed = []
for term in args:
try:
processed.append(int(term))
continue
except ValueError:
pass
try:
frac = float(term)
if frac > 1 or frac < 0:
print("Fraction argument must be in [0, 1]", file=self.stream)
continue
processed.append(frac)
continue
except ValueError:
pass
processed.append(term)
if self.stats:
getattr(self.stats, fn)(*processed)
else:
print("No statistics object is loaded.", file=self.stream)
return 0
def generic_help(self):
print("Arguments may be:", file=self.stream)
print("* An integer maximum number of entries to print.", file=self.stream)
print("* A decimal fractional number between 0 and 1, controlling", file=self.stream)
print(" what fraction of selected entries to print.", file=self.stream)
print("* A regular expression; only entries with function names", file=self.stream)
print(" that match it are printed.", file=self.stream)
def do_add(self, line):
if self.stats:
try:
self.stats.add(line)
except OSError as e:
print("Failed to load statistics for %s: %s" % (line, e), file=self.stream)
else:
print("No statistics object is loaded.", file=self.stream)
return 0
def help_add(self):
print("Add profile info from given file to current statistics object.", file=self.stream)
def do_callees(self, line):
return self.generic('print_callees', line)
def help_callees(self):
print("Print callees statistics from the current stat object.", file=self.stream)
self.generic_help()
def do_callers(self, line):
return self.generic('print_callers', line)
def help_callers(self):
print("Print callers statistics from the current stat object.", file=self.stream)
self.generic_help()
def do_EOF(self, line):
print("", file=self.stream)
return 1
def help_EOF(self):
print("Leave the profile browser.", file=self.stream)
def do_quit(self, line):
return 1
def help_quit(self):
print("Leave the profile browser.", file=self.stream)
def do_read(self, line):
if line:
try:
self.stats = Stats(line)
except OSError as err:
print(err.args[1], file=self.stream)
return
except Exception as err:
print(err.__class__.__name__ + ':', err, file=self.stream)
return
self.prompt = line + "% "
elif len(self.prompt) > 2:
line = self.prompt[:-2]
self.do_read(line)
else:
print("No statistics object is current -- cannot reload.", file=self.stream)
return 0
def help_read(self):
print("Read in profile data from a specified file.", file=self.stream)
print("Without argument, reload the current file.", file=self.stream)
def do_reverse(self, line):
if self.stats:
self.stats.reverse_order()
else:
print("No statistics object is loaded.", file=self.stream)
return 0
def help_reverse(self):
print("Reverse the sort order of the profiling report.", file=self.stream)
def do_sort(self, line):
if not self.stats:
print("No statistics object is loaded.", file=self.stream)
return
abbrevs = self.stats.get_sort_arg_defs()
if line and all((x in abbrevs) for x in line.split()):
self.stats.sort_stats(*line.split())
else:
print("Valid sort keys (unique prefixes are accepted):", file=self.stream)
for (key, value) in Stats.sort_arg_dict_default.items():
print("%s -- %s" % (key, value[1]), file=self.stream)
return 0
def help_sort(self):
print("Sort profile data according to specified keys.", file=self.stream)
print("(Typing `sort' without arguments lists valid keys.)", file=self.stream)
def complete_sort(self, text, *args):
return [a for a in Stats.sort_arg_dict_default if a.startswith(text)]
def do_stats(self, line):
return self.generic('print_stats', line)
def help_stats(self):
print("Print statistics from the current stat object.", file=self.stream)
self.generic_help()
def do_strip(self, line):
if self.stats:
self.stats.strip_dirs()
else:
print("No statistics object is loaded.", file=self.stream)
def help_strip(self):
print("Strip leading path information from filenames in the report.", file=self.stream)
def help_help(self):
print("Show help for a given command.", file=self.stream)
def postcmd(self, stop, line):
if stop:
return stop
return None
if len(sys.argv) > 1:
initprofile = sys.argv[1]
else:
initprofile = None
try:
browser = ProfileBrowser(initprofile)
for profile in sys.argv[2:]:
browser.do_add(profile)
print("Welcome to the profile statistics browser.", file=browser.stream)
browser.cmdloop()
print("Goodbye.", file=browser.stream)
except KeyboardInterrupt:
pass
# That's all, folks.

View File

@@ -1,62 +0,0 @@
import sys
import types
import unittest
# bpo-46417: Test that structseq types used by the sys module are still
# valid when Py_Finalize()/Py_Initialize() are called multiple times.
class TestStructSeq(unittest.TestCase):
# test PyTypeObject members
def check_structseq(self, obj_type):
# ob_refcnt
self.assertGreaterEqual(sys.getrefcount(obj_type), 1)
# tp_base
self.assertIsSubclass(obj_type, tuple)
# tp_bases
self.assertEqual(obj_type.__bases__, (tuple,))
# tp_dict
self.assertIsInstance(obj_type.__dict__, types.MappingProxyType)
# tp_mro
self.assertEqual(obj_type.__mro__, (obj_type, tuple, object))
# tp_name
self.assertIsInstance(type.__name__, str)
# tp_subclasses
self.assertEqual(obj_type.__subclasses__(), [])
def test_sys_attrs(self):
for attr_name in (
'flags', # FlagsType
'float_info', # FloatInfoType
'hash_info', # Hash_InfoType
'int_info', # Int_InfoType
'thread_info', # ThreadInfoType
'version_info', # VersionInfoType
):
with self.subTest(attr=attr_name):
attr = getattr(sys, attr_name)
self.check_structseq(type(attr))
def test_sys_funcs(self):
func_names = ['get_asyncgen_hooks'] # AsyncGenHooksType
if hasattr(sys, 'getwindowsversion'):
func_names.append('getwindowsversion') # WindowsVersionType
for func_name in func_names:
with self.subTest(func=func_name):
func = getattr(sys, func_name)
obj = func()
self.check_structseq(type(obj))
try:
unittest.main(
module=(
'__main__'
if __name__ == '__main__'
# Avoiding a circular import:
else sys.modules['test._test_embed_structseq']
)
)
except SystemExit as exc:
if exc.args[0] != 0:
raise
print("Tests passed")

BIN
Lib/test/pstats.pck vendored

Binary file not shown.

View File

@@ -3,23 +3,17 @@
# See test_cmd_line_script.py for testing of script execution
import os
import re
import subprocess
import sys
import sysconfig
import tempfile
import textwrap
import unittest
from test import support
from test.support import os_helper
from test.support import force_not_colorized
from test.support import threading_helper
from test.support.script_helper import (
spawn_python, kill_python, assert_python_ok, assert_python_failure,
interpreter_requires_environment
)
from textwrap import dedent
if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
@@ -39,13 +33,11 @@ class CmdLineTest(unittest.TestCase):
def verify_valid_flag(self, cmd_line):
rc, out, err = assert_python_ok(cmd_line)
if out != b'':
self.assertEndsWith(out, b'\n')
self.assertTrue(out == b'' or out.endswith(b'\n'))
self.assertNotIn(b'Traceback', out)
self.assertNotIn(b'Traceback', err)
return out
@support.cpython_only
def test_help(self):
self.verify_valid_flag('-h')
self.verify_valid_flag('-?')
@@ -56,28 +48,20 @@ class CmdLineTest(unittest.TestCase):
self.assertNotIn(b'-X dev', out)
self.assertLess(len(lines), 50)
@support.cpython_only
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_help_env(self):
out = self.verify_valid_flag('--help-env')
self.assertIn(b'PYTHONHOME', out)
# Env vars in each section should be sorted alphabetically
# (ignoring underscores so PYTHON_FOO and PYTHONFOO intermix naturally)
sort_key = lambda name: name.replace(b'_', b'').lower()
sections = out.split(b'These variables have equivalent')
for section in sections:
envvars = re.findall(rb'^(PYTHON\w+)', section, re.MULTILINE)
self.assertEqual(envvars, sorted(envvars, key=sort_key),
"env vars should be sorted alphabetically")
@support.cpython_only
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_help_xoptions(self):
out = self.verify_valid_flag('--help-xoptions')
self.assertIn(b'-X dev', out)
options = re.findall(rb'^-X (\w+)', out, re.MULTILINE)
self.assertEqual(options, sorted(options),
"options should be sorted alphabetically")
@support.cpython_only
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_help_all(self):
out = self.verify_valid_flag('--help-all')
lines = out.splitlines()
@@ -96,13 +80,12 @@ class CmdLineTest(unittest.TestCase):
def test_site_flag(self):
self.verify_valid_flag('-S')
@support.cpython_only
def test_version(self):
version = ('Python %d.%d' % sys.version_info[:2]).encode("ascii")
for switch in '-V', '--version', '-VV':
rc, out, err = assert_python_ok(switch)
self.assertNotStartsWith(err, version)
self.assertStartsWith(out, version)
self.assertFalse(err.startswith(version))
self.assertTrue(out.startswith(version))
def test_verbose(self):
# -v causes imports to write to stderr. If the write to
@@ -162,7 +145,8 @@ class CmdLineTest(unittest.TestCase):
else:
self.assertEqual(err, b'')
@support.cpython_only
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_xoption_frozen_modules(self):
tests = {
('=on', 'FrozenImporter'),
@@ -177,18 +161,6 @@ class CmdLineTest(unittest.TestCase):
res = assert_python_ok(*cmd)
self.assertRegex(res.out.decode('utf-8'), expected)
@support.cpython_only
def test_env_var_frozen_modules(self):
tests = {
('on', 'FrozenImporter'),
('off', 'SourceFileLoader'),
}
for raw, expected in tests:
cmd = ['-c', 'import os; print(os.__spec__.loader, end="")']
with self.subTest(raw):
res = assert_python_ok(*cmd, PYTHON_FROZEN_MODULES=raw)
self.assertRegex(res.out.decode('utf-8'), expected)
def test_run_module(self):
# Test expected operation of the '-m' switch
# Switch needs an argument
@@ -201,7 +173,6 @@ class CmdLineTest(unittest.TestCase):
# All good if module is located and run successfully
assert_python_ok('-m', 'timeit', '-n', '1')
@unittest.expectedFailureIf(support.is_android, "TODO: RUSTPYTHON")
def test_run_module_bug1764407(self):
# -m and -i need to play well together
# Runs the timeit module and checks the __main__
@@ -213,14 +184,6 @@ class CmdLineTest(unittest.TestCase):
self.assertTrue(data.find(b'1 loop') != -1)
self.assertTrue(data.find(b'__main__.Timer') != -1)
@support.cpython_only
def test_null_byte_in_interactive_mode(self):
# gh-140594: Fix an out of bounds read when a single NUL character
# is read from the standard input in interactive mode.
proc = spawn_python('-i')
proc.communicate(b'\x00', timeout=support.SHORT_TIMEOUT)
self.assertEqual(proc.returncode, 0)
def test_relativedir_bug46421(self):
# Test `python -m unittest` with a relative directory beginning with ./
# Note: We have to switch to the project's top module's directory, as per
@@ -259,7 +222,8 @@ class CmdLineTest(unittest.TestCase):
# command line, but how subprocess does decode bytes to unicode. Python
# doesn't decode the command line because Windows provides directly the
# arguments as unicode (using wmain() instead of main()).
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(sys.platform == 'win32',
'Windows has a native unicode API')
def test_undecodable_code(self):
@@ -295,7 +259,8 @@ class CmdLineTest(unittest.TestCase):
if not stdout.startswith(pattern):
raise AssertionError("%a doesn't start with %a" % (stdout, pattern))
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
@unittest.skipIf(sys.platform == 'win32',
'Windows has a native unicode API')
def test_invalid_utf8_arg(self):
@@ -307,10 +272,12 @@ class CmdLineTest(unittest.TestCase):
# Test with default config, in the C locale, in the Python UTF-8 Mode.
code = 'import sys, os; s=os.fsencode(sys.argv[1]); print(ascii(s))'
# TODO: RUSTPYTHON
def run_default(arg):
cmd = [sys.executable, '-c', code, arg]
return subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
# TODO: RUSTPYTHON
def run_c_locale(arg):
cmd = [sys.executable, '-c', code, arg]
env = dict(os.environ)
@@ -318,6 +285,7 @@ class CmdLineTest(unittest.TestCase):
return subprocess.run(cmd, stdout=subprocess.PIPE,
text=True, env=env)
# TODO: RUSTPYTHON
def run_utf8_mode(arg):
cmd = [sys.executable, '-X', 'utf8', '-c', code, arg]
return subprocess.run(cmd, stdout=subprocess.PIPE, text=True)
@@ -362,8 +330,6 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(stdout, expected)
self.assertEqual(p.returncode, 0)
@unittest.skipIf(os.environ.get("PYTHONUNBUFFERED", "0") != "0",
"Python stdio buffering is disabled.")
def test_non_interactive_output_buffering(self):
code = textwrap.dedent("""
import sys
@@ -403,7 +369,7 @@ class CmdLineTest(unittest.TestCase):
p.stdin.flush()
data, rc = _kill_python_and_exit_code(p)
self.assertEqual(rc, 0)
self.assertStartsWith(data, b'x')
self.assertTrue(data.startswith(b'x'), data)
def test_large_PYTHONPATH(self):
path1 = "ABCDE" * 100
@@ -440,7 +406,8 @@ class CmdLineTest(unittest.TestCase):
# for empty and unset PYTHONPATH
self.assertEqual(out1, out2)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_displayhook_unencodable(self):
for encoding in ('ascii', 'latin-1', 'utf-8'):
env = os.environ.copy()
@@ -508,7 +475,6 @@ class CmdLineTest(unittest.TestCase):
self.assertRegex(err.decode('ascii', 'ignore'), 'SyntaxError')
self.assertEqual(b'', out)
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_stdout_flush_at_shutdown(self):
# Issue #5319: if stdout.flush() fails at shutdown, an error should
# be printed out.
@@ -520,9 +486,8 @@ class CmdLineTest(unittest.TestCase):
rc, out, err = assert_python_failure('-c', code)
self.assertEqual(b'', out)
self.assertEqual(120, rc)
self.assertIn(b'Exception ignored while flushing sys.stdout:\n'
b'OSError: '.replace(b'\n', os.linesep.encode()),
err)
self.assertRegex(err.decode('ascii', 'ignore'),
'Exception ignored in.*\nOSError: .*')
def test_closed_stdout(self):
# Issue #13444: if stdout has been explicitly closed, we should
@@ -560,19 +525,23 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(err, b'')
self.assertEqual(p.returncode, 42)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_no_stdin(self):
self._test_no_stdio(['stdin'])
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_no_stdout(self):
self._test_no_stdio(['stdout'])
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_no_stderr(self):
self._test_no_stdio(['stderr'])
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_no_std_streams(self):
self._test_no_stdio(['stdin', 'stdout', 'stderr'])
@@ -624,7 +593,8 @@ class CmdLineTest(unittest.TestCase):
print("del sys.modules['__main__']", file=script)
assert_python_ok(filename)
@support.cpython_only
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_unknown_options(self):
rc, out, err = assert_python_failure('-E', '-z')
self.assertIn(b'Unknown option: -z', err)
@@ -671,7 +641,6 @@ class CmdLineTest(unittest.TestCase):
cwd=tmpdir)
self.assertEqual(out.strip(), b"ok")
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_sys_flags_set(self):
# Issue 31845: a startup refactoring broke reading flags from env vars
for value, expected in (("", 0), ("1", 1), ("text", 1), ("2", 2)):
@@ -681,19 +650,22 @@ class CmdLineTest(unittest.TestCase):
PYTHONDONTWRITEBYTECODE=value,
PYTHONVERBOSE=value,
)
expected_bool = int(bool(value))
dont_write_bytecode = int(bool(value))
code = (
"import sys; "
"sys.stderr.write(str(sys.flags)); "
f"""sys.exit(not (
sys.flags.optimize == sys.flags.verbose == {expected}
and sys.flags.debug == sys.flags.dont_write_bytecode == {expected_bool}
sys.flags.debug == sys.flags.optimize ==
sys.flags.verbose ==
{expected}
and sys.flags.dont_write_bytecode == {dont_write_bytecode}
))"""
)
with self.subTest(envar_value=value):
assert_python_ok('-c', code, **env_vars)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_set_pycache_prefix(self):
# sys.pycache_prefix can be set from either -X pycache_prefix or
# PYTHONPYCACHEPREFIX env var, with the former taking precedence.
@@ -739,7 +711,6 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(proc.returncode, 0, proc)
return proc.stdout.rstrip()
@support.cpython_only
def test_xdev(self):
# sys.flags.dev_mode
code = "import sys; print(sys.flags.dev_mode)"
@@ -776,24 +747,22 @@ class CmdLineTest(unittest.TestCase):
# Memory allocator debug hooks
try:
import _testinternalcapi # noqa: F401
import _testcapi
except ImportError:
pass
else:
code = "import _testinternalcapi; print(_testinternalcapi.pymem_getallocatorsname())"
code = "import _testcapi; print(_testcapi.pymem_getallocatorsname())"
with support.SuppressCrashReport():
out = self.run_xdev("-c", code, check_exitcode=False)
if support.with_pymalloc():
alloc_name = "pymalloc_debug"
elif support.Py_GIL_DISABLED:
alloc_name = "mimalloc_debug"
else:
alloc_name = "malloc_debug"
self.assertEqual(out, alloc_name)
# Faulthandler
try:
import faulthandler # noqa: F401
import faulthandler
except ImportError:
pass
else:
@@ -822,7 +791,8 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(proc.returncode, 0, proc)
return proc.stdout.rstrip()
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_warnings_filter_precedence(self):
expected_filters = ("error::BytesWarning "
"once::UserWarning "
@@ -845,7 +815,7 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(out, expected_filters)
def check_pythonmalloc(self, env_var, name):
code = 'import _testinternalcapi; print(_testinternalcapi.pymem_getallocatorsname())'
code = 'import _testcapi; print(_testcapi.pymem_getallocatorsname())'
env = dict(os.environ)
env.pop('PYTHONDEVMODE', None)
if env_var is not None:
@@ -861,16 +831,12 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(proc.stdout.rstrip(), name)
self.assertEqual(proc.returncode, 0)
@support.cpython_only
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_pythonmalloc(self):
# Test the PYTHONMALLOC environment variable
malloc = not support.Py_GIL_DISABLED
pymalloc = support.with_pymalloc()
mimalloc = support.with_mimalloc()
if support.Py_GIL_DISABLED:
default_name = 'mimalloc_debug' if support.Py_DEBUG else 'mimalloc'
default_name_debug = 'mimalloc_debug'
elif pymalloc:
if pymalloc:
default_name = 'pymalloc_debug' if support.Py_DEBUG else 'pymalloc'
default_name_debug = 'pymalloc_debug'
else:
@@ -880,28 +846,21 @@ class CmdLineTest(unittest.TestCase):
tests = [
(None, default_name),
('debug', default_name_debug),
('malloc', 'malloc'),
('malloc_debug', 'malloc_debug'),
]
if malloc:
tests.extend([
('malloc', 'malloc'),
('malloc_debug', 'malloc_debug'),
])
if pymalloc:
tests.extend((
('pymalloc', 'pymalloc'),
('pymalloc_debug', 'pymalloc_debug'),
))
if mimalloc:
tests.extend((
('mimalloc', 'mimalloc'),
('mimalloc_debug', 'mimalloc_debug'),
))
for env_var, name in tests:
with self.subTest(env_var=env_var, name=name):
self.check_pythonmalloc(env_var, name)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_pythondevmode_env(self):
# Test the PYTHONDEVMODE environment variable
code = "import sys; print(sys.flags.dev_mode)"
@@ -920,138 +879,6 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(proc.stdout.rstrip(), 'True')
self.assertEqual(proc.returncode, 0, proc)
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_python_gil(self):
cases = [
# (env, opt, expected, msg)
('1', None, '1', "PYTHON_GIL=1"),
(None, '1', '1', "-X gil=1"),
]
if support.Py_GIL_DISABLED:
cases.extend(
[
(None, None, 'None', "no options set"),
('0', None, '0', "PYTHON_GIL=0"),
('1', '0', '0', "-X gil=0 overrides PYTHON_GIL=1"),
(None, '0', '0', "-X gil=0"),
]
)
else:
cases.extend(
[
(None, None, '1', '-X gil=0 (unsupported by this build)'),
('1', None, '1', 'PYTHON_GIL=0 (unsupported by this build)'),
]
)
code = "import sys; print(sys.flags.gil)"
environ = dict(os.environ)
for env, opt, expected, msg in cases:
with self.subTest(msg, env=env, opt=opt):
environ.pop('PYTHON_GIL', None)
if env is not None:
environ['PYTHON_GIL'] = env
extra_args = []
if opt is not None:
extra_args = ['-X', f'gil={opt}']
proc = subprocess.run([sys.executable, *extra_args, '-c', code],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True, env=environ)
self.assertEqual(proc.returncode, 0, proc)
self.assertEqual(proc.stdout.rstrip(), expected)
self.assertEqual(proc.stderr, '')
def test_python_asyncio_debug(self):
code = "import asyncio; print(asyncio.new_event_loop().get_debug())"
rc, out, err = assert_python_ok('-c', code, PYTHONASYNCIODEBUG='1')
self.assertIn(b'True', out)
@unittest.skipUnless(sysconfig.get_config_var('Py_TRACE_REFS'), "Requires --with-trace-refs build option")
def test_python_dump_refs(self):
code = 'import sys; sys._clear_internal_caches()'
rc, out, err = assert_python_ok('-c', code, PYTHONDUMPREFS='1')
self.assertEqual(rc, 0)
@unittest.skipUnless(sysconfig.get_config_var('Py_TRACE_REFS'), "Requires --with-trace-refs build option")
def test_python_dump_refs_file(self):
with tempfile.NamedTemporaryFile() as dump_file:
code = 'import sys; sys._clear_internal_caches()'
rc, out, err = assert_python_ok('-c', code, PYTHONDUMPREFSFILE=dump_file.name)
self.assertEqual(rc, 0)
with open(dump_file.name, 'r') as file:
contents = file.read()
self.assertIn('Remaining objects', contents)
@unittest.expectedFailureIf(sys.platform == "darwin", "TODO: RUSTPYTHON")
@unittest.skipUnless(sys.platform == 'darwin', 'PYTHONEXECUTABLE only works on macOS')
def test_python_executable(self):
code = 'import sys; print(sys.executable)'
expected = "/busr/bbin/bpython"
rc, out, err = assert_python_ok('-c', code, PYTHONEXECUTABLE=expected)
self.assertIn(expected.encode(), out)
@unittest.expectedFailureIf(support.MS_WINDOWS, "TODO: RUSTPYTHON")
@unittest.skipUnless(support.MS_WINDOWS, 'Test only applicable on Windows')
def test_python_legacy_windows_fs_encoding(self):
code = "import sys; print(sys.getfilesystemencoding())"
expected = 'mbcs'
rc, out, err = assert_python_ok('-c', code, PYTHONLEGACYWINDOWSFSENCODING='1')
self.assertIn(expected.encode(), out)
@unittest.expectedFailureIf(support.MS_WINDOWS, "TODO: RUSTPYTHON")
@unittest.skipUnless(support.MS_WINDOWS, 'Test only applicable on Windows')
def test_python_legacy_windows_stdio(self):
# Test that _WindowsConsoleIO is used when PYTHONLEGACYWINDOWSSTDIO
# is not set.
# We cannot use PIPE becase it prevents creating new console.
# So we use exit code.
code = "import sys; sys.exit(type(sys.stdout.buffer.raw).__name__ != '_WindowsConsoleIO')"
env = os.environ.copy()
env["PYTHONLEGACYWINDOWSSTDIO"] = ""
p = subprocess.run([sys.executable, "-c", code],
creationflags=subprocess.CREATE_NEW_CONSOLE,
env=env)
self.assertEqual(p.returncode, 0)
# Then test that FIleIO is used when PYTHONLEGACYWINDOWSSTDIO is set.
code = "import sys; sys.exit(type(sys.stdout.buffer.raw).__name__ != 'FileIO')"
env["PYTHONLEGACYWINDOWSSTDIO"] = "1"
p = subprocess.run([sys.executable, "-c", code],
creationflags=subprocess.CREATE_NEW_CONSOLE,
env=env)
self.assertEqual(p.returncode, 0)
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.skipIf("-fsanitize" in sysconfig.get_config_vars().get('PY_CFLAGS', ()),
"PYTHONMALLOCSTATS doesn't work with ASAN")
def test_python_malloc_stats(self):
code = "pass"
rc, out, err = assert_python_ok('-c', code, PYTHONMALLOCSTATS='1')
self.assertIn(b'Small block threshold', err)
def test_python_user_base(self):
code = "import site; print(site.USER_BASE)"
expected = "/custom/userbase"
rc, out, err = assert_python_ok('-c', code, PYTHONUSERBASE=expected)
self.assertIn(expected.encode(), out)
def test_python_basic_repl(self):
# Currently this only tests that the env var is set. See test_pyrepl.test_python_basic_repl.
code = "import os; print('PYTHON_BASIC_REPL' in os.environ)"
expected = "True"
rc, out, err = assert_python_ok('-c', code, PYTHON_BASIC_REPL='1')
self.assertIn(expected.encode(), out)
@unittest.skipUnless(sysconfig.get_config_var('HAVE_PERF_TRAMPOLINE'), "Requires HAVE_PERF_TRAMPOLINE support")
def test_python_perf_jit_support(self):
code = "import sys; print(sys.is_stack_trampoline_active())"
expected = "True"
rc, out, err = assert_python_ok('-c', code, PYTHON_PERF_JIT_SUPPORT='1')
self.assertIn(expected.encode(), out)
@unittest.skipUnless(sys.platform == 'win32',
'bpo-32457 only applies on Windows')
def test_argv0_normalization(self):
@@ -1064,15 +891,16 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(proc.returncode, 0, proc)
self.assertEqual(proc.stdout.strip(), b'0')
@support.cpython_only
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_parsing_error(self):
args = [sys.executable, '-I', '--unknown-option']
proc = subprocess.run(args,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True)
err_msg = "Unknown option: --unknown-option\nusage: "
self.assertStartsWith(proc.stderr, err_msg)
err_msg = "unknown option --unknown-option\nusage: "
self.assertTrue(proc.stderr.startswith(err_msg), proc.stderr)
self.assertNotEqual(proc.returncode, 0)
def test_int_max_str_digits(self):
@@ -1087,8 +915,11 @@ class CmdLineTest(unittest.TestCase):
assert_python_failure('-c', code, PYTHONINTMAXSTRDIGITS='foo')
assert_python_failure('-c', code, PYTHONINTMAXSTRDIGITS='100')
def res2int(res):
out = res.out.strip().decode("utf-8")
return tuple(int(i) for i in out.split())
res = assert_python_ok('-c', code)
res2int = self.res2int
current_max = sys.get_int_max_str_digits()
self.assertEqual(res2int(res), (current_max, current_max))
res = assert_python_ok('-X', 'int_max_str_digits=0', '-c', code)
@@ -1108,181 +939,6 @@ class CmdLineTest(unittest.TestCase):
)
self.assertEqual(res2int(res), (6000, 6000))
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_cmd_dedent(self):
# test that -c auto-dedents its arguments
test_cases = [
(
"""
print('space-auto-dedent')
""",
"space-auto-dedent",
),
(
dedent(
"""
^^^print('tab-auto-dedent')
"""
).replace("^", "\t"),
"tab-auto-dedent",
),
(
dedent(
"""
^^if 1:
^^^^print('mixed-auto-dedent-1')
^^print('mixed-auto-dedent-2')
"""
).replace("^", "\t \t"),
"mixed-auto-dedent-1\nmixed-auto-dedent-2",
),
(
'''
data = """$
this data has an empty newline above and a newline with spaces below $
$
"""$
if 1: $
print(repr(data))$
'''.replace(
"$", ""
),
# Note: entirely blank lines are normalized to \n, even if they
# are part of a data string. This is consistent with
# textwrap.dedent behavior, but might not be intuitive.
"'\\n\\nthis data has an empty newline above and a newline with spaces below \\n\\n'",
),
(
'',
'',
),
(
' \t\n\t\n \t\t\t \t\t \t\n\t\t \n\n\n\t\t\t ',
'',
),
]
for code, expected in test_cases:
# Run the auto-dedent case
args1 = sys.executable, '-c', code
proc1 = subprocess.run(args1, stdout=subprocess.PIPE)
self.assertEqual(proc1.returncode, 0, proc1)
output1 = proc1.stdout.strip().decode(encoding='utf-8')
# Manually dedent beforehand, check the result is the same.
args2 = sys.executable, '-c', dedent(code)
proc2 = subprocess.run(args2, stdout=subprocess.PIPE)
self.assertEqual(proc2.returncode, 0, proc2)
output2 = proc2.stdout.strip().decode(encoding='utf-8')
self.assertEqual(output1, output2)
self.assertEqual(output1.replace('\r\n', '\n'), expected)
def test_cmd_dedent_failcase(self):
# Mixing tabs and spaces is not allowed
from textwrap import dedent
template = dedent(
'''
-+if 1:
+-++ print('will fail')
''')
code = template.replace('-', ' ').replace('+', '\t')
assert_python_failure('-c', code)
code = template.replace('-', '\t').replace('+', ' ')
assert_python_failure('-c', code)
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_cpu_count(self):
code = "import os; print(os.cpu_count(), os.process_cpu_count())"
res = assert_python_ok('-X', 'cpu_count=4321', '-c', code)
self.assertEqual(self.res2int(res), (4321, 4321))
res = assert_python_ok('-c', code, PYTHON_CPU_COUNT='1234')
self.assertEqual(self.res2int(res), (1234, 1234))
def test_cpu_count_default(self):
code = "import os; print(os.cpu_count(), os.process_cpu_count())"
res = assert_python_ok('-X', 'cpu_count=default', '-c', code)
self.assertEqual(self.res2int(res), (os.cpu_count(), os.process_cpu_count()))
res = assert_python_ok('-X', 'cpu_count=default', '-c', code, PYTHON_CPU_COUNT='1234')
self.assertEqual(self.res2int(res), (os.cpu_count(), os.process_cpu_count()))
res = assert_python_ok('-c', code, PYTHON_CPU_COUNT='default')
self.assertEqual(self.res2int(res), (os.cpu_count(), os.process_cpu_count()))
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_import_time(self):
# os is not imported at startup
code = 'import os; import os'
for case in 'importtime', 'importtime=1', 'importtime=true':
res = assert_python_ok('-X', case, '-c', code)
res_err = res.err.decode('utf-8')
self.assertRegex(res_err, r'import time: \s*\d+ \| \s*\d+ \| \s*os')
self.assertNotRegex(res_err, r'import time: cached\s* \| cached\s* \| os')
res = assert_python_ok('-X', 'importtime=2', '-c', code)
res_err = res.err.decode('utf-8')
self.assertRegex(res_err, r'import time: \s*\d+ \| \s*\d+ \| \s*os')
self.assertRegex(res_err, r'import time: cached\s* \| cached\s* \| os')
assert_python_failure('-X', 'importtime=-1', '-c', code)
assert_python_failure('-X', 'importtime=3', '-c', code)
def res2int(self, res):
out = res.out.strip().decode("utf-8")
return tuple(int(i) for i in out.split())
@unittest.skipUnless(support.Py_GIL_DISABLED,
"PYTHON_TLBC and -X tlbc"
" only supported in Py_GIL_DISABLED builds")
@threading_helper.requires_working_threading()
def test_disable_thread_local_bytecode(self):
code = """if 1:
import threading
def test(x, y):
return x + y
t = threading.Thread(target=test, args=(1,2))
t.start()
t.join()"""
assert_python_ok("-W", "always", "-X", "tlbc=0", "-c", code)
assert_python_ok("-W", "always", "-c", code, PYTHON_TLBC="0")
@unittest.skipUnless(support.Py_GIL_DISABLED,
"PYTHON_TLBC and -X tlbc"
" only supported in Py_GIL_DISABLED builds")
@threading_helper.requires_working_threading()
def test_enable_thread_local_bytecode(self):
code = """if 1:
import threading
def test(x, y):
return x + y
t = threading.Thread(target=test, args=(1,2))
t.start()
t.join()"""
# The functionality of thread-local bytecode is tested more extensively
# in test_thread_local_bytecode
assert_python_ok("-W", "always", "-X", "tlbc=1", "-c", code)
assert_python_ok("-W", "always", "-c", code, PYTHON_TLBC="1")
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.skipUnless(support.Py_GIL_DISABLED,
"PYTHON_TLBC and -X tlbc"
" only supported in Py_GIL_DISABLED builds")
def test_invalid_thread_local_bytecode(self):
rc, out, err = assert_python_failure("-X", "tlbc")
self.assertIn(b"tlbc=n: n is missing or invalid", err)
rc, out, err = assert_python_failure("-X", "tlbc=foo")
self.assertIn(b"tlbc=n: n is missing or invalid", err)
rc, out, err = assert_python_failure("-X", "tlbc=-1")
self.assertIn(b"tlbc=n: n is missing or invalid", err)
rc, out, err = assert_python_failure("-X", "tlbc=2")
self.assertIn(b"tlbc=n: n is missing or invalid", err)
rc, out, err = assert_python_failure(PYTHON_TLBC="foo")
self.assertIn(b"PYTHON_TLBC=N: N is missing or invalid", err)
rc, out, err = assert_python_failure(PYTHON_TLBC="-1")
self.assertIn(b"PYTHON_TLBC=N: N is missing or invalid", err)
rc, out, err = assert_python_failure(PYTHON_TLBC="2")
self.assertIn(b"PYTHON_TLBC=N: N is missing or invalid", err)
@unittest.skipIf(interpreter_requires_environment(),
'Cannot run -I tests when PYTHON env vars are required.')
@@ -1323,7 +979,6 @@ class IgnoreEnvironmentTest(unittest.TestCase):
class SyntaxErrorTests(unittest.TestCase):
@force_not_colorized
def check_string(self, code):
proc = subprocess.run([sys.executable, "-"], input=code,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
@@ -1331,11 +986,13 @@ class SyntaxErrorTests(unittest.TestCase):
self.assertNotEqual(proc.stderr, None)
self.assertIn(b"\nSyntaxError", proc.stderr)
@unittest.expectedFailureIf(not support.is_android, "TODO: RUSTPYTHON")
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_tokenizer_error_with_stdin(self):
self.check_string(b"(1+2+3")
@unittest.expectedFailureIf(not support.is_android, "TODO: RUSTPYTHON")
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_decoding_error_at_the_end_of_the_line(self):
self.check_string(br"'\u1f'")

View File

@@ -14,7 +14,8 @@ import io
import textwrap
from test import support
from test.support import import_helper, is_apple, os_helper
from test.support import import_helper
from test.support import os_helper
from test.support.script_helper import (
make_pkg, make_script, make_zip_pkg, make_zip_script,
assert_python_ok, assert_python_failure, spawn_python, kill_python)
@@ -88,8 +89,6 @@ def _make_test_zip_pkg(zip_dir, zip_basename, pkg_name, script_basename,
importlib.invalidate_caches()
return to_return
@support.force_not_colorized_test_class
class CmdLineTest(unittest.TestCase):
def _check_output(self, script_name, exit_code, data,
expected_file, expected_argv0,
@@ -153,13 +152,15 @@ class CmdLineTest(unittest.TestCase):
print('Expected output: %r' % expected_msg)
self.assertIn(expected_msg.encode('utf-8'), err)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_dash_c_loader(self):
rc, out, err = assert_python_ok("-c", "print(__loader__)")
expected = repr(importlib.machinery.BuiltinImporter).encode("utf-8")
self.assertIn(expected, out)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_stdin_loader(self):
# Unfortunately, there's no way to automatically test the fully
# interactive REPL, since that code path only gets executed when
@@ -206,23 +207,21 @@ class CmdLineTest(unittest.TestCase):
stderr = p.stderr if separate_stderr else p.stdout
self.assertIn(b'Traceback ', stderr.readline())
self.assertIn(b'File "<stdin>"', stderr.readline())
self.assertIn(b'1/0', stderr.readline())
self.assertIn(b' ~^~', stderr.readline())
self.assertIn(b'ZeroDivisionError', stderr.readline())
@unittest.skip("TODO: RUSTPYTHON; test hang in middle")
@unittest.skip("TODO: RUSTPYTHON, test hang in middle")
def test_repl_stdout_flush(self):
self.check_repl_stdout_flush()
@unittest.skip("TODO: RUSTPYTHON; test hang in middle")
@unittest.skip("TODO: RUSTPYTHON, test hang in middle")
def test_repl_stdout_flush_separate_stderr(self):
self.check_repl_stdout_flush(True)
@unittest.skip("TODO: RUSTPYTHON; test hang in middle")
@unittest.skip("TODO: RUSTPYTHON, test hang in middle")
def test_repl_stderr_flush(self):
self.check_repl_stderr_flush()
@unittest.skip("TODO: RUSTPYTHON; test hang in middle")
@unittest.skip("TODO: RUSTPYTHON, test hang in middle")
def test_repl_stderr_flush_separate_stderr(self):
self.check_repl_stderr_flush(True)
@@ -234,7 +233,8 @@ class CmdLineTest(unittest.TestCase):
importlib.machinery.SourceFileLoader,
expected_cwd=script_dir)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_script_abspath(self):
# pass the script using the relative path, expect the absolute path
# in __file__
@@ -390,7 +390,8 @@ class CmdLineTest(unittest.TestCase):
"be directly executed")
self._check_import_error(["-m", "test_pkg"], msg, cwd=script_dir)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_issue8202(self):
# Make sure package __init__ modules see "-m" in sys.argv0 while
# searching for the module to execute
@@ -553,7 +554,7 @@ class CmdLineTest(unittest.TestCase):
script = textwrap.dedent("""\
try:
raise ValueError
except ValueError:
except:
raise NameError from None
""")
with os_helper.temp_dir() as script_dir:
@@ -561,23 +562,18 @@ class CmdLineTest(unittest.TestCase):
exitcode, stdout, stderr = assert_python_failure(script_name)
text = stderr.decode('ascii').split('\n')
self.assertEqual(len(text), 5)
self.assertStartsWith(text[0], 'Traceback')
self.assertStartsWith(text[1], ' File ')
self.assertStartsWith(text[3], 'NameError')
self.assertTrue(text[0].startswith('Traceback'))
self.assertTrue(text[1].startswith(' File '))
self.assertTrue(text[3].startswith('NameError'))
@unittest.expectedFailureIf(sys.platform in ("android", "linux"), "TODO: RUSTPYTHON")
@unittest.expectedFailureIf(sys.platform == "linux", "TODO: RUSTPYTHON")
def test_non_ascii(self):
# Apple platforms deny the creation of a file with an invalid UTF-8 name.
# Mac OS X denies the creation of a file with an invalid UTF-8 name.
# Windows allows creating a name with an arbitrary bytes name, but
# Python cannot a undecodable bytes argument to a subprocess.
# Emscripten/WASI does not permit invalid UTF-8 names.
if (
os_helper.TESTFN_UNDECODABLE
and sys.platform not in {
"win32", "emscripten", "wasi"
}
and not is_apple
):
# WASI does not permit invalid UTF-8 names.
if (os_helper.TESTFN_UNDECODABLE
and sys.platform not in ('win32', 'darwin', 'emscripten', 'wasi')):
name = os.fsdecode(os_helper.TESTFN_UNDECODABLE)
elif os_helper.TESTFN_NONASCII:
name = os_helper.TESTFN_NONASCII
@@ -645,7 +641,8 @@ class CmdLineTest(unittest.TestCase):
self.assertNotIn("\f", text)
self.assertIn("\n 1 + 1 = 2\n ^^^^^\n", text)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_syntaxerror_multi_line_fstring(self):
script = 'foo = f"""{}\nfoo"""\n'
with os_helper.temp_dir() as script_dir:
@@ -660,7 +657,8 @@ class CmdLineTest(unittest.TestCase):
],
)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_syntaxerror_invalid_escape_sequence_multi_line(self):
script = 'foo = """\\q"""\n'
with os_helper.temp_dir() as script_dir:
@@ -671,13 +669,13 @@ class CmdLineTest(unittest.TestCase):
self.assertEqual(
stderr.splitlines()[-3:],
[ b' foo = """\\q"""',
b' ^^',
b'SyntaxError: "\\q" is an invalid escape sequence. '
b'Did you mean "\\\\q"? A raw string is also an option.'
b' ^^^^^^^^',
b'SyntaxError: invalid escape sequence \'\\q\''
],
)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_syntaxerror_null_bytes(self):
script = "x = '\0' nothing to see here\n';import os;os.system('echo pwnd')\n"
with os_helper.temp_dir() as script_dir:
@@ -690,7 +688,8 @@ class CmdLineTest(unittest.TestCase):
],
)
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_syntaxerror_null_bytes_in_multiline_string(self):
scripts = ["\n'''\nmultilinestring\0\n'''", "\nf'''\nmultilinestring\0\n'''"] # Both normal and f-strings
with os_helper.temp_dir() as script_dir:
@@ -704,26 +703,6 @@ class CmdLineTest(unittest.TestCase):
]
)
def test_source_lines_are_shown_when_running_source(self):
_, _, stderr = assert_python_failure("-c", "1/0")
expected_lines = [
b'Traceback (most recent call last):',
b' File "<string>", line 1, in <module>',
b' 1/0',
b' ~^~',
b'ZeroDivisionError: division by zero']
self.assertEqual(stderr.splitlines(), expected_lines)
def test_syntaxerror_does_not_crash(self):
script = "nonlocal x\n"
with os_helper.temp_dir() as script_dir:
script_name = _make_test_script(script_dir, 'script', script)
exitcode, stdout, stderr = assert_python_failure(script_name)
text = io.TextIOWrapper(io.BytesIO(stderr), 'ascii').read()
# It used to crash in https://github.com/python/cpython/issues/111132
self.assertEndsWith(text,
'SyntaxError: nonlocal declaration not allowed at module level\n')
def test_consistent_sys_path_for_direct_execution(self):
# This test case ensures that the following all give the same
# sys.path configuration:
@@ -792,7 +771,8 @@ class CmdLineTest(unittest.TestCase):
traceback_lines = stderr.decode().splitlines()
self.assertIn("No module named script_pkg", traceback_lines[-1])
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_nonexisting_script(self):
# bpo-34783: "./python script.py" must not crash
# if the script file doesn't exist.
@@ -808,11 +788,11 @@ class CmdLineTest(unittest.TestCase):
self.assertIn(": can't open file ", err)
self.assertNotEqual(proc.returncode, 0)
@unittest.skipIf(sys.platform.startswith("darwin"), "TODO: RUSTPYTHON; Problems with Mac os descriptor")
@unittest.skipUnless(os.path.exists('/dev/fd/0'), 'requires /dev/fd platform')
@unittest.skipIf(sys.platform.startswith("freebsd") and
os.stat("/dev").st_dev == os.stat("/dev/fd").st_dev,
"Requires fdescfs mounted on /dev/fd on FreeBSD")
@unittest.skipIf(sys.platform.startswith("darwin"), "TODO: RUSTPYTHON Problems with Mac os descriptor")
def test_script_as_dev_fd(self):
# GH-87235: On macOS passing a non-trivial script to /dev/fd/N can cause
# problems because all open /dev/fd/N file descriptors share the same

View File

@@ -1,234 +0,0 @@
"""Test suite for the cProfile module."""
import sys
import unittest
# rip off all interesting stuff from test_profile
try:
import cProfile
except ImportError:
# TODO: RUSTPYTHON; _lsprof not implemented
raise unittest.SkipTest('cProfile requires _lsprof')
import tempfile
import textwrap
from test.test_profile import ProfileTest, regenerate_expected_output
from test.support.script_helper import assert_python_failure, assert_python_ok
from test import support
class CProfileTest(ProfileTest):
profilerclass = cProfile.Profile
profilermodule = cProfile
expected_max_output = "{built-in method builtins.max}"
def get_expected_output(self):
return _ProfileOutput
def test_bad_counter_during_dealloc(self):
# bpo-3895
import _lsprof
with support.catch_unraisable_exception() as cm:
obj = _lsprof.Profiler(lambda: int)
obj.enable()
obj.disable()
obj.clear()
self.assertEqual(cm.unraisable.exc_type, TypeError)
def test_crash_with_not_enough_args(self):
# gh-126220
import _lsprof
for profile in [_lsprof.Profiler(), cProfile.Profile()]:
for method in [
"_pystart_callback",
"_pyreturn_callback",
"_ccall_callback",
"_creturn_callback",
]:
with self.subTest(profile=profile, method=method):
method_obj = getattr(profile, method)
with self.assertRaises(TypeError):
method_obj() # should not crash
def test_evil_external_timer(self):
# gh-120289
# Disabling profiler in external timer should not crash
import _lsprof
class EvilTimer():
def __init__(self, disable_count):
self.count = 0
self.disable_count = disable_count
def __call__(self):
self.count += 1
if self.count == self.disable_count:
profiler_with_evil_timer.disable()
return self.count
# this will trigger external timer to disable profiler at
# call event - in initContext in _lsprof.c
with support.catch_unraisable_exception() as cm:
profiler_with_evil_timer = _lsprof.Profiler(EvilTimer(1))
profiler_with_evil_timer.enable()
# Make a call to trigger timer
(lambda: None)()
profiler_with_evil_timer.disable()
profiler_with_evil_timer.clear()
self.assertEqual(cm.unraisable.exc_type, RuntimeError)
# this will trigger external timer to disable profiler at
# return event - in Stop in _lsprof.c
with support.catch_unraisable_exception() as cm:
profiler_with_evil_timer = _lsprof.Profiler(EvilTimer(2))
profiler_with_evil_timer.enable()
# Make a call to trigger timer
(lambda: None)()
profiler_with_evil_timer.disable()
profiler_with_evil_timer.clear()
self.assertEqual(cm.unraisable.exc_type, RuntimeError)
def test_profile_enable_disable(self):
prof = self.profilerclass()
# Make sure we clean ourselves up if the test fails for some reason.
self.addCleanup(prof.disable)
prof.enable()
self.assertEqual(
sys.monitoring.get_tool(sys.monitoring.PROFILER_ID), "cProfile")
prof.disable()
self.assertIs(sys.monitoring.get_tool(sys.monitoring.PROFILER_ID), None)
def test_profile_as_context_manager(self):
prof = self.profilerclass()
# Make sure we clean ourselves up if the test fails for some reason.
self.addCleanup(prof.disable)
with prof as __enter__return_value:
# profile.__enter__ should return itself.
self.assertIs(prof, __enter__return_value)
# profile should be set as the global profiler inside the
# with-block
self.assertEqual(
sys.monitoring.get_tool(sys.monitoring.PROFILER_ID), "cProfile")
# profile shouldn't be set once we leave the with-block.
self.assertIs(sys.monitoring.get_tool(sys.monitoring.PROFILER_ID), None)
def test_second_profiler(self):
pr = self.profilerclass()
pr2 = self.profilerclass()
pr.enable()
self.assertRaises(ValueError, pr2.enable)
pr.disable()
def test_throw(self):
"""
gh-106152
generator.throw() should trigger a call in cProfile
"""
def gen():
yield
pr = self.profilerclass()
pr.enable()
g = gen()
try:
g.throw(SyntaxError)
except SyntaxError:
pass
pr.disable()
pr.create_stats()
self.assertTrue(any("throw" in func[2] for func in pr.stats.keys())),
def test_bad_descriptor(self):
# gh-132250
# cProfile should not crash when the profiler callback fails to locate
# the actual function of a method.
with self.profilerclass() as prof:
with self.assertRaises(TypeError):
bytes.find(str())
class TestCommandLine(unittest.TestCase):
def test_sort(self):
rc, out, err = assert_python_failure('-m', 'cProfile', '-s', 'demo')
self.assertGreater(rc, 0)
self.assertIn(b"option -s: invalid choice: 'demo'", err)
def test_profile_script_importing_main(self):
"""Check that scripts that reference __main__ see their own namespace
when being profiled."""
with tempfile.NamedTemporaryFile("w+", delete_on_close=False) as f:
f.write(textwrap.dedent("""\
class Foo:
pass
import __main__
assert Foo == __main__.Foo
"""))
f.close()
assert_python_ok('-m', "cProfile", f.name)
def main():
if '-r' not in sys.argv:
unittest.main()
else:
regenerate_expected_output(__file__, CProfileTest)
# Don't remove this comment. Everything below it is auto-generated.
#--cut--------------------------------------------------------------------------
_ProfileOutput = {}
_ProfileOutput['print_stats'] = """\
28 0.028 0.001 0.028 0.001 profilee.py:110(__getattr__)
1 0.270 0.270 1.000 1.000 profilee.py:25(testfunc)
23/3 0.150 0.007 0.170 0.057 profilee.py:35(factorial)
20 0.020 0.001 0.020 0.001 profilee.py:48(mul)
2 0.040 0.020 0.600 0.300 profilee.py:55(helper)
4 0.116 0.029 0.120 0.030 profilee.py:73(helper1)
2 0.000 0.000 0.140 0.070 profilee.py:84(helper2_indirect)
8 0.312 0.039 0.400 0.050 profilee.py:88(helper2)
8 0.064 0.008 0.080 0.010 profilee.py:98(subhelper)"""
_ProfileOutput['print_callers'] = """\
profilee.py:110(__getattr__) <- 16 0.016 0.016 profilee.py:98(subhelper)
profilee.py:25(testfunc) <- 1 0.270 1.000 <string>:1(<module>)
profilee.py:35(factorial) <- 1 0.014 0.130 profilee.py:25(testfunc)
20/3 0.130 0.147 profilee.py:35(factorial)
2 0.006 0.040 profilee.py:84(helper2_indirect)
profilee.py:48(mul) <- 20 0.020 0.020 profilee.py:35(factorial)
profilee.py:55(helper) <- 2 0.040 0.600 profilee.py:25(testfunc)
profilee.py:73(helper1) <- 4 0.116 0.120 profilee.py:55(helper)
profilee.py:84(helper2_indirect) <- 2 0.000 0.140 profilee.py:55(helper)
profilee.py:88(helper2) <- 6 0.234 0.300 profilee.py:55(helper)
2 0.078 0.100 profilee.py:84(helper2_indirect)
profilee.py:98(subhelper) <- 8 0.064 0.080 profilee.py:88(helper2)
{built-in method builtins.hasattr} <- 4 0.000 0.004 profilee.py:73(helper1)
8 0.000 0.008 profilee.py:88(helper2)
{built-in method sys.exception} <- 4 0.000 0.000 profilee.py:73(helper1)
{method 'append' of 'list' objects} <- 4 0.000 0.000 profilee.py:73(helper1)"""
_ProfileOutput['print_callees'] = """\
<string>:1(<module>) -> 1 0.270 1.000 profilee.py:25(testfunc)
profilee.py:110(__getattr__) ->
profilee.py:25(testfunc) -> 1 0.014 0.130 profilee.py:35(factorial)
2 0.040 0.600 profilee.py:55(helper)
profilee.py:35(factorial) -> 20/3 0.130 0.147 profilee.py:35(factorial)
20 0.020 0.020 profilee.py:48(mul)
profilee.py:48(mul) ->
profilee.py:55(helper) -> 4 0.116 0.120 profilee.py:73(helper1)
2 0.000 0.140 profilee.py:84(helper2_indirect)
6 0.234 0.300 profilee.py:88(helper2)
profilee.py:73(helper1) -> 4 0.000 0.004 {built-in method builtins.hasattr}
profilee.py:84(helper2_indirect) -> 2 0.006 0.040 profilee.py:35(factorial)
2 0.078 0.100 profilee.py:88(helper2)
profilee.py:88(helper2) -> 8 0.064 0.080 profilee.py:98(subhelper)
profilee.py:98(subhelper) -> 16 0.016 0.016 profilee.py:110(__getattr__)
{built-in method builtins.hasattr} -> 12 0.012 0.012 profilee.py:110(__getattr__)"""
if __name__ == "__main__":
main()

2032
Lib/test/test_embed.py vendored

File diff suppressed because it is too large Load Diff

View File

@@ -21,8 +21,8 @@ if not support.has_subprocess_support:
raise unittest.SkipTest("test module requires subprocess")
# Test import all of the things we're about to try testing up front.
import _io # noqa: F401
import _pyio # noqa: F401
import _io
import _pyio
@unittest.skipUnless(os.name == 'posix', 'tests requires a posix system.')
class TestFileIOSignalInterrupt:
@@ -186,9 +186,10 @@ class TestFileIOSignalInterrupt:
class CTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
modname = '_io'
@unittest.expectedFailure # TODO: RUSTPYTHON; - _io.FileIO.readall uses read_to_end which differs from _pyio.FileIO.readall
# TODO: RUSTPYTHON - _io.FileIO.readall uses read_to_end which differs from _pyio.FileIO.readall
@unittest.expectedFailure
def test_readall(self):
return super().test_readall()
super().test_readall()
class PyTestFileIOSignalInterrupt(TestFileIOSignalInterrupt, unittest.TestCase):
modname = '_pyio'

177
Lib/test/test_gc.py vendored
View File

@@ -7,7 +7,7 @@ from test.support import (verbose, refcount_test,
Py_GIL_DISABLED)
from test.support.import_helper import import_module
from test.support.os_helper import temp_dir, TESTFN, unlink
from test.support.script_helper import assert_python_ok, make_script
from test.support.script_helper import assert_python_ok, make_script, run_test_script
from test.support import threading_helper, gc_threshold
import gc
@@ -236,7 +236,8 @@ class GCTests(unittest.TestCase):
# is 3 because it includes f's code object.
self.assertIn(gc.collect(), (2, 3))
@unittest.expectedFailure # TODO: RUSTPYTHON; - weakref clear ordering differs from 3.15+
# TODO: RUSTPYTHON - weakref clear ordering differs from 3.15+
@unittest.expectedFailure
def test_function_tp_clear_leaves_consistent_state(self):
# https://github.com/python/cpython/issues/91636
code = """if 1:
@@ -310,6 +311,28 @@ class GCTests(unittest.TestCase):
self.assertRegex(stdout, rb"""\A\s*func=None""")
self.assertFalse(stderr)
# TODO: RUSTPYTHON - _datetime module not available
@unittest.expectedFailure
def test_datetime_weakref_cycle(self):
# https://github.com/python/cpython/issues/132413
# If the weakref used by the datetime extension gets cleared by the GC (due to being
# in an unreachable cycle) then datetime functions would crash (get_module_state()
# was returning a NULL pointer). This bug is fixed by clearing weakrefs without
# callbacks *after* running finalizers.
code = """if 1:
import _datetime
class C:
def __del__(self):
print('__del__ called')
_datetime.timedelta(days=1) # crash?
l = [C()]
l.append(l)
"""
rc, stdout, stderr = assert_python_ok("-c", code)
self.assertEqual(rc, 0)
self.assertEqual(stdout.strip(), b'__del__ called')
@refcount_test
def test_frame(self):
def f():
@@ -400,19 +423,11 @@ class GCTests(unittest.TestCase):
# each call to collect(N)
x = []
gc.collect(0)
# x is now in gen 1
# x is now in the old gen
a, b, c = gc.get_count()
gc.collect(1)
# x is now in gen 2
d, e, f = gc.get_count()
gc.collect(2)
# x is now in gen 3
g, h, i = gc.get_count()
# We don't check a, d, g since their exact values depends on
# We don't check a since its exact values depends on
# internal implementation details of the interpreter.
self.assertEqual((b, c), (1, 0))
self.assertEqual((e, f), (0, 1))
self.assertEqual((h, i), (0, 0))
def test_trashcan(self):
class Ouch:
@@ -667,9 +682,8 @@ class GCTests(unittest.TestCase):
gc.collect()
self.assertEqual(len(ouch), 2) # else the callbacks didn't run
for x in ouch:
# If the callback resurrected one of these guys, the instance
# would be damaged, with an empty __dict__.
self.assertEqual(x, None)
# The weakref should be cleared before executing the callback.
self.assertIsNone(x)
def test_bug21435(self):
# This is a poor test - its only virtue is that it happened to
@@ -831,17 +845,20 @@ class GCTests(unittest.TestCase):
rc, out, err = assert_python_ok(TESTFN)
self.assertEqual(out.strip(), b'__del__ called')
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_get_stats(self):
stats = gc.get_stats()
self.assertEqual(len(stats), 3)
for st in stats:
self.assertIsInstance(st, dict)
self.assertEqual(set(st),
{"collected", "collections", "uncollectable"})
self.assertEqual(
set(st),
{"collected", "collections", "uncollectable", "candidates", "duration"}
)
self.assertGreaterEqual(st["collected"], 0)
self.assertGreaterEqual(st["collections"], 0)
self.assertGreaterEqual(st["uncollectable"], 0)
self.assertGreaterEqual(st["candidates"], 0)
self.assertGreaterEqual(st["duration"], 0)
# Check that collection counts are incremented correctly
if gc.isenabled():
self.addCleanup(gc.enable)
@@ -852,11 +869,25 @@ class GCTests(unittest.TestCase):
self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
self.assertEqual(new[1]["collections"], old[1]["collections"])
self.assertEqual(new[2]["collections"], old[2]["collections"])
self.assertGreater(new[0]["duration"], old[0]["duration"])
self.assertEqual(new[1]["duration"], old[1]["duration"])
self.assertEqual(new[2]["duration"], old[2]["duration"])
for stat in ["collected", "uncollectable", "candidates"]:
self.assertGreaterEqual(new[0][stat], old[0][stat])
self.assertEqual(new[1][stat], old[1][stat])
self.assertEqual(new[2][stat], old[2][stat])
gc.collect(2)
new = gc.get_stats()
self.assertEqual(new[0]["collections"], old[0]["collections"] + 1)
old, new = new, gc.get_stats()
self.assertEqual(new[0]["collections"], old[0]["collections"])
self.assertEqual(new[1]["collections"], old[1]["collections"])
self.assertEqual(new[2]["collections"], old[2]["collections"] + 1)
self.assertEqual(new[0]["duration"], old[0]["duration"])
self.assertEqual(new[1]["duration"], old[1]["duration"])
self.assertGreater(new[2]["duration"], old[2]["duration"])
for stat in ["collected", "uncollectable", "candidates"]:
self.assertEqual(new[0][stat], old[0][stat])
self.assertEqual(new[1][stat], old[1][stat])
self.assertGreaterEqual(new[2][stat], old[2][stat])
def test_freeze(self):
gc.freeze()
@@ -880,42 +911,10 @@ class GCTests(unittest.TestCase):
self.assertTrue(
any(l is element for element in gc.get_objects(generation=0))
)
self.assertFalse(
any(l is element for element in gc.get_objects(generation=1))
)
self.assertFalse(
any(l is element for element in gc.get_objects(generation=2))
)
gc.collect(generation=0)
gc.collect()
self.assertFalse(
any(l is element for element in gc.get_objects(generation=0))
)
self.assertTrue(
any(l is element for element in gc.get_objects(generation=1))
)
self.assertFalse(
any(l is element for element in gc.get_objects(generation=2))
)
gc.collect(generation=1)
self.assertFalse(
any(l is element for element in gc.get_objects(generation=0))
)
self.assertFalse(
any(l is element for element in gc.get_objects(generation=1))
)
self.assertTrue(
any(l is element for element in gc.get_objects(generation=2))
)
gc.collect(generation=2)
self.assertFalse(
any(l is element for element in gc.get_objects(generation=0))
)
self.assertFalse(
any(l is element for element in gc.get_objects(generation=1))
)
self.assertTrue(
any(l is element for element in gc.get_objects(generation=2))
)
del l
gc.collect()
@@ -1204,6 +1203,37 @@ class GCTests(unittest.TestCase):
""")
assert_python_ok("-c", source)
def test_do_not_cleanup_type_subclasses_before_finalization(self):
# See https://github.com/python/cpython/issues/135552
# If we cleanup weakrefs for tp_subclasses before calling
# the finalizer (__del__) then the line `fail = BaseNode.next.next`
# should fail because we are trying to access a subclass
# attribute. But subclass type cache was not properly invalidated.
code = """
class BaseNode:
def __del__(self):
BaseNode.next = BaseNode.next.next
fail = BaseNode.next.next
class Node(BaseNode):
pass
BaseNode.next = Node()
BaseNode.next.next = Node()
"""
# this test checks garbage collection while interp
# finalization
assert_python_ok("-c", textwrap.dedent(code))
code_inside_function = textwrap.dedent(F"""
def test():
{textwrap.indent(code, ' ')}
test()
""")
# this test checks regular garbage collection
assert_python_ok("-c", code_inside_function)
@unittest.skipUnless(Py_GIL_DISABLED, "requires free-threaded GC")
@unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi")
@@ -1222,14 +1252,16 @@ class GCTests(unittest.TestCase):
# Use n // 2 just in case some other objects were collected.
self.assertTrue(new_count - count > (n // 2))
@requires_gil_enabled('need generational GC')
class IncrementalGCTests(unittest.TestCase):
@unittest.skipIf(_testinternalcapi is None, "requires _testinternalcapi")
def test_heap_size(self):
count = _testinternalcapi.get_tracked_heap_size()
l = []
self.assertEqual(count + 1, _testinternalcapi.get_tracked_heap_size())
del l
self.assertEqual(count, _testinternalcapi.get_tracked_heap_size())
@requires_gil_enabled("Free threading does not support incremental GC")
def test_incremental_gc_handles_fast_cycle_creation(self):
# Run this test in a fresh process. The number of alive objects (which can
# be from unit tests run before this one) can influence how quickly cyclic
# garbage is found.
script = support.findfile("_test_gc_fast_cycles.py")
run_test_script(script)
class GCCallbackTests(unittest.TestCase):
@@ -1306,9 +1338,11 @@ class GCCallbackTests(unittest.TestCase):
# Check that we got the right info dict for all callbacks
for v in self.visit:
info = v[2]
self.assertTrue("generation" in info)
self.assertTrue("collected" in info)
self.assertTrue("uncollectable" in info)
self.assertIn("generation", info)
self.assertIn("collected", info)
self.assertIn("uncollectable", info)
self.assertIn("candidates", info)
self.assertIn("duration", info)
def test_collect_generation(self):
self.preclean()
@@ -1484,8 +1518,8 @@ class GCTogglingTests(unittest.TestCase):
assert not detector.gc_happened
while not detector.gc_happened:
i += 1
if i > 10000:
self.fail("gc didn't happen after 10000 iterations")
if i > 100000:
self.fail("gc didn't happen after 100000 iterations")
self.assertEqual(len(ouch), 0)
junk.append([]) # this will eventually trigger gc
@@ -1496,6 +1530,7 @@ class GCTogglingTests(unittest.TestCase):
self.assertEqual(x, None)
@gc_threshold(1000, 0, 0)
@unittest.skipIf(Py_GIL_DISABLED, "requires GC generations or increments")
def test_bug1055820d(self):
# Corresponds to temp2d.py in the bug report. This is very much like
# test_bug1055820c, but uses a __del__ method instead of a weakref
@@ -1556,8 +1591,8 @@ class GCTogglingTests(unittest.TestCase):
gc.collect()
while not detector.gc_happened:
i += 1
if i > 10000:
self.fail("gc didn't happen after 10000 iterations")
if i > 50000:
self.fail("gc didn't happen after 50000 iterations")
self.assertEqual(len(ouch), 0)
junk.append([]) # this will eventually trigger gc
@@ -1574,8 +1609,8 @@ class GCTogglingTests(unittest.TestCase):
detector = GC_Detector()
while not detector.gc_happened:
i += 1
if i > 10000:
self.fail("gc didn't happen after 10000 iterations")
if i > 100000:
self.fail("gc didn't happen after 100000 iterations")
junk.append([]) # this will eventually trigger gc
try:
@@ -1585,11 +1620,11 @@ class GCTogglingTests(unittest.TestCase):
detector = GC_Detector()
while not detector.gc_happened:
i += 1
if i > 10000:
if i > 100000:
break
junk.append([]) # this may eventually trigger gc (if it is enabled)
self.assertEqual(i, 10001)
self.assertEqual(i, 100001)
finally:
gc.enable()

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@

View File

@@ -1,415 +0,0 @@
#
# test_multibytecodec.py
# Unit test for multibytecodec itself
#
import codecs
import io
import sys
import textwrap
import unittest
try:
import _multibytecodec
except ImportError:
# TODO: RUSTPYTHON; _multibytecodec not implemented
raise unittest.SkipTest('_multibytecodec not available')
from test import support
from test.support import os_helper
from test.support.os_helper import TESTFN
from test.support.import_helper import import_module
ALL_CJKENCODINGS = [
# _codecs_cn
'gb2312', 'gbk', 'gb18030', 'hz',
# _codecs_hk
'big5hkscs',
# _codecs_jp
'cp932', 'shift_jis', 'euc_jp', 'euc_jisx0213', 'shift_jisx0213',
'euc_jis_2004', 'shift_jis_2004',
# _codecs_kr
'cp949', 'euc_kr', 'johab',
# _codecs_tw
'big5', 'cp950',
# _codecs_iso2022
'iso2022_jp', 'iso2022_jp_1', 'iso2022_jp_2', 'iso2022_jp_2004',
'iso2022_jp_3', 'iso2022_jp_ext', 'iso2022_kr',
]
class Test_MultibyteCodec(unittest.TestCase):
def test_nullcoding(self):
for enc in ALL_CJKENCODINGS:
self.assertEqual(b''.decode(enc), '')
self.assertEqual(str(b'', enc), '')
self.assertEqual(''.encode(enc), b'')
def test_str_decode(self):
for enc in ALL_CJKENCODINGS:
self.assertEqual('abcd'.encode(enc), b'abcd')
def test_errorcallback_longindex(self):
dec = codecs.getdecoder('euc-kr')
myreplace = lambda exc: ('', sys.maxsize+1)
codecs.register_error('test.cjktest', myreplace)
self.assertRaises(IndexError, dec,
b'apple\x92ham\x93spam', 'test.cjktest')
def test_errorcallback_custom_ignore(self):
# Issue #23215: MemoryError with custom error handlers and multibyte codecs
data = 100 * "\udc00"
codecs.register_error("test.ignore", codecs.ignore_errors)
for enc in ALL_CJKENCODINGS:
self.assertEqual(data.encode(enc, "test.ignore"), b'')
def test_codingspec(self):
try:
for enc in ALL_CJKENCODINGS:
code = '# coding: {}\n'.format(enc)
exec(code)
finally:
os_helper.unlink(TESTFN)
def test_init_segfault(self):
# bug #3305: this used to segfault
self.assertRaises(AttributeError,
_multibytecodec.MultibyteStreamReader, None)
self.assertRaises(AttributeError,
_multibytecodec.MultibyteStreamWriter, None)
def test_decode_unicode(self):
# Trying to decode a unicode string should raise a TypeError
for enc in ALL_CJKENCODINGS:
self.assertRaises(TypeError, codecs.getdecoder(enc), "")
class Test_IncrementalEncoder(unittest.TestCase):
def test_stateless(self):
# cp949 encoder isn't stateful at all.
encoder = codecs.getincrementalencoder('cp949')()
self.assertEqual(encoder.encode('\ud30c\uc774\uc36c \ub9c8\uc744'),
b'\xc6\xc4\xc0\xcc\xbd\xe3 \xb8\xb6\xc0\xbb')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode('\u2606\u223c\u2606', True),
b'\xa1\xd9\xa1\xad\xa1\xd9')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode('', True), b'')
self.assertEqual(encoder.encode('', False), b'')
self.assertEqual(encoder.reset(), None)
def test_stateful(self):
# jisx0213 encoder is stateful for a few code points. eg)
# U+00E6 => A9DC
# U+00E6 U+0300 => ABC4
# U+0300 => ABDC
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
self.assertEqual(encoder.encode('\u00e6'), b'')
self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
self.assertEqual(encoder.encode('\u00e6', True), b'\xa9\xdc')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
self.assertEqual(encoder.encode('\u00e6'), b'')
self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
self.assertEqual(encoder.encode('', True), b'')
def test_stateful_keep_buffer(self):
encoder = codecs.getincrementalencoder('jisx0213')()
self.assertEqual(encoder.encode('\u00e6'), b'')
self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
self.assertEqual(encoder.encode('\u0300\u00e6'), b'\xab\xc4')
self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
self.assertEqual(encoder.reset(), None)
self.assertEqual(encoder.encode('\u0300'), b'\xab\xdc')
self.assertEqual(encoder.encode('\u00e6'), b'')
self.assertRaises(UnicodeEncodeError, encoder.encode, '\u0123')
self.assertEqual(encoder.encode('', True), b'\xa9\xdc')
def test_state_methods_with_buffer_state(self):
# euc_jis_2004 stores state as a buffer of pending bytes
encoder = codecs.getincrementalencoder('euc_jis_2004')()
initial_state = encoder.getstate()
self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
encoder.setstate(initial_state)
self.assertEqual(encoder.encode('\u00e6\u0300'), b'\xab\xc4')
self.assertEqual(encoder.encode('\u00e6'), b'')
partial_state = encoder.getstate()
self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
encoder.setstate(partial_state)
self.assertEqual(encoder.encode('\u0300'), b'\xab\xc4')
def test_state_methods_with_non_buffer_state(self):
# iso2022_jp stores state without using a buffer
encoder = codecs.getincrementalencoder('iso2022_jp')()
self.assertEqual(encoder.encode('z'), b'z')
en_state = encoder.getstate()
self.assertEqual(encoder.encode('\u3042'), b'\x1b\x24\x42\x24\x22')
jp_state = encoder.getstate()
self.assertEqual(encoder.encode('z'), b'\x1b\x28\x42z')
encoder.setstate(jp_state)
self.assertEqual(encoder.encode('\u3042'), b'\x24\x22')
encoder.setstate(en_state)
self.assertEqual(encoder.encode('z'), b'z')
def test_getstate_returns_expected_value(self):
# Note: getstate is implemented such that these state values
# are expected to be the same across all builds of Python,
# regardless of x32/64 bit, endianness and compiler.
# euc_jis_2004 stores state as a buffer of pending bytes
buffer_state_encoder = codecs.getincrementalencoder('euc_jis_2004')()
self.assertEqual(buffer_state_encoder.getstate(), 0)
buffer_state_encoder.encode('\u00e6')
self.assertEqual(buffer_state_encoder.getstate(),
int.from_bytes(
b"\x02"
b"\xc3\xa6"
b"\x00\x00\x00\x00\x00\x00\x00\x00",
'little'))
buffer_state_encoder.encode('\u0300')
self.assertEqual(buffer_state_encoder.getstate(), 0)
# iso2022_jp stores state without using a buffer
non_buffer_state_encoder = codecs.getincrementalencoder('iso2022_jp')()
self.assertEqual(non_buffer_state_encoder.getstate(),
int.from_bytes(
b"\x00"
b"\x42\x42\x00\x00\x00\x00\x00\x00",
'little'))
non_buffer_state_encoder.encode('\u3042')
self.assertEqual(non_buffer_state_encoder.getstate(),
int.from_bytes(
b"\x00"
b"\xc2\x42\x00\x00\x00\x00\x00\x00",
'little'))
def test_setstate_validates_input_size(self):
encoder = codecs.getincrementalencoder('euc_jp')()
pending_size_nine = int.from_bytes(
b"\x09"
b"\x00\x00\x00\x00\x00\x00\x00\x00"
b"\x00\x00\x00\x00\x00\x00\x00\x00",
'little')
self.assertRaises(UnicodeError, encoder.setstate, pending_size_nine)
def test_setstate_validates_input_bytes(self):
encoder = codecs.getincrementalencoder('euc_jp')()
invalid_utf8 = int.from_bytes(
b"\x01"
b"\xff"
b"\x00\x00\x00\x00\x00\x00\x00\x00",
'little')
self.assertRaises(UnicodeDecodeError, encoder.setstate, invalid_utf8)
def test_issue5640(self):
encoder = codecs.getincrementalencoder('shift-jis')('backslashreplace')
self.assertEqual(encoder.encode('\xff'), b'\\xff')
self.assertEqual(encoder.encode('\n'), b'\n')
@support.cpython_only
def test_subinterp(self):
# bpo-42846: Test a CJK codec in a subinterpreter
_testcapi = import_module("_testcapi")
encoding = 'cp932'
text = "Python の開発は、1990 年ごろから開始されています。"
code = textwrap.dedent("""
import codecs
encoding = %r
text = %r
encoder = codecs.getincrementalencoder(encoding)()
text2 = encoder.encode(text).decode(encoding)
if text2 != text:
raise ValueError(f"encoding issue: {text2!a} != {text!a}")
""") % (encoding, text)
res = _testcapi.run_in_subinterp(code)
self.assertEqual(res, 0)
class Test_IncrementalDecoder(unittest.TestCase):
def test_dbcs(self):
# cp949 decoder is simple with only 1 or 2 bytes sequences.
decoder = codecs.getincrementaldecoder('cp949')()
self.assertEqual(decoder.decode(b'\xc6\xc4\xc0\xcc\xbd'),
'\ud30c\uc774')
self.assertEqual(decoder.decode(b'\xe3 \xb8\xb6\xc0\xbb'),
'\uc36c \ub9c8\uc744')
self.assertEqual(decoder.decode(b''), '')
def test_dbcs_keep_buffer(self):
decoder = codecs.getincrementaldecoder('cp949')()
self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
self.assertEqual(decoder.decode(b'\xcc'), '\uc774')
self.assertEqual(decoder.decode(b'\xc6\xc4\xc0'), '\ud30c')
self.assertRaises(UnicodeDecodeError, decoder.decode,
b'\xcc\xbd', True)
self.assertEqual(decoder.decode(b'\xcc'), '\uc774')
def test_iso2022(self):
decoder = codecs.getincrementaldecoder('iso2022-jp')()
ESC = b'\x1b'
self.assertEqual(decoder.decode(ESC + b'('), '')
self.assertEqual(decoder.decode(b'B', True), '')
self.assertEqual(decoder.decode(ESC + b'$'), '')
self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
self.assertEqual(decoder.decode(b'@$@'), '\u4e16')
self.assertEqual(decoder.decode(b'$', True), '\u4e16')
self.assertEqual(decoder.reset(), None)
self.assertEqual(decoder.decode(b'@$'), '@$')
self.assertEqual(decoder.decode(ESC + b'$'), '')
self.assertRaises(UnicodeDecodeError, decoder.decode, b'', True)
self.assertEqual(decoder.decode(b'B@$'), '\u4e16')
def test_decode_unicode(self):
# Trying to decode a unicode string should raise a TypeError
for enc in ALL_CJKENCODINGS:
decoder = codecs.getincrementaldecoder(enc)()
self.assertRaises(TypeError, decoder.decode, "")
def test_state_methods(self):
decoder = codecs.getincrementaldecoder('euc_jp')()
# Decode a complete input sequence
self.assertEqual(decoder.decode(b'\xa4\xa6'), '\u3046')
pending1, _ = decoder.getstate()
self.assertEqual(pending1, b'')
# Decode first half of a partial input sequence
self.assertEqual(decoder.decode(b'\xa4'), '')
pending2, flags2 = decoder.getstate()
self.assertEqual(pending2, b'\xa4')
# Decode second half of a partial input sequence
self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
pending3, _ = decoder.getstate()
self.assertEqual(pending3, b'')
# Jump back and decode second half of partial input sequence again
decoder.setstate((pending2, flags2))
self.assertEqual(decoder.decode(b'\xa6'), '\u3046')
pending4, _ = decoder.getstate()
self.assertEqual(pending4, b'')
# Ensure state values are preserved correctly
decoder.setstate((b'abc', 123456789))
self.assertEqual(decoder.getstate(), (b'abc', 123456789))
def test_setstate_validates_input(self):
decoder = codecs.getincrementaldecoder('euc_jp')()
self.assertRaises(TypeError, decoder.setstate, 123)
self.assertRaises(TypeError, decoder.setstate, ("invalid", 0))
self.assertRaises(TypeError, decoder.setstate, (b"1234", "invalid"))
self.assertRaises(UnicodeDecodeError, decoder.setstate, (b"123456789", 0))
class Test_StreamReader(unittest.TestCase):
def test_bug1728403(self):
try:
f = open(TESTFN, 'wb')
try:
f.write(b'\xa1')
finally:
f.close()
with self.assertWarns(DeprecationWarning):
f = codecs.open(TESTFN, encoding='cp949')
try:
self.assertRaises(UnicodeDecodeError, f.read, 2)
finally:
f.close()
finally:
os_helper.unlink(TESTFN)
class Test_StreamWriter(unittest.TestCase):
def test_gb18030(self):
s= io.BytesIO()
c = codecs.getwriter('gb18030')(s)
c.write('123')
self.assertEqual(s.getvalue(), b'123')
c.write('\U00012345')
self.assertEqual(s.getvalue(), b'123\x907\x959')
c.write('\uac00\u00ac')
self.assertEqual(s.getvalue(),
b'123\x907\x959\x827\xcf5\x810\x851')
def test_utf_8(self):
s= io.BytesIO()
c = codecs.getwriter('utf-8')(s)
c.write('123')
self.assertEqual(s.getvalue(), b'123')
c.write('\U00012345')
self.assertEqual(s.getvalue(), b'123\xf0\x92\x8d\x85')
c.write('\uac00\u00ac')
self.assertEqual(s.getvalue(),
b'123\xf0\x92\x8d\x85'
b'\xea\xb0\x80\xc2\xac')
def test_streamwriter_strwrite(self):
s = io.BytesIO()
wr = codecs.getwriter('gb18030')(s)
wr.write('abcd')
self.assertEqual(s.getvalue(), b'abcd')
class Test_ISO2022(unittest.TestCase):
def test_g2(self):
iso2022jp2 = b'\x1b(B:hu4:unit\x1b.A\x1bNi de famille'
uni = ':hu4:unit\xe9 de famille'
self.assertEqual(iso2022jp2.decode('iso2022-jp-2'), uni)
def test_iso2022_jp_g0(self):
self.assertNotIn(b'\x0e', '\N{SOFT HYPHEN}'.encode('iso-2022-jp-2'))
for encoding in ('iso-2022-jp-2004', 'iso-2022-jp-3'):
e = '\u3406'.encode(encoding)
self.assertFalse(any(x > 0x80 for x in e))
@support.requires_resource('cpu')
def test_bug1572832(self):
for x in range(0x10000, 0x110000):
# Any ISO 2022 codec will cause the segfault
chr(x).encode('iso_2022_jp', 'ignore')
class TestStateful(unittest.TestCase):
text = '\u4E16\u4E16'
encoding = 'iso-2022-jp'
expected = b'\x1b$B@$@$'
reset = b'\x1b(B'
expected_reset = expected + reset
def test_encode(self):
self.assertEqual(self.text.encode(self.encoding), self.expected_reset)
def test_incrementalencoder(self):
encoder = codecs.getincrementalencoder(self.encoding)()
output = b''.join(
encoder.encode(char)
for char in self.text)
self.assertEqual(output, self.expected)
self.assertEqual(encoder.encode('', final=True), self.reset)
self.assertEqual(encoder.encode('', final=True), b'')
def test_incrementalencoder_final(self):
encoder = codecs.getincrementalencoder(self.encoding)()
last_index = len(self.text) - 1
output = b''.join(
encoder.encode(char, index == last_index)
for index, char in enumerate(self.text))
self.assertEqual(output, self.expected_reset)
self.assertEqual(encoder.encode('', final=True), b'')
class TestHZStateful(TestStateful):
text = '\u804a\u804a'
encoding = 'hz'
expected = b'~{ADAD'
reset = b'~}'
expected_reset = expected + reset
if __name__ == "__main__":
unittest.main()

View File

@@ -1,223 +0,0 @@
"""Test suite for the profile module."""
import sys
import pstats
import unittest
import os
from difflib import unified_diff
from io import StringIO
from test.support.os_helper import TESTFN, unlink, temp_dir, change_cwd
from contextlib import contextmanager, redirect_stdout
import profile
from test.profilee import testfunc, timer
from test.support.script_helper import assert_python_failure, assert_python_ok
class ProfileTest(unittest.TestCase):
profilerclass = profile.Profile
profilermodule = profile
methodnames = ['print_stats', 'print_callers', 'print_callees']
expected_max_output = ':0(max)'
def tearDown(self):
unlink(TESTFN)
def get_expected_output(self):
return _ProfileOutput
@classmethod
def do_profiling(cls):
results = []
prof = cls.profilerclass(timer, 0.001)
start_timer = timer()
prof.runctx("testfunc()", globals(), locals())
results.append(timer() - start_timer)
for methodname in cls.methodnames:
s = StringIO()
stats = pstats.Stats(prof, stream=s)
stats.strip_dirs().sort_stats("stdname")
getattr(stats, methodname)()
output = s.getvalue().splitlines()
mod_name = testfunc.__module__.rsplit('.', 1)[1]
# Only compare against stats originating from the test file.
# Prevents outside code (e.g., the io module) from causing
# unexpected output.
output = [line.rstrip() for line in output if mod_name in line]
results.append('\n'.join(output))
return results
@unittest.expectedFailure # TODO: RUSTPYTHON; print_callees output differs from CPython
def test_cprofile(self):
results = self.do_profiling()
expected = self.get_expected_output()
self.assertEqual(results[0], 1000)
fail = []
for i, method in enumerate(self.methodnames):
a = expected[method]
b = results[i+1]
if a != b:
fail.append(f"\nStats.{method} output for "
f"{self.profilerclass.__name__} "
"does not fit expectation:")
fail.extend(unified_diff(a.split('\n'), b.split('\n'),
lineterm=""))
if fail:
self.fail("\n".join(fail))
def test_calling_conventions(self):
# Issue #5330: profile and cProfile wouldn't report C functions called
# with keyword arguments. We test all calling conventions.
stmts = [
"max([0])",
"max([0], key=int)",
"max([0], **dict(key=int))",
"max(*([0],))",
"max(*([0],), key=int)",
"max(*([0],), **dict(key=int))",
]
for stmt in stmts:
s = StringIO()
prof = self.profilerclass(timer, 0.001)
prof.runctx(stmt, globals(), locals())
stats = pstats.Stats(prof, stream=s)
stats.print_stats()
res = s.getvalue()
self.assertIn(self.expected_max_output, res,
"Profiling {0!r} didn't report max:\n{1}".format(stmt, res))
def test_run(self):
with silent():
self.profilermodule.run("int('1')")
self.profilermodule.run("int('1')", filename=TESTFN)
self.assertTrue(os.path.exists(TESTFN))
def test_run_with_sort_by_values(self):
with redirect_stdout(StringIO()) as f:
self.profilermodule.run("int('1')", sort=('tottime', 'stdname'))
self.assertIn("Ordered by: internal time, standard name", f.getvalue())
def test_runctx(self):
with silent():
self.profilermodule.runctx("testfunc()", globals(), locals())
self.profilermodule.runctx("testfunc()", globals(), locals(),
filename=TESTFN)
self.assertTrue(os.path.exists(TESTFN))
def test_run_profile_as_module(self):
# Test that -m switch needs an argument
assert_python_failure('-m', self.profilermodule.__name__, '-m')
# Test failure for not-existent module
assert_python_failure('-m', self.profilermodule.__name__,
'-m', 'random_module_xyz')
# Test successful run
assert_python_ok('-m', self.profilermodule.__name__,
'-m', 'timeit', '-n', '1')
def test_output_file_when_changing_directory(self):
with temp_dir() as tmpdir, change_cwd(tmpdir):
os.mkdir('dest')
with open('demo.py', 'w', encoding="utf-8") as f:
f.write('import os; os.chdir("dest")')
assert_python_ok(
'-m', self.profilermodule.__name__,
'-o', 'out.pstats',
'demo.py',
)
self.assertTrue(os.path.exists('out.pstats'))
def regenerate_expected_output(filename, cls):
filename = filename.rstrip('co')
print('Regenerating %s...' % filename)
results = cls.do_profiling()
newfile = []
with open(filename, 'r') as f:
for line in f:
newfile.append(line)
if line.startswith('#--cut'):
break
with open(filename, 'w') as f:
f.writelines(newfile)
f.write("_ProfileOutput = {}\n")
for i, method in enumerate(cls.methodnames):
f.write('_ProfileOutput[%r] = """\\\n%s"""\n' % (
method, results[i+1]))
f.write('\nif __name__ == "__main__":\n main()\n')
@contextmanager
def silent():
stdout = sys.stdout
try:
sys.stdout = StringIO()
yield
finally:
sys.stdout = stdout
def main():
if '-r' not in sys.argv:
unittest.main()
else:
regenerate_expected_output(__file__, ProfileTest)
# Don't remove this comment. Everything below it is auto-generated.
#--cut--------------------------------------------------------------------------
_ProfileOutput = {}
_ProfileOutput['print_stats'] = """\
28 27.972 0.999 27.972 0.999 profilee.py:110(__getattr__)
1 269.996 269.996 999.769 999.769 profilee.py:25(testfunc)
23/3 149.937 6.519 169.917 56.639 profilee.py:35(factorial)
20 19.980 0.999 19.980 0.999 profilee.py:48(mul)
2 39.986 19.993 599.830 299.915 profilee.py:55(helper)
4 115.984 28.996 119.964 29.991 profilee.py:73(helper1)
2 -0.006 -0.003 139.946 69.973 profilee.py:84(helper2_indirect)
8 311.976 38.997 399.912 49.989 profilee.py:88(helper2)
8 63.976 7.997 79.960 9.995 profilee.py:98(subhelper)"""
_ProfileOutput['print_callers'] = """\
:0(append) <- profilee.py:73(helper1)(4) 119.964
:0(exception) <- profilee.py:73(helper1)(4) 119.964
:0(hasattr) <- profilee.py:73(helper1)(4) 119.964
profilee.py:88(helper2)(8) 399.912
profilee.py:110(__getattr__) <- :0(hasattr)(12) 11.964
profilee.py:98(subhelper)(16) 79.960
profilee.py:25(testfunc) <- <string>:1(<module>)(1) 999.767
profilee.py:35(factorial) <- profilee.py:25(testfunc)(1) 999.769
profilee.py:35(factorial)(20) 169.917
profilee.py:84(helper2_indirect)(2) 139.946
profilee.py:48(mul) <- profilee.py:35(factorial)(20) 169.917
profilee.py:55(helper) <- profilee.py:25(testfunc)(2) 999.769
profilee.py:73(helper1) <- profilee.py:55(helper)(4) 599.830
profilee.py:84(helper2_indirect) <- profilee.py:55(helper)(2) 599.830
profilee.py:88(helper2) <- profilee.py:55(helper)(6) 599.830
profilee.py:84(helper2_indirect)(2) 139.946
profilee.py:98(subhelper) <- profilee.py:88(helper2)(8) 399.912"""
_ProfileOutput['print_callees'] = """\
:0(hasattr) -> profilee.py:110(__getattr__)(12) 27.972
<string>:1(<module>) -> profilee.py:25(testfunc)(1) 999.769
profilee.py:110(__getattr__) ->
profilee.py:25(testfunc) -> profilee.py:35(factorial)(1) 169.917
profilee.py:55(helper)(2) 599.830
profilee.py:35(factorial) -> profilee.py:35(factorial)(20) 169.917
profilee.py:48(mul)(20) 19.980
profilee.py:48(mul) ->
profilee.py:55(helper) -> profilee.py:73(helper1)(4) 119.964
profilee.py:84(helper2_indirect)(2) 139.946
profilee.py:88(helper2)(6) 399.912
profilee.py:73(helper1) -> :0(append)(4) -0.004
profilee.py:84(helper2_indirect) -> profilee.py:35(factorial)(2) 169.917
profilee.py:88(helper2)(2) 399.912
profilee.py:88(helper2) -> :0(hasattr)(8) 11.964
profilee.py:98(subhelper)(8) 79.960
profilee.py:98(subhelper) -> profilee.py:110(__getattr__)(16) 27.972"""
if __name__ == "__main__":
main()

View File

@@ -1,164 +0,0 @@
import unittest
from test import support
from test.support.import_helper import ensure_lazy_imports
from io import StringIO
from pstats import SortKey
from enum import StrEnum, _test_simple_enum
import os
import pstats
import tempfile
try:
import cProfile # XXX: RUSTPYTHON
except ImportError:
cProfile = None
class LazyImportTest(unittest.TestCase):
@support.cpython_only
def test_lazy_import(self):
ensure_lazy_imports("pstats", {"typing"})
class AddCallersTestCase(unittest.TestCase):
"""Tests for pstats.add_callers helper."""
def test_combine_results(self):
# pstats.add_callers should combine the call results of both target
# and source by adding the call time. See issue1269.
# new format: used by the cProfile module
target = {"a": (1, 2, 3, 4)}
source = {"a": (1, 2, 3, 4), "b": (5, 6, 7, 8)}
new_callers = pstats.add_callers(target, source)
self.assertEqual(new_callers, {'a': (2, 4, 6, 8), 'b': (5, 6, 7, 8)})
# old format: used by the profile module
target = {"a": 1}
source = {"a": 1, "b": 5}
new_callers = pstats.add_callers(target, source)
self.assertEqual(new_callers, {'a': 2, 'b': 5})
class StatsTestCase(unittest.TestCase):
def setUp(self):
stats_file = support.findfile('pstats.pck')
self.stats = pstats.Stats(stats_file)
def test_add(self):
stream = StringIO()
stats = pstats.Stats(stream=stream)
stats.add(self.stats, self.stats)
def test_dump_and_load_works_correctly(self):
temp_storage_new = tempfile.NamedTemporaryFile(delete=False)
try:
self.stats.dump_stats(filename=temp_storage_new.name)
tmp_stats = pstats.Stats(temp_storage_new.name)
self.assertEqual(self.stats.stats, tmp_stats.stats)
finally:
temp_storage_new.close()
os.remove(temp_storage_new.name)
@unittest.skipUnless(cProfile, 'TODO: RUSTPYTHON; _lsprof not implemented')
def test_load_equivalent_to_init(self):
stats = pstats.Stats()
self.temp_storage = tempfile.NamedTemporaryFile(delete=False)
try:
cProfile.run('import os', filename=self.temp_storage.name)
stats.load_stats(self.temp_storage.name)
created = pstats.Stats(self.temp_storage.name)
self.assertEqual(stats.stats, created.stats)
finally:
self.temp_storage.close()
os.remove(self.temp_storage.name)
def test_loading_wrong_types(self):
stats = pstats.Stats()
with self.assertRaises(TypeError):
stats.load_stats(42)
def test_sort_stats_int(self):
valid_args = {-1: 'stdname',
0: 'calls',
1: 'time',
2: 'cumulative'}
for arg_int, arg_str in valid_args.items():
self.stats.sort_stats(arg_int)
self.assertEqual(self.stats.sort_type,
self.stats.sort_arg_dict_default[arg_str][-1])
def test_sort_stats_string(self):
for sort_name in ['calls', 'ncalls', 'cumtime', 'cumulative',
'filename', 'line', 'module', 'name', 'nfl', 'pcalls',
'stdname', 'time', 'tottime']:
self.stats.sort_stats(sort_name)
self.assertEqual(self.stats.sort_type,
self.stats.sort_arg_dict_default[sort_name][-1])
def test_sort_stats_partial(self):
sortkey = 'filename'
for sort_name in ['f', 'fi', 'fil', 'file', 'filen', 'filena',
'filenam', 'filename']:
self.stats.sort_stats(sort_name)
self.assertEqual(self.stats.sort_type,
self.stats.sort_arg_dict_default[sortkey][-1])
def test_sort_stats_enum(self):
for member in SortKey:
self.stats.sort_stats(member)
self.assertEqual(
self.stats.sort_type,
self.stats.sort_arg_dict_default[member.value][-1])
class CheckedSortKey(StrEnum):
CALLS = 'calls', 'ncalls'
CUMULATIVE = 'cumulative', 'cumtime'
FILENAME = 'filename', 'module'
LINE = 'line'
NAME = 'name'
NFL = 'nfl'
PCALLS = 'pcalls'
STDNAME = 'stdname'
TIME = 'time', 'tottime'
def __new__(cls, *values):
value = values[0]
obj = str.__new__(cls, value)
obj._value_ = value
for other_value in values[1:]:
cls._value2member_map_[other_value] = obj
obj._all_values = values
return obj
_test_simple_enum(CheckedSortKey, SortKey)
def test_sort_starts_mix(self):
self.assertRaises(TypeError, self.stats.sort_stats,
'calls',
SortKey.TIME)
self.assertRaises(TypeError, self.stats.sort_stats,
SortKey.TIME,
'calls')
@unittest.skipUnless(cProfile, 'TODO: RUSTPYTHON; _lsprof not implemented')
def test_get_stats_profile(self):
def pass1(): pass
def pass2(): pass
def pass3(): pass
pr = cProfile.Profile()
pr.enable()
pass1()
pass2()
pass3()
pr.create_stats()
ps = pstats.Stats(pr)
stats_profile = ps.get_stats_profile()
funcs_called = set(stats_profile.func_profiles.keys())
self.assertIn('pass1', funcs_called)
self.assertIn('pass2', funcs_called)
self.assertIn('pass3', funcs_called)
def test_SortKey_enum(self):
self.assertEqual(SortKey.FILENAME, 'filename')
self.assertNotEqual(SortKey.FILENAME, SortKey.CALLS)
if __name__ == "__main__":
unittest.main()

View File

@@ -3,7 +3,7 @@
import subprocess
import sys
import os
from test.support import script_helper, requires_subprocess
from test.support import script_helper
import unittest
from unittest import mock
@@ -69,12 +69,12 @@ class TestScriptHelper(unittest.TestCase):
self.assertNotIn('-E', popen_command)
@requires_subprocess()
class TestScriptHelperEnvironment(unittest.TestCase):
"""Code coverage for interpreter_requires_environment()."""
def setUp(self):
self.assertHasAttr(script_helper, '__cached_interp_requires_environment')
self.assertTrue(
hasattr(script_helper, '__cached_interp_requires_environment'))
# Reset the private cached state.
script_helper.__dict__['__cached_interp_requires_environment'] = None

File diff suppressed because it is too large Load Diff

39
Lib/test/test_ucn.py vendored
View File

@@ -88,9 +88,6 @@ class UnicodeNamesTest(unittest.TestCase):
self.checkletter("HANGUL SYLLABLE HWEOK", "\ud6f8")
self.checkletter("HANGUL SYLLABLE HIH", "\ud7a3")
self.checkletter("haNGul SYllABle WAe", '\uc65c')
self.checkletter("HAngUL syLLabLE waE", '\uc65c')
self.assertRaises(ValueError, unicodedata.name, "\ud7a4")
def test_cjk_unified_ideographs(self):
@@ -106,36 +103,6 @@ class UnicodeNamesTest(unittest.TestCase):
self.checkletter("CJK UNIFIED IDEOGRAPH-2B81D", "\U0002B81D")
self.checkletter("CJK UNIFIED IDEOGRAPH-3134A", "\U0003134A")
self.checkletter("cjK UniFIeD idEogRAph-3aBc", "\u3abc")
self.checkletter("CJk uNIfiEd IDeOGraPH-3AbC", "\u3abc")
self.checkletter("cjK UniFIeD idEogRAph-2aBcD", "\U0002abcd")
self.checkletter("CJk uNIfiEd IDeOGraPH-2AbCd", "\U0002abcd")
@unittest.expectedFailure # TODO: RUSTPYTHON; SyntaxError: got unexpected unicode
def test_tangut_ideographs(self):
self.checkletter("TANGUT IDEOGRAPH-17000", "\U00017000")
self.checkletter("TANGUT IDEOGRAPH-187F7", "\U000187f7")
self.checkletter("TANGUT IDEOGRAPH-18D00", "\U00018D00")
self.checkletter("TANGUT IDEOGRAPH-18D08", "\U00018d08")
self.checkletter("tangut ideograph-18d08", "\U00018d08")
def test_egyptian_hieroglyphs(self):
self.checkletter("EGYPTIAN HIEROGLYPH-13460", "\U00013460")
self.checkletter("EGYPTIAN HIEROGLYPH-143FA", "\U000143fa")
self.checkletter("egyptian hieroglyph-143fa", "\U000143fa")
def test_khitan_small_script_characters(self):
self.checkletter("KHITAN SMALL SCRIPT CHARACTER-18B00", "\U00018b00")
self.checkletter("KHITAN SMALL SCRIPT CHARACTER-18CD5", "\U00018cd5")
self.checkletter("KHITAN SMALL SCRIPT CHARACTER-18CFF", "\U00018cff")
self.checkletter("KHITAN SMALL SCRIPT CHARACTER-18CFF", "\U00018cff")
self.checkletter("khitan small script character-18cff", "\U00018cff")
def test_nushu_characters(self):
self.checkletter("NUSHU CHARACTER-1B170", "\U0001b170")
self.checkletter("NUSHU CHARACTER-1B2FB", "\U0001b2fb")
self.checkletter("nushu character-1b2fb", "\U0001b2fb")
def test_bmp_characters(self):
for code in range(0x10000):
char = chr(code)
@@ -149,7 +116,7 @@ class UnicodeNamesTest(unittest.TestCase):
self.checkletter("HALFWIDTH KATAKANA SEMI-VOICED SOUND MARK", "\uFF9F")
self.checkletter("FULLWIDTH LATIN SMALL LETTER A", "\uFF41")
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_aliases(self):
# Check that the aliases defined in the NameAliases.txt file work.
# This should be updated when new aliases are added or the file
@@ -190,7 +157,7 @@ class UnicodeNamesTest(unittest.TestCase):
unicodedata.name(chr(cp))
self.assertEqual(str(cm.exception), 'no such name')
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_named_sequences_sample(self):
# Check a few named sequences. See #12753.
sequences = [
@@ -207,7 +174,7 @@ class UnicodeNamesTest(unittest.TestCase):
with self.assertRaises(KeyError):
unicodedata.ucd_3_2_0.lookup(seqname)
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_named_sequences_full(self):
# Check all the named sequences
def check_version(testfile):

View File

@@ -125,8 +125,8 @@ class UnicodeFileTests(unittest.TestCase):
# open(), os.stat(), etc. don't raise any exception.
@unittest.skipIf(is_apple, 'irrelevant test on Apple platforms')
@unittest.skipIf(
support.is_wasi,
"test fails on WASI when host platform is macOS."
support.is_emscripten or support.is_wasi,
"test fails on Emscripten/WASI when host platform is macOS."
)
def test_normalize(self):
files = set(self.files)

View File

@@ -17,10 +17,11 @@ class PEP3131Test(unittest.TestCase):
𝔘𝔫𝔦𝔠𝔬𝔡𝔢 = 1
self.assertIn("Unicode", dir())
@unittest.expectedFailure # TODO: RUSTPYTHON
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_invalid(self):
try:
from test.tokenizedata import badsyntax_3131 # noqa: F401
from test.tokenizedata import badsyntax_3131
except SyntaxError as err:
self.assertEqual(str(err),
"invalid character '' (U+20AC) (badsyntax_3131.py, line 2)")

560
Lib/tracemalloc.py vendored
View File

@@ -1,560 +0,0 @@
from collections.abc import Sequence, Iterable
from functools import total_ordering
import fnmatch
import linecache
import os.path
import pickle
# Import types and functions implemented in C
from _tracemalloc import *
from _tracemalloc import _get_object_traceback, _get_traces
def _format_size(size, sign):
for unit in ('B', 'KiB', 'MiB', 'GiB', 'TiB'):
if abs(size) < 100 and unit != 'B':
# 3 digits (xx.x UNIT)
if sign:
return "%+.1f %s" % (size, unit)
else:
return "%.1f %s" % (size, unit)
if abs(size) < 10 * 1024 or unit == 'TiB':
# 4 or 5 digits (xxxx UNIT)
if sign:
return "%+.0f %s" % (size, unit)
else:
return "%.0f %s" % (size, unit)
size /= 1024
class Statistic:
"""
Statistic difference on memory allocations between two Snapshot instance.
"""
__slots__ = ('traceback', 'size', 'count')
def __init__(self, traceback, size, count):
self.traceback = traceback
self.size = size
self.count = count
def __hash__(self):
return hash((self.traceback, self.size, self.count))
def __eq__(self, other):
if not isinstance(other, Statistic):
return NotImplemented
return (self.traceback == other.traceback
and self.size == other.size
and self.count == other.count)
def __str__(self):
text = ("%s: size=%s, count=%i"
% (self.traceback,
_format_size(self.size, False),
self.count))
if self.count:
average = self.size / self.count
text += ", average=%s" % _format_size(average, False)
return text
def __repr__(self):
return ('<Statistic traceback=%r size=%i count=%i>'
% (self.traceback, self.size, self.count))
def _sort_key(self):
return (self.size, self.count, self.traceback)
class StatisticDiff:
"""
Statistic difference on memory allocations between an old and a new
Snapshot instance.
"""
__slots__ = ('traceback', 'size', 'size_diff', 'count', 'count_diff')
def __init__(self, traceback, size, size_diff, count, count_diff):
self.traceback = traceback
self.size = size
self.size_diff = size_diff
self.count = count
self.count_diff = count_diff
def __hash__(self):
return hash((self.traceback, self.size, self.size_diff,
self.count, self.count_diff))
def __eq__(self, other):
if not isinstance(other, StatisticDiff):
return NotImplemented
return (self.traceback == other.traceback
and self.size == other.size
and self.size_diff == other.size_diff
and self.count == other.count
and self.count_diff == other.count_diff)
def __str__(self):
text = ("%s: size=%s (%s), count=%i (%+i)"
% (self.traceback,
_format_size(self.size, False),
_format_size(self.size_diff, True),
self.count,
self.count_diff))
if self.count:
average = self.size / self.count
text += ", average=%s" % _format_size(average, False)
return text
def __repr__(self):
return ('<StatisticDiff traceback=%r size=%i (%+i) count=%i (%+i)>'
% (self.traceback, self.size, self.size_diff,
self.count, self.count_diff))
def _sort_key(self):
return (abs(self.size_diff), self.size,
abs(self.count_diff), self.count,
self.traceback)
def _compare_grouped_stats(old_group, new_group):
statistics = []
for traceback, stat in new_group.items():
previous = old_group.pop(traceback, None)
if previous is not None:
stat = StatisticDiff(traceback,
stat.size, stat.size - previous.size,
stat.count, stat.count - previous.count)
else:
stat = StatisticDiff(traceback,
stat.size, stat.size,
stat.count, stat.count)
statistics.append(stat)
for traceback, stat in old_group.items():
stat = StatisticDiff(traceback, 0, -stat.size, 0, -stat.count)
statistics.append(stat)
return statistics
@total_ordering
class Frame:
"""
Frame of a traceback.
"""
__slots__ = ("_frame",)
def __init__(self, frame):
# frame is a tuple: (filename: str, lineno: int)
self._frame = frame
@property
def filename(self):
return self._frame[0]
@property
def lineno(self):
return self._frame[1]
def __eq__(self, other):
if not isinstance(other, Frame):
return NotImplemented
return (self._frame == other._frame)
def __lt__(self, other):
if not isinstance(other, Frame):
return NotImplemented
return (self._frame < other._frame)
def __hash__(self):
return hash(self._frame)
def __str__(self):
return "%s:%s" % (self.filename, self.lineno)
def __repr__(self):
return "<Frame filename=%r lineno=%r>" % (self.filename, self.lineno)
@total_ordering
class Traceback(Sequence):
"""
Sequence of Frame instances sorted from the oldest frame
to the most recent frame.
"""
__slots__ = ("_frames", '_total_nframe')
def __init__(self, frames, total_nframe=None):
Sequence.__init__(self)
# frames is a tuple of frame tuples: see Frame constructor for the
# format of a frame tuple; it is reversed, because _tracemalloc
# returns frames sorted from most recent to oldest, but the
# Python API expects oldest to most recent
self._frames = tuple(reversed(frames))
self._total_nframe = total_nframe
@property
def total_nframe(self):
return self._total_nframe
def __len__(self):
return len(self._frames)
def __getitem__(self, index):
if isinstance(index, slice):
return tuple(Frame(trace) for trace in self._frames[index])
else:
return Frame(self._frames[index])
def __contains__(self, frame):
return frame._frame in self._frames
def __hash__(self):
return hash(self._frames)
def __eq__(self, other):
if not isinstance(other, Traceback):
return NotImplemented
return (self._frames == other._frames)
def __lt__(self, other):
if not isinstance(other, Traceback):
return NotImplemented
return (self._frames < other._frames)
def __str__(self):
return str(self[0])
def __repr__(self):
s = f"<Traceback {tuple(self)}"
if self._total_nframe is None:
s += ">"
else:
s += f" total_nframe={self.total_nframe}>"
return s
def format(self, limit=None, most_recent_first=False):
lines = []
if limit is not None:
if limit > 0:
frame_slice = self[-limit:]
else:
frame_slice = self[:limit]
else:
frame_slice = self
if most_recent_first:
frame_slice = reversed(frame_slice)
for frame in frame_slice:
lines.append(' File "%s", line %s'
% (frame.filename, frame.lineno))
line = linecache.getline(frame.filename, frame.lineno).strip()
if line:
lines.append(' %s' % line)
return lines
def get_object_traceback(obj):
"""
Get the traceback where the Python object *obj* was allocated.
Return a Traceback instance.
Return None if the tracemalloc module is not tracing memory allocations or
did not trace the allocation of the object.
"""
frames = _get_object_traceback(obj)
if frames is not None:
return Traceback(frames)
else:
return None
class Trace:
"""
Trace of a memory block.
"""
__slots__ = ("_trace",)
def __init__(self, trace):
# trace is a tuple: (domain: int, size: int, traceback: tuple).
# See Traceback constructor for the format of the traceback tuple.
self._trace = trace
@property
def domain(self):
return self._trace[0]
@property
def size(self):
return self._trace[1]
@property
def traceback(self):
return Traceback(*self._trace[2:])
def __eq__(self, other):
if not isinstance(other, Trace):
return NotImplemented
return (self._trace == other._trace)
def __hash__(self):
return hash(self._trace)
def __str__(self):
return "%s: %s" % (self.traceback, _format_size(self.size, False))
def __repr__(self):
return ("<Trace domain=%s size=%s, traceback=%r>"
% (self.domain, _format_size(self.size, False), self.traceback))
class _Traces(Sequence):
def __init__(self, traces):
Sequence.__init__(self)
# traces is a tuple of trace tuples: see Trace constructor
self._traces = traces
def __len__(self):
return len(self._traces)
def __getitem__(self, index):
if isinstance(index, slice):
return tuple(Trace(trace) for trace in self._traces[index])
else:
return Trace(self._traces[index])
def __contains__(self, trace):
return trace._trace in self._traces
def __eq__(self, other):
if not isinstance(other, _Traces):
return NotImplemented
return (self._traces == other._traces)
def __repr__(self):
return "<Traces len=%s>" % len(self)
def _normalize_filename(filename):
filename = os.path.normcase(filename)
if filename.endswith('.pyc'):
filename = filename[:-1]
return filename
class BaseFilter:
def __init__(self, inclusive):
self.inclusive = inclusive
def _match(self, trace):
raise NotImplementedError
class Filter(BaseFilter):
def __init__(self, inclusive, filename_pattern,
lineno=None, all_frames=False, domain=None):
super().__init__(inclusive)
self.inclusive = inclusive
self._filename_pattern = _normalize_filename(filename_pattern)
self.lineno = lineno
self.all_frames = all_frames
self.domain = domain
@property
def filename_pattern(self):
return self._filename_pattern
def _match_frame_impl(self, filename, lineno):
filename = _normalize_filename(filename)
if not fnmatch.fnmatch(filename, self._filename_pattern):
return False
if self.lineno is None:
return True
else:
return (lineno == self.lineno)
def _match_frame(self, filename, lineno):
return self._match_frame_impl(filename, lineno) ^ (not self.inclusive)
def _match_traceback(self, traceback):
if self.all_frames:
if any(self._match_frame_impl(filename, lineno)
for filename, lineno in traceback):
return self.inclusive
else:
return (not self.inclusive)
else:
filename, lineno = traceback[0]
return self._match_frame(filename, lineno)
def _match(self, trace):
domain, size, traceback, total_nframe = trace
res = self._match_traceback(traceback)
if self.domain is not None:
if self.inclusive:
return res and (domain == self.domain)
else:
return res or (domain != self.domain)
return res
class DomainFilter(BaseFilter):
def __init__(self, inclusive, domain):
super().__init__(inclusive)
self._domain = domain
@property
def domain(self):
return self._domain
def _match(self, trace):
domain, size, traceback, total_nframe = trace
return (domain == self.domain) ^ (not self.inclusive)
class Snapshot:
"""
Snapshot of traces of memory blocks allocated by Python.
"""
def __init__(self, traces, traceback_limit):
# traces is a tuple of trace tuples: see _Traces constructor for
# the exact format
self.traces = _Traces(traces)
self.traceback_limit = traceback_limit
def dump(self, filename):
"""
Write the snapshot into a file.
"""
with open(filename, "wb") as fp:
pickle.dump(self, fp, pickle.HIGHEST_PROTOCOL)
@staticmethod
def load(filename):
"""
Load a snapshot from a file.
"""
with open(filename, "rb") as fp:
return pickle.load(fp)
def _filter_trace(self, include_filters, exclude_filters, trace):
if include_filters:
if not any(trace_filter._match(trace)
for trace_filter in include_filters):
return False
if exclude_filters:
if any(not trace_filter._match(trace)
for trace_filter in exclude_filters):
return False
return True
def filter_traces(self, filters):
"""
Create a new Snapshot instance with a filtered traces sequence, filters
is a list of Filter or DomainFilter instances. If filters is an empty
list, return a new Snapshot instance with a copy of the traces.
"""
if not isinstance(filters, Iterable):
raise TypeError("filters must be a list of filters, not %s"
% type(filters).__name__)
if filters:
include_filters = []
exclude_filters = []
for trace_filter in filters:
if trace_filter.inclusive:
include_filters.append(trace_filter)
else:
exclude_filters.append(trace_filter)
new_traces = [trace for trace in self.traces._traces
if self._filter_trace(include_filters,
exclude_filters,
trace)]
else:
new_traces = self.traces._traces.copy()
return Snapshot(new_traces, self.traceback_limit)
def _group_by(self, key_type, cumulative):
if key_type not in ('traceback', 'filename', 'lineno'):
raise ValueError("unknown key_type: %r" % (key_type,))
if cumulative and key_type not in ('lineno', 'filename'):
raise ValueError("cumulative mode cannot by used "
"with key type %r" % key_type)
stats = {}
tracebacks = {}
if not cumulative:
for trace in self.traces._traces:
domain, size, trace_traceback, total_nframe = trace
try:
traceback = tracebacks[trace_traceback]
except KeyError:
if key_type == 'traceback':
frames = trace_traceback
elif key_type == 'lineno':
frames = trace_traceback[:1]
else: # key_type == 'filename':
frames = ((trace_traceback[0][0], 0),)
traceback = Traceback(frames)
tracebacks[trace_traceback] = traceback
try:
stat = stats[traceback]
stat.size += size
stat.count += 1
except KeyError:
stats[traceback] = Statistic(traceback, size, 1)
else:
# cumulative statistics
for trace in self.traces._traces:
domain, size, trace_traceback, total_nframe = trace
for frame in trace_traceback:
try:
traceback = tracebacks[frame]
except KeyError:
if key_type == 'lineno':
frames = (frame,)
else: # key_type == 'filename':
frames = ((frame[0], 0),)
traceback = Traceback(frames)
tracebacks[frame] = traceback
try:
stat = stats[traceback]
stat.size += size
stat.count += 1
except KeyError:
stats[traceback] = Statistic(traceback, size, 1)
return stats
def statistics(self, key_type, cumulative=False):
"""
Group statistics by key_type. Return a sorted list of Statistic
instances.
"""
grouped = self._group_by(key_type, cumulative)
statistics = list(grouped.values())
statistics.sort(reverse=True, key=Statistic._sort_key)
return statistics
def compare_to(self, old_snapshot, key_type, cumulative=False):
"""
Compute the differences with an old snapshot old_snapshot. Get
statistics as a sorted list of StatisticDiff instances, grouped by
group_by.
"""
new_group = self._group_by(key_type, cumulative)
old_group = old_snapshot._group_by(key_type, cumulative)
statistics = _compare_grouped_stats(old_group, new_group)
statistics.sort(reverse=True, key=StatisticDiff._sort_key)
return statistics
def take_snapshot():
"""
Take a snapshot of traces of memory blocks allocated by Python.
"""
if not is_tracing():
raise RuntimeError("the tracemalloc module must be tracing memory "
"allocations to take a snapshot")
traces = _get_traces()
traceback_limit = get_traceback_limit()
return Snapshot(traces, traceback_limit)

View File

@@ -80,7 +80,7 @@ $ python # now `python` is the alias of the RustPython for the new env
If you'd like to make https requests, you can enable the `ssl` feature, which
also lets you install the `pip` package manager. Note that on Windows, you may
need to install OpenSSL, or you can enable the `ssl-openssl-vendor` feature instead,
need to install OpenSSL, or you can enable the `ssl-vendor` feature instead,
which compiles OpenSSL for you but requires a C compiler, perl, and `make`.
OpenSSL version 3 is expected and tested in CI. Older versions may not work.
@@ -103,7 +103,7 @@ rustpython
### SSL provider
For HTTPS requests, `ssl-rustls-aws-lc` is enabled by default for the RustPython binary. Embedders can use `rustpython-stdlib`'s provider-agnostic `ssl-rustls` feature and install their own rustls crypto provider, or replace rustls with `ssl-openssl` if their environment requires OpenSSL.
Note that to use OpenSSL on Windows, you may need to install OpenSSL, or you can enable the `ssl-openssl-vendor` feature instead,
Note that to use OpenSSL on Windows, you may need to install OpenSSL, or you can enable the `ssl-vendor` feature instead,
which compiles OpenSSL for you but requires a C compiler, perl, and `make`.
OpenSSL version 3 is expected and tested in CI. Older versions may not work.
@@ -229,10 +229,24 @@ For a high level overview of the components, see the [architecture](architecture
## Contributing
Contributions are welcome and highly appreciated. To get started, check out the
[**contributing guidelines**](CONTRIBUTING.md).
Contributions are more than welcome, and in many cases we are happy to guide
contributors through PRs or on Discord. Please refer to the
[development guide](DEVELOPMENT.md) as well for tips on developments.
You can also join us on [**Discord**](https://discord.gg/vru8NypEhv).
With that in mind, please note this project is maintained by volunteers, some of
the best ways to get started are below:
Most tasks are listed in the
[issue tracker](https://github.com/RustPython/RustPython/issues). Check issues
labeled with [good first issue](https://github.com/RustPython/RustPython/issues?q=label%3A%22good+first+issue%22+is%3Aissue+is%3Aopen+) if you wish to start coding.
To enhance CPython compatibility, try to increase unittest coverage by checking this article: [How to contribute to RustPython by CPython unittest](https://rustpython.github.io/guideline/2020/04/04/how-to-contribute-by-cpython-unittest.html)
Another approach is to checkout the source code: builtin functions and object
methods are often the simplest and easiest way to contribute.
You can also simply run `python -I scripts/whats_left.py` to assist in finding any unimplemented
method.
## Compiling to WebAssembly

View File

@@ -428,6 +428,7 @@ pub(crate) const fn is_uni_linebreak(ch: u32) -> bool {
}
#[inline]
pub(crate) fn is_uni_alnum(ch: u32) -> bool {
// TODO: check with cpython
char::try_from(ch).is_ok_and(|c| {
GeneralCategoryGroup::Letter
.union(GeneralCategoryGroup::Number)

View File

@@ -20,7 +20,7 @@ sqlite = ["dep:libsqlite3-sys"]
ssl = ["host_env"]
ssl-rustls = ["__ssl-rustls", "rustls/custom-provider"]
ssl-openssl = ["ssl", "openssl", "openssl-sys", "foreign-types-shared", "openssl-probe"]
ssl-openssl-vendor = ["ssl-openssl", "openssl/vendored"]
ssl-vendor = ["ssl-openssl", "openssl/vendored"]
tkinter = ["dep:tk-sys", "dep:tcl-sys", "dep:widestring"]
flame-it = ["flame"]

View File

@@ -131,7 +131,7 @@ impl Comparable for PyWeak {
) -> PyResult<PyComparisonValue> {
op.eq_only(|| {
let other = class_or_notimplemented!(Self, other);
let both = zelf.upgrade().zip(other.upgrade());
let both = zelf.upgrade().and_then(|s| other.upgrade().map(|o| (s, o)));
match both {
// CPython parity (Objects/weakref.c::weakref_richcompare): use
// PyObject_RichCompare on the referents, not the bool variant,

View File

@@ -1006,6 +1006,10 @@ mod builtins {
exp: y,
modulus,
} = args;
#[expect(
clippy::unnecessary_option_map_or_else,
reason = "changing this won't compile"
)]
let modulus = modulus
.as_ref()
.map_or_else(|| vm.ctx.none.as_object(), |m| m);

View File

@@ -733,18 +733,6 @@ const ERROR_CODES: &[(&str, i32)] = &[
ERPCMISMATCH
),
e!(cfg(target_vendor = "apple"), ESHLIBVERS),
e!(
cfg(all(
unix,
any(target_os = "linux", target_os = "fuchsia"),
not(any(
target_os = "freebsd",
target_os = "android",
target_vendor = "apple",
))
)),
EHWPOISON
),
];
#[cfg(not(any(unix, windows, target_os = "wasi")))]

View File

@@ -1,6 +1,5 @@
use rustpython_vm::Interpreter;
#[link(wasm_import_module = "env")]
unsafe extern "C" {
fn kv_get(kp: i32, kl: i32, vp: i32, vl: i32) -> i32;

View File

@@ -891,13 +891,3 @@ assert id(b) != id(b * 0)
assert id(b) != id(b * 1)
assert id(b) != id(1 * b)
assert id(b) != id(b * 2)
# Regression tests for isalpha/isalnum Unicode General Category correctness.
# These characters are in letter categories (Ll/Lo) and should return True,
# but were missed in older Unicode tables used by unic-ucd-category.
# See: https://github.com/RustPython/RustPython/pull/7520#issuecomment-4148322294
for _cp in [1376, 1416, 1519, 2160, 2161, 2162, 2163, 2164, 2165, 2166]:
_c = chr(_cp)
assert _c.isalpha(), f"U+{_cp:04X} should be isalpha"
assert _c.isalnum(), f"U+{_cp:04X} should be isalnum"

View File

@@ -11,7 +11,6 @@ c = ᚴ * 3
assert c == "👋👋👋"
import re
import unicodedata
assert unicodedata.category("a") == "Ll"
@@ -39,10 +38,3 @@ assert b"xn--pythn-mua.org.".decode("idna") == "pyth\xf6n.org."
# TODO: add east_asian_width and mirrored
# assert unicodedata.ucd_3_2_0.east_asian_width('\u231a') == 'N'
# assert not unicodedata.ucd_3_2_0.mirrored("\u0f3a")
# U+0345 COMBINING GREEK YPOGEGRAMMENI (category Mn) should not be alphanumeric.
# CPython's isalpha/isalnum use Unicode letter categories (Lu/Ll/Lt/Lm/Lo),
# not the broader Unicode Alphabetic derived property.
assert not "\u0345".isalpha(), "isalpha should not match Mn category characters"
assert not "\u0345".isalnum(), "isalnum should not match Mn category characters"
assert not re.match(r"\w", "\u0345"), r"\w should not match U+0345 (category Mn)"