Merge pull request #1503 from RustPython/coolreader18/antigravity

Allow antigravity to be imported
This commit is contained in:
Windel Bouwman
2019-10-12 10:42:32 +02:00
committed by GitHub
11 changed files with 1758 additions and 18 deletions

17
Lib/antigravity.py vendored Normal file
View File

@@ -0,0 +1,17 @@
import webbrowser
import hashlib
webbrowser.open("https://xkcd.com/353/")
def geohash(latitude, longitude, datedow):
'''Compute geohash() using the Munroe algorithm.
>>> geohash(37.421542, -122.085589, b'2005-05-26-10458.68')
37.857713 -122.544543
'''
# http://xkcd.com/426/
h = hashlib.md5(datedow).hexdigest()
p, q = [('%f' % float.fromhex('0.' + x)) for x in (h[:16], h[16:32])]
print('%d%s %d%s' % (latitude, p[1:], longitude, q[1:]))

313
Lib/copy.py vendored Normal file
View File

@@ -0,0 +1,313 @@
"""Generic (shallow and deep) copying operations.
Interface summary:
import copy
x = copy.copy(y) # make a shallow copy of y
x = copy.deepcopy(y) # make a deep copy of y
For module specific errors, copy.Error is raised.
The difference between shallow and deep copying is only relevant for
compound objects (objects that contain other objects, like lists or
class instances).
- A shallow copy constructs a new compound object and then (to the
extent possible) inserts *the same objects* into it that the
original contains.
- A deep copy constructs a new compound object and then, recursively,
inserts *copies* into it of the objects found in the original.
Two problems often exist with deep copy operations that don't exist
with shallow copy operations:
a) recursive objects (compound objects that, directly or indirectly,
contain a reference to themselves) may cause a recursive loop
b) because deep copy copies *everything* it may copy too much, e.g.
administrative data structures that should be shared even between
copies
Python's deep copy operation avoids these problems by:
a) keeping a table of objects already copied during the current
copying pass
b) letting user-defined classes override the copying operation or the
set of components copied
This version does not copy types like module, class, function, method,
nor stack trace, stack frame, nor file, socket, window, nor array, nor
any similar types.
Classes can use the same interfaces to control copying that they use
to control pickling: they can define methods called __getinitargs__(),
__getstate__() and __setstate__(). See the documentation for module
"pickle" for information on these methods.
"""
import types
import weakref
from copyreg import dispatch_table
class Error(Exception):
pass
error = Error # backward compatibility
try:
from org.python.core import PyStringMap
except ImportError:
PyStringMap = None
__all__ = ["Error", "copy", "deepcopy"]
def copy(x):
"""Shallow copy operation on arbitrary Python objects.
See the module's __doc__ string for more info.
"""
cls = type(x)
copier = _copy_dispatch.get(cls)
if copier:
return copier(x)
try:
issc = issubclass(cls, type)
except TypeError: # cls is not a class
issc = False
if issc:
# treat it as a regular class:
return _copy_immutable(x)
copier = getattr(cls, "__copy__", None)
if copier:
return copier(x)
reductor = dispatch_table.get(cls)
if reductor:
rv = reductor(x)
else:
reductor = getattr(x, "__reduce_ex__", None)
if reductor:
rv = reductor(4)
else:
reductor = getattr(x, "__reduce__", None)
if reductor:
rv = reductor()
else:
raise Error("un(shallow)copyable object of type %s" % cls)
if isinstance(rv, str):
return x
return _reconstruct(x, None, *rv)
_copy_dispatch = d = {}
def _copy_immutable(x):
return x
for t in (type(None), int, float, bool, complex, str, tuple,
bytes, frozenset, type, range, slice,
types.BuiltinFunctionType, type(Ellipsis), type(NotImplemented),
types.FunctionType, weakref.ref):
d[t] = _copy_immutable
t = getattr(types, "CodeType", None)
if t is not None:
d[t] = _copy_immutable
d[list] = list.copy
d[dict] = dict.copy
d[set] = set.copy
d[bytearray] = bytearray.copy
if PyStringMap is not None:
d[PyStringMap] = PyStringMap.copy
del d, t
def deepcopy(x, memo=None, _nil=[]):
"""Deep copy operation on arbitrary Python objects.
See the module's __doc__ string for more info.
"""
if memo is None:
memo = {}
d = id(x)
y = memo.get(d, _nil)
if y is not _nil:
return y
cls = type(x)
copier = _deepcopy_dispatch.get(cls)
if copier:
y = copier(x, memo)
else:
try:
issc = issubclass(cls, type)
except TypeError: # cls is not a class (old Boost; see SF #502085)
issc = 0
if issc:
y = _deepcopy_atomic(x, memo)
else:
copier = getattr(x, "__deepcopy__", None)
if copier:
y = copier(memo)
else:
reductor = dispatch_table.get(cls)
if reductor:
rv = reductor(x)
else:
reductor = getattr(x, "__reduce_ex__", None)
if reductor:
rv = reductor(4)
else:
reductor = getattr(x, "__reduce__", None)
if reductor:
rv = reductor()
else:
raise Error(
"un(deep)copyable object of type %s" % cls)
if isinstance(rv, str):
y = x
else:
y = _reconstruct(x, memo, *rv)
# If is its own copy, don't memoize.
if y is not x:
memo[d] = y
_keep_alive(x, memo) # Make sure x lives at least as long as d
return y
_deepcopy_dispatch = d = {}
def _deepcopy_atomic(x, memo):
return x
d[type(None)] = _deepcopy_atomic
d[type(Ellipsis)] = _deepcopy_atomic
d[type(NotImplemented)] = _deepcopy_atomic
d[int] = _deepcopy_atomic
d[float] = _deepcopy_atomic
d[bool] = _deepcopy_atomic
d[complex] = _deepcopy_atomic
d[bytes] = _deepcopy_atomic
d[str] = _deepcopy_atomic
try:
d[types.CodeType] = _deepcopy_atomic
except AttributeError:
pass
d[type] = _deepcopy_atomic
d[types.BuiltinFunctionType] = _deepcopy_atomic
d[types.FunctionType] = _deepcopy_atomic
d[weakref.ref] = _deepcopy_atomic
def _deepcopy_list(x, memo, deepcopy=deepcopy):
y = []
memo[id(x)] = y
append = y.append
for a in x:
append(deepcopy(a, memo))
return y
d[list] = _deepcopy_list
def _deepcopy_tuple(x, memo, deepcopy=deepcopy):
y = [deepcopy(a, memo) for a in x]
# We're not going to put the tuple in the memo, but it's still important we
# check for it, in case the tuple contains recursive mutable structures.
try:
return memo[id(x)]
except KeyError:
pass
for k, j in zip(x, y):
if k is not j:
y = tuple(y)
break
else:
y = x
return y
d[tuple] = _deepcopy_tuple
def _deepcopy_dict(x, memo, deepcopy=deepcopy):
y = {}
memo[id(x)] = y
for key, value in x.items():
y[deepcopy(key, memo)] = deepcopy(value, memo)
return y
d[dict] = _deepcopy_dict
if PyStringMap is not None:
d[PyStringMap] = _deepcopy_dict
def _deepcopy_method(x, memo): # Copy instance methods
return type(x)(x.__func__, deepcopy(x.__self__, memo))
d[types.MethodType] = _deepcopy_method
del d
def _keep_alive(x, memo):
"""Keeps a reference to the object x in the memo.
Because we remember objects by their id, we have
to assure that possibly temporary objects are kept
alive by referencing them.
We store a reference at the id of the memo, which should
normally not be used unless someone tries to deepcopy
the memo itself...
"""
try:
memo[id(memo)].append(x)
except KeyError:
# aha, this is the first one :-)
memo[id(memo)]=[x]
def _reconstruct(x, memo, func, args,
state=None, listiter=None, dictiter=None,
deepcopy=deepcopy):
deep = memo is not None
if deep and args:
args = (deepcopy(arg, memo) for arg in args)
y = func(*args)
if deep:
memo[id(x)] = y
if state is not None:
if deep:
state = deepcopy(state, memo)
if hasattr(y, '__setstate__'):
y.__setstate__(state)
else:
if isinstance(state, tuple) and len(state) == 2:
state, slotstate = state
else:
slotstate = None
if state is not None:
y.__dict__.update(state)
if slotstate is not None:
for key, value in slotstate.items():
setattr(y, key, value)
if listiter is not None:
if deep:
for item in listiter:
item = deepcopy(item, memo)
y.append(item)
else:
for item in listiter:
y.append(item)
if dictiter is not None:
if deep:
for key, value in dictiter:
key = deepcopy(key, memo)
value = deepcopy(value, memo)
y[key] = value
else:
for key, value in dictiter:
y[key] = value
return y
del types, weakref, PyStringMap

513
Lib/subprocess.py vendored Normal file
View File

@@ -0,0 +1,513 @@
# subprocess - Subprocesses with accessible I/O streams
#
# For more information about this module, see PEP 324.
#
# Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se>
#
# Licensed to PSF under a Contributor Agreement.
# See http://www.python.org/2.4/license for licensing details.
r"""Subprocesses with accessible I/O streams
This module allows you to spawn processes, connect to their
input/output/error pipes, and obtain their return codes.
For a complete description of this module see the Python documentation.
Main API
========
run(...): Runs a command, waits for it to complete, then returns a
CompletedProcess instance.
Popen(...): A class for flexibly executing a command in a new process
Constants
---------
DEVNULL: Special value that indicates that os.devnull should be used
PIPE: Special value that indicates a pipe should be created
STDOUT: Special value that indicates that stderr should go to stdout
Older API
=========
call(...): Runs a command, waits for it to complete, then returns
the return code.
check_call(...): Same as call() but raises CalledProcessError()
if return code is not 0
check_output(...): Same as check_call() but returns the contents of
stdout instead of a return code
getoutput(...): Runs a command in the shell, waits for it to complete,
then returns the output
getstatusoutput(...): Runs a command in the shell, waits for it to complete,
then returns a (exitcode, output) tuple
"""
import sys
import io
import os
import time
import signal
import builtins
import warnings
import errno
from time import monotonic as _time
from _subprocess import *
# TODO: use these classes instead of the _subprocess ones
# Exception classes used by this module.
# class SubprocessError(Exception): pass
# class CalledProcessError(SubprocessError):
# """Raised when run() is called with check=True and the process
# returns a non-zero exit status.
# Attributes:
# cmd, returncode, stdout, stderr, output
# """
# def __init__(self, returncode, cmd, output=None, stderr=None):
# self.returncode = returncode
# self.cmd = cmd
# self.output = output
# self.stderr = stderr
# def __str__(self):
# if self.returncode and self.returncode < 0:
# try:
# return "Command '%s' died with %r." % (
# self.cmd, signal.Signals(-self.returncode))
# except ValueError:
# return "Command '%s' died with unknown signal %d." % (
# self.cmd, -self.returncode)
# else:
# return "Command '%s' returned non-zero exit status %d." % (
# self.cmd, self.returncode)
# @property
# def stdout(self):
# """Alias for output attribute, to match stderr"""
# return self.output
# @stdout.setter
# def stdout(self, value):
# # There's no obvious reason to set this, but allow it anyway so
# # .stdout is a transparent alias for .output
# self.output = value
# class TimeoutExpired(SubprocessError):
# """This exception is raised when the timeout expires while waiting for a
# child process.
# Attributes:
# cmd, output, stdout, stderr, timeout
# """
# def __init__(self, cmd, timeout, output=None, stderr=None):
# self.cmd = cmd
# self.timeout = timeout
# self.output = output
# self.stderr = stderr
# def __str__(self):
# return ("Command '%s' timed out after %s seconds" %
# (self.cmd, self.timeout))
# @property
# def stdout(self):
# return self.output
# @stdout.setter
# def stdout(self, value):
# # There's no obvious reason to set this, but allow it anyway so
# # .stdout is a transparent alias for .output
# self.output = value
__all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput",
"getoutput", "check_output", "run", "CalledProcessError", "DEVNULL",
"SubprocessError", "TimeoutExpired", "CompletedProcess"]
# NOTE: We intentionally exclude list2cmdline as it is
# considered an internal implementation detail. issue10838.
# This lists holds Popen instances for which the underlying process had not
# exited at the time its __del__ method got called: those processes are wait()ed
# for synchronously from _cleanup() when a new Popen object is created, to avoid
# zombie processes.
_active = []
def _cleanup():
for inst in _active[:]:
res = inst._internal_poll(_deadstate=sys.maxsize)
if res is not None:
try:
_active.remove(inst)
except ValueError:
# This can happen if two threads create a new Popen instance.
# It's harmless that it was already removed, so ignore.
pass
PIPE = -1
STDOUT = -2
DEVNULL = -3
# XXX This function is only used by multiprocessing and the test suite,
# but it's here so that it can be imported when Python is compiled without
# threads.
def _optim_args_from_interpreter_flags():
"""Return a list of command-line arguments reproducing the current
optimization settings in sys.flags."""
args = []
value = sys.flags.optimize
if value > 0:
args.append('-' + 'O' * value)
return args
def _args_from_interpreter_flags():
"""Return a list of command-line arguments reproducing the current
settings in sys.flags, sys.warnoptions and sys._xoptions."""
flag_opt_map = {
'debug': 'd',
# 'inspect': 'i',
# 'interactive': 'i',
'dont_write_bytecode': 'B',
'no_user_site': 's',
'no_site': 'S',
'ignore_environment': 'E',
'verbose': 'v',
'bytes_warning': 'b',
'quiet': 'q',
# -O is handled in _optim_args_from_interpreter_flags()
}
args = _optim_args_from_interpreter_flags()
for flag, opt in flag_opt_map.items():
v = getattr(sys.flags, flag)
if v > 0:
args.append('-' + opt * v)
# -W options
warnopts = sys.warnoptions[:]
bytes_warning = sys.flags.bytes_warning
xoptions = getattr(sys, '_xoptions', {})
dev_mode = ('dev' in xoptions)
if bytes_warning > 1:
warnopts.remove("error::BytesWarning")
elif bytes_warning:
warnopts.remove("default::BytesWarning")
if dev_mode:
warnopts.remove('default')
for opt in warnopts:
args.append('-W' + opt)
# -X options
if dev_mode:
args.extend(('-X', 'dev'))
for opt in ('faulthandler', 'tracemalloc', 'importtime',
'showalloccount', 'showrefcount', 'utf8'):
if opt in xoptions:
value = xoptions[opt]
if value is True:
arg = opt
else:
arg = '%s=%s' % (opt, value)
args.extend(('-X', arg))
return args
def call(*popenargs, timeout=None, **kwargs):
"""Run command with arguments. Wait for command to complete or
timeout, then return the returncode attribute.
The arguments are the same as for the Popen constructor. Example:
retcode = call(["ls", "-l"])
"""
with Popen(*popenargs, **kwargs) as p:
try:
return p.wait(timeout=timeout)
except: # Including KeyboardInterrupt, wait handled that.
p.kill()
# We don't call p.wait() again as p.__exit__ does that for us.
raise
def check_call(*popenargs, **kwargs):
"""Run command with arguments. Wait for command to complete. If
the exit code was zero then return, otherwise raise
CalledProcessError. The CalledProcessError object will have the
return code in the returncode attribute.
The arguments are the same as for the call function. Example:
check_call(["ls", "-l"])
"""
retcode = call(*popenargs, **kwargs)
if retcode:
cmd = kwargs.get("args")
if cmd is None:
cmd = popenargs[0]
raise CalledProcessError(retcode, cmd)
return 0
def check_output(*popenargs, timeout=None, **kwargs):
r"""Run command with arguments and return its output.
If the exit code was non-zero it raises a CalledProcessError. The
CalledProcessError object will have the return code in the returncode
attribute and output in the output attribute.
The arguments are the same as for the Popen constructor. Example:
>>> check_output(["ls", "-l", "/dev/null"])
b'crw-rw-rw- 1 root root 1, 3 Oct 18 2007 /dev/null\n'
The stdout argument is not allowed as it is used internally.
To capture standard error in the result, use stderr=STDOUT.
>>> check_output(["/bin/sh", "-c",
... "ls -l non_existent_file ; exit 0"],
... stderr=STDOUT)
b'ls: non_existent_file: No such file or directory\n'
There is an additional optional argument, "input", allowing you to
pass a string to the subprocess's stdin. If you use this argument
you may not also use the Popen constructor's "stdin" argument, as
it too will be used internally. Example:
>>> check_output(["sed", "-e", "s/foo/bar/"],
... input=b"when in the course of fooman events\n")
b'when in the course of barman events\n'
By default, all communication is in bytes, and therefore any "input"
should be bytes, and the return value wil be bytes. If in text mode,
any "input" should be a string, and the return value will be a string
decoded according to locale encoding, or by "encoding" if set. Text mode
is triggered by setting any of text, encoding, errors or universal_newlines.
"""
if 'stdout' in kwargs:
raise ValueError('stdout argument not allowed, it will be overridden.')
if 'input' in kwargs and kwargs['input'] is None:
# Explicitly passing input=None was previously equivalent to passing an
# empty string. That is maintained here for backwards compatibility.
kwargs['input'] = '' if kwargs.get('universal_newlines', False) else b''
return run(*popenargs, stdout=PIPE, timeout=timeout, check=True,
**kwargs).stdout
class CompletedProcess(object):
"""A process that has finished running.
This is returned by run().
Attributes:
args: The list or str args passed to run().
returncode: The exit code of the process, negative for signals.
stdout: The standard output (None if not captured).
stderr: The standard error (None if not captured).
"""
def __init__(self, args, returncode, stdout=None, stderr=None):
self.args = args
self.returncode = returncode
self.stdout = stdout
self.stderr = stderr
def __repr__(self):
args = ['args={!r}'.format(self.args),
'returncode={!r}'.format(self.returncode)]
if self.stdout is not None:
args.append('stdout={!r}'.format(self.stdout))
if self.stderr is not None:
args.append('stderr={!r}'.format(self.stderr))
return "{}({})".format(type(self).__name__, ', '.join(args))
def check_returncode(self):
"""Raise CalledProcessError if the exit code is non-zero."""
if self.returncode:
raise CalledProcessError(self.returncode, self.args, self.stdout,
self.stderr)
def run(*popenargs,
input=None, capture_output=False, timeout=None, check=False, **kwargs):
"""Run command with arguments and return a CompletedProcess instance.
The returned instance will have attributes args, returncode, stdout and
stderr. By default, stdout and stderr are not captured, and those attributes
will be None. Pass stdout=PIPE and/or stderr=PIPE in order to capture them.
If check is True and the exit code was non-zero, it raises a
CalledProcessError. The CalledProcessError object will have the return code
in the returncode attribute, and output & stderr attributes if those streams
were captured.
If timeout is given, and the process takes too long, a TimeoutExpired
exception will be raised.
There is an optional argument "input", allowing you to
pass bytes or a string to the subprocess's stdin. If you use this argument
you may not also use the Popen constructor's "stdin" argument, as
it will be used internally.
By default, all communication is in bytes, and therefore any "input" should
be bytes, and the stdout and stderr will be bytes. If in text mode, any
"input" should be a string, and stdout and stderr will be strings decoded
according to locale encoding, or by "encoding" if set. Text mode is
triggered by setting any of text, encoding, errors or universal_newlines.
The other arguments are the same as for the Popen constructor.
"""
if input is not None:
if 'stdin' in kwargs:
raise ValueError('stdin and input arguments may not both be used.')
kwargs['stdin'] = PIPE
if capture_output:
if ('stdout' in kwargs) or ('stderr' in kwargs):
raise ValueError('stdout and stderr arguments may not be used '
'with capture_output.')
kwargs['stdout'] = PIPE
kwargs['stderr'] = PIPE
with Popen(*popenargs, **kwargs) as process:
try:
stdout, stderr = process.communicate(input, timeout=timeout)
except TimeoutExpired:
process.kill()
stdout, stderr = process.communicate()
raise TimeoutExpired(process.args, timeout, output=stdout,
stderr=stderr)
except: # Including KeyboardInterrupt, communicate handled that.
process.kill()
# We don't call process.wait() as .__exit__ does that for us.
raise
retcode = process.poll()
if check and retcode:
raise CalledProcessError(retcode, process.args,
output=stdout, stderr=stderr)
return CompletedProcess(process.args, retcode, stdout, stderr)
def list2cmdline(seq):
"""
Translate a sequence of arguments into a command line
string, using the same rules as the MS C runtime:
1) Arguments are delimited by white space, which is either a
space or a tab.
2) A string surrounded by double quotation marks is
interpreted as a single argument, regardless of white space
contained within. A quoted string can be embedded in an
argument.
3) A double quotation mark preceded by a backslash is
interpreted as a literal double quotation mark.
4) Backslashes are interpreted literally, unless they
immediately precede a double quotation mark.
5) If backslashes immediately precede a double quotation mark,
every pair of backslashes is interpreted as a literal
backslash. If the number of backslashes is odd, the last
backslash escapes the next double quotation mark as
described in rule 3.
"""
# See
# http://msdn.microsoft.com/en-us/library/17w5ykft.aspx
# or search http://msdn.microsoft.com for
# "Parsing C++ Command-Line Arguments"
result = []
needquote = False
for arg in seq:
bs_buf = []
# Add a space to separate this argument from the others
if result:
result.append(' ')
needquote = (" " in arg) or ("\t" in arg) or not arg
if needquote:
result.append('"')
for c in arg:
if c == '\\':
# Don't know if we need to double yet.
bs_buf.append(c)
elif c == '"':
# Double backslashes.
result.append('\\' * len(bs_buf)*2)
bs_buf = []
result.append('\\"')
else:
# Normal char
if bs_buf:
result.extend(bs_buf)
bs_buf = []
result.append(c)
# Add remaining backslashes, if any.
if bs_buf:
result.extend(bs_buf)
if needquote:
result.extend(bs_buf)
result.append('"')
return ''.join(result)
# Various tools for executing commands and looking at their output and status.
#
def getstatusoutput(cmd):
"""Return (exitcode, output) of executing cmd in a shell.
Execute the string 'cmd' in a shell with 'check_output' and
return a 2-tuple (status, output). The locale encoding is used
to decode the output and process newlines.
A trailing newline is stripped from the output.
The exit status for the command can be interpreted
according to the rules for the function 'wait'. Example:
>>> import subprocess
>>> subprocess.getstatusoutput('ls /bin/ls')
(0, '/bin/ls')
>>> subprocess.getstatusoutput('cat /bin/junk')
(1, 'cat: /bin/junk: No such file or directory')
>>> subprocess.getstatusoutput('/bin/junk')
(127, 'sh: /bin/junk: not found')
>>> subprocess.getstatusoutput('/bin/kill $$')
(-15, '')
"""
try:
data = check_output(cmd, shell=True, text=True, stderr=STDOUT)
exitcode = 0
except CalledProcessError as ex:
data = ex.output
exitcode = ex.returncode
if data[-1:] == '\n':
data = data[:-1]
return exitcode, data
def getoutput(cmd):
"""Return output (stdout or stderr) of executing cmd in a shell.
Like getstatusoutput(), except the exit status is ignored and the return
value is a string containing the command's output. Example:
>>> import subprocess
>>> subprocess.getoutput('ls /bin/ls')
'/bin/ls'
"""
return getstatusoutput(cmd)[1]

664
Lib/webbrowser.py vendored Executable file
View File

@@ -0,0 +1,664 @@
#! /usr/bin/env python3
"""Interfaces for launching and remotely controlling Web browsers."""
# Maintained by Georg Brandl.
import os
import shlex
import shutil
import sys
import subprocess
__all__ = ["Error", "open", "open_new", "open_new_tab", "get", "register"]
class Error(Exception):
pass
_browsers = {} # Dictionary of available browser controllers
_tryorder = [] # Preference order of available browsers
def register(name, klass, instance=None, update_tryorder=1):
"""Register a browser connector and, optionally, connection."""
_browsers[name.lower()] = [klass, instance]
if update_tryorder > 0:
_tryorder.append(name)
elif update_tryorder < 0:
_tryorder.insert(0, name)
def get(using=None):
"""Return a browser launcher instance appropriate for the environment."""
if using is not None:
alternatives = [using]
else:
alternatives = _tryorder
for browser in alternatives:
if '%s' in browser:
# User gave us a command line, split it into name and args
browser = shlex.split(browser)
if browser[-1] == '&':
return BackgroundBrowser(browser[:-1])
else:
return GenericBrowser(browser)
else:
# User gave us a browser name or path.
try:
command = _browsers[browser.lower()]
except KeyError:
command = _synthesize(browser)
if command[1] is not None:
return command[1]
elif command[0] is not None:
return command[0]()
raise Error("could not locate runnable browser")
# Please note: the following definition hides a builtin function.
# It is recommended one does "import webbrowser" and uses webbrowser.open(url)
# instead of "from webbrowser import *".
def open(url, new=0, autoraise=True):
for name in _tryorder:
browser = get(name)
if browser.open(url, new, autoraise):
return True
return False
def open_new(url):
return open(url, 1)
def open_new_tab(url):
return open(url, 2)
def _synthesize(browser, update_tryorder=1):
"""Attempt to synthesize a controller base on existing controllers.
This is useful to create a controller when a user specifies a path to
an entry in the BROWSER environment variable -- we can copy a general
controller to operate using a specific installation of the desired
browser in this way.
If we can't create a controller in this way, or if there is no
executable for the requested browser, return [None, None].
"""
cmd = browser.split()[0]
if not shutil.which(cmd):
return [None, None]
name = os.path.basename(cmd)
try:
command = _browsers[name.lower()]
except KeyError:
return [None, None]
# now attempt to clone to fit the new name:
controller = command[1]
if controller and name.lower() == controller.basename:
import copy
controller = copy.copy(controller)
controller.name = browser
controller.basename = os.path.basename(browser)
register(browser, None, controller, update_tryorder)
return [None, controller]
return [None, None]
# General parent classes
class BaseBrowser(object):
"""Parent class for all browsers. Do not use directly."""
args = ['%s']
def __init__(self, name=""):
self.name = name
self.basename = name
def open(self, url, new=0, autoraise=True):
raise NotImplementedError
def open_new(self, url):
return self.open(url, 1)
def open_new_tab(self, url):
return self.open(url, 2)
class GenericBrowser(BaseBrowser):
"""Class for all browsers started with a command
and without remote functionality."""
def __init__(self, name):
if isinstance(name, str):
self.name = name
self.args = ["%s"]
else:
# name should be a list with arguments
self.name = name[0]
self.args = name[1:]
self.basename = os.path.basename(self.name)
def open(self, url, new=0, autoraise=True):
cmdline = [self.name] + [arg.replace("%s", url)
for arg in self.args]
try:
if sys.platform[:3] == 'win':
p = subprocess.Popen(cmdline)
else:
p = subprocess.Popen(cmdline, close_fds=True)
return not p.wait()
except OSError:
return False
class BackgroundBrowser(GenericBrowser):
"""Class for all browsers which are to be started in the
background."""
def open(self, url, new=0, autoraise=True):
cmdline = [self.name] + [arg.replace("%s", url)
for arg in self.args]
try:
if sys.platform[:3] == 'win':
p = subprocess.Popen(cmdline)
else:
p = subprocess.Popen(cmdline, close_fds=True,
start_new_session=True)
return (p.poll() is None)
except OSError:
return False
class UnixBrowser(BaseBrowser):
"""Parent class for all Unix browsers with remote functionality."""
raise_opts = None
background = False
redirect_stdout = True
# In remote_args, %s will be replaced with the requested URL. %action will
# be replaced depending on the value of 'new' passed to open.
# remote_action is used for new=0 (open). If newwin is not None, it is
# used for new=1 (open_new). If newtab is not None, it is used for
# new=3 (open_new_tab). After both substitutions are made, any empty
# strings in the transformed remote_args list will be removed.
remote_args = ['%action', '%s']
remote_action = None
remote_action_newwin = None
remote_action_newtab = None
def _invoke(self, args, remote, autoraise):
raise_opt = []
if remote and self.raise_opts:
# use autoraise argument only for remote invocation
autoraise = int(autoraise)
opt = self.raise_opts[autoraise]
if opt: raise_opt = [opt]
cmdline = [self.name] + raise_opt + args
if remote or self.background:
inout = subprocess.DEVNULL
else:
# for TTY browsers, we need stdin/out
inout = None
p = subprocess.Popen(cmdline, close_fds=True, stdin=inout,
stdout=(self.redirect_stdout and inout or None),
stderr=inout, start_new_session=True)
if remote:
# wait at most five seconds. If the subprocess is not finished, the
# remote invocation has (hopefully) started a new instance.
try:
rc = p.wait(5)
# if remote call failed, open() will try direct invocation
return not rc
except subprocess.TimeoutExpired:
return True
elif self.background:
if p.poll() is None:
return True
else:
return False
else:
return not p.wait()
def open(self, url, new=0, autoraise=True):
if new == 0:
action = self.remote_action
elif new == 1:
action = self.remote_action_newwin
elif new == 2:
if self.remote_action_newtab is None:
action = self.remote_action_newwin
else:
action = self.remote_action_newtab
else:
raise Error("Bad 'new' parameter to open(); " +
"expected 0, 1, or 2, got %s" % new)
args = [arg.replace("%s", url).replace("%action", action)
for arg in self.remote_args]
args = [arg for arg in args if arg]
success = self._invoke(args, True, autoraise)
if not success:
# remote invocation failed, try straight way
args = [arg.replace("%s", url) for arg in self.args]
return self._invoke(args, False, False)
else:
return True
class Mozilla(UnixBrowser):
"""Launcher class for Mozilla browsers."""
remote_args = ['%action', '%s']
remote_action = ""
remote_action_newwin = "-new-window"
remote_action_newtab = "-new-tab"
background = True
class Netscape(UnixBrowser):
"""Launcher class for Netscape browser."""
raise_opts = ["-noraise", "-raise"]
remote_args = ['-remote', 'openURL(%s%action)']
remote_action = ""
remote_action_newwin = ",new-window"
remote_action_newtab = ",new-tab"
background = True
class Galeon(UnixBrowser):
"""Launcher class for Galeon/Epiphany browsers."""
raise_opts = ["-noraise", ""]
remote_args = ['%action', '%s']
remote_action = "-n"
remote_action_newwin = "-w"
background = True
class Chrome(UnixBrowser):
"Launcher class for Google Chrome browser."
remote_args = ['%action', '%s']
remote_action = ""
remote_action_newwin = "--new-window"
remote_action_newtab = ""
background = True
Chromium = Chrome
class Opera(UnixBrowser):
"Launcher class for Opera browser."
raise_opts = ["-noraise", ""]
remote_args = ['-remote', 'openURL(%s%action)']
remote_action = ""
remote_action_newwin = ",new-window"
remote_action_newtab = ",new-page"
background = True
class Elinks(UnixBrowser):
"Launcher class for Elinks browsers."
remote_args = ['-remote', 'openURL(%s%action)']
remote_action = ""
remote_action_newwin = ",new-window"
remote_action_newtab = ",new-tab"
background = False
# elinks doesn't like its stdout to be redirected -
# it uses redirected stdout as a signal to do -dump
redirect_stdout = False
class Konqueror(BaseBrowser):
"""Controller for the KDE File Manager (kfm, or Konqueror).
See the output of ``kfmclient --commands``
for more information on the Konqueror remote-control interface.
"""
def open(self, url, new=0, autoraise=True):
# XXX Currently I know no way to prevent KFM from opening a new win.
if new == 2:
action = "newTab"
else:
action = "openURL"
devnull = subprocess.DEVNULL
try:
p = subprocess.Popen(["kfmclient", action, url],
close_fds=True, stdin=devnull,
stdout=devnull, stderr=devnull)
except OSError:
# fall through to next variant
pass
else:
p.wait()
# kfmclient's return code unfortunately has no meaning as it seems
return True
try:
p = subprocess.Popen(["konqueror", "--silent", url],
close_fds=True, stdin=devnull,
stdout=devnull, stderr=devnull,
start_new_session=True)
except OSError:
# fall through to next variant
pass
else:
if p.poll() is None:
# Should be running now.
return True
try:
p = subprocess.Popen(["kfm", "-d", url],
close_fds=True, stdin=devnull,
stdout=devnull, stderr=devnull,
start_new_session=True)
except OSError:
return False
else:
return (p.poll() is None)
class Grail(BaseBrowser):
# There should be a way to maintain a connection to Grail, but the
# Grail remote control protocol doesn't really allow that at this
# point. It probably never will!
def _find_grail_rc(self):
import glob
import pwd
import socket
import tempfile
tempdir = os.path.join(tempfile.gettempdir(),
".grail-unix")
user = pwd.getpwuid(os.getuid())[0]
filename = os.path.join(tempdir, user + "-*")
maybes = glob.glob(filename)
if not maybes:
return None
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
for fn in maybes:
# need to PING each one until we find one that's live
try:
s.connect(fn)
except OSError:
# no good; attempt to clean it out, but don't fail:
try:
os.unlink(fn)
except OSError:
pass
else:
return s
def _remote(self, action):
s = self._find_grail_rc()
if not s:
return 0
s.send(action)
s.close()
return 1
def open(self, url, new=0, autoraise=True):
if new:
ok = self._remote("LOADNEW " + url)
else:
ok = self._remote("LOAD " + url)
return ok
#
# Platform support for Unix
#
# These are the right tests because all these Unix browsers require either
# a console terminal or an X display to run.
def register_X_browsers():
# use xdg-open if around
if shutil.which("xdg-open"):
register("xdg-open", None, BackgroundBrowser("xdg-open"))
# The default GNOME3 browser
if "GNOME_DESKTOP_SESSION_ID" in os.environ and shutil.which("gvfs-open"):
register("gvfs-open", None, BackgroundBrowser("gvfs-open"))
# The default GNOME browser
if "GNOME_DESKTOP_SESSION_ID" in os.environ and shutil.which("gnome-open"):
register("gnome-open", None, BackgroundBrowser("gnome-open"))
# The default KDE browser
if "KDE_FULL_SESSION" in os.environ and shutil.which("kfmclient"):
register("kfmclient", Konqueror, Konqueror("kfmclient"))
if shutil.which("x-www-browser"):
register("x-www-browser", None, BackgroundBrowser("x-www-browser"))
# The Mozilla browsers
for browser in ("firefox", "iceweasel", "iceape", "seamonkey"):
if shutil.which(browser):
register(browser, None, Mozilla(browser))
# The Netscape and old Mozilla browsers
for browser in ("mozilla-firefox",
"mozilla-firebird", "firebird",
"mozilla", "netscape"):
if shutil.which(browser):
register(browser, None, Netscape(browser))
# Konqueror/kfm, the KDE browser.
if shutil.which("kfm"):
register("kfm", Konqueror, Konqueror("kfm"))
elif shutil.which("konqueror"):
register("konqueror", Konqueror, Konqueror("konqueror"))
# Gnome's Galeon and Epiphany
for browser in ("galeon", "epiphany"):
if shutil.which(browser):
register(browser, None, Galeon(browser))
# Skipstone, another Gtk/Mozilla based browser
if shutil.which("skipstone"):
register("skipstone", None, BackgroundBrowser("skipstone"))
# Google Chrome/Chromium browsers
for browser in ("google-chrome", "chrome", "chromium", "chromium-browser"):
if shutil.which(browser):
register(browser, None, Chrome(browser))
# Opera, quite popular
if shutil.which("opera"):
register("opera", None, Opera("opera"))
# Next, Mosaic -- old but still in use.
if shutil.which("mosaic"):
register("mosaic", None, BackgroundBrowser("mosaic"))
# Grail, the Python browser. Does anybody still use it?
if shutil.which("grail"):
register("grail", Grail, None)
# Prefer X browsers if present
if os.environ.get("DISPLAY"):
register_X_browsers()
# Also try console browsers
if os.environ.get("TERM"):
if shutil.which("www-browser"):
register("www-browser", None, GenericBrowser("www-browser"))
# The Links/elinks browsers <http://artax.karlin.mff.cuni.cz/~mikulas/links/>
if shutil.which("links"):
register("links", None, GenericBrowser("links"))
if shutil.which("elinks"):
register("elinks", None, Elinks("elinks"))
# The Lynx browser <http://lynx.isc.org/>, <http://lynx.browser.org/>
if shutil.which("lynx"):
register("lynx", None, GenericBrowser("lynx"))
# The w3m browser <http://w3m.sourceforge.net/>
if shutil.which("w3m"):
register("w3m", None, GenericBrowser("w3m"))
#
# Platform support for Windows
#
if sys.platform[:3] == "win":
class WindowsDefault(BaseBrowser):
def open(self, url, new=0, autoraise=True):
try:
os.startfile(url)
except OSError:
# [Error 22] No application is associated with the specified
# file for this operation: '<URL>'
return False
else:
return True
_tryorder = []
_browsers = {}
# First try to use the default Windows browser
register("windows-default", WindowsDefault)
# Detect some common Windows browsers, fallback to IE
iexplore = os.path.join(os.environ.get("PROGRAMFILES", "C:\\Program Files"),
"Internet Explorer\\IEXPLORE.EXE")
for browser in ("firefox", "firebird", "seamonkey", "mozilla",
"netscape", "opera", iexplore):
if shutil.which(browser):
register(browser, None, BackgroundBrowser(browser))
#
# Platform support for MacOS
#
if sys.platform == 'darwin':
# Adapted from patch submitted to SourceForge by Steven J. Burr
class MacOSX(BaseBrowser):
"""Launcher class for Aqua browsers on Mac OS X
Optionally specify a browser name on instantiation. Note that this
will not work for Aqua browsers if the user has moved the application
package after installation.
If no browser is specified, the default browser, as specified in the
Internet System Preferences panel, will be used.
"""
def __init__(self, name):
self.name = name
def open(self, url, new=0, autoraise=True):
assert "'" not in url
# hack for local urls
if not ':' in url:
url = 'file:'+url
# new must be 0 or 1
new = int(bool(new))
if self.name == "default":
# User called open, open_new or get without a browser parameter
script = 'open location "%s"' % url.replace('"', '%22') # opens in default browser
else:
# User called get and chose a browser
if self.name == "OmniWeb":
toWindow = ""
else:
# Include toWindow parameter of OpenURL command for browsers
# that support it. 0 == new window; -1 == existing
toWindow = "toWindow %d" % (new - 1)
cmd = 'OpenURL "%s"' % url.replace('"', '%22')
script = '''tell application "%s"
activate
%s %s
end tell''' % (self.name, cmd, toWindow)
# Open pipe to AppleScript through osascript command
osapipe = os.popen("osascript", "w")
if osapipe is None:
return False
# Write script to osascript's stdin
osapipe.write(script)
rc = osapipe.close()
return not rc
class MacOSXOSAScript(BaseBrowser):
def __init__(self, name):
self._name = name
def open(self, url, new=0, autoraise=True):
if self._name == 'default':
script = 'open location "%s"' % url.replace('"', '%22') # opens in default browser
else:
script = '''
tell application "%s"
activate
open location "%s"
end
'''%(self._name, url.replace('"', '%22'))
osapipe = os.popen("osascript", "w")
if osapipe is None:
return False
osapipe.write(script)
rc = osapipe.close()
return not rc
# Don't clear _tryorder or _browsers since OS X can use above Unix support
# (but we prefer using the OS X specific stuff)
register("safari", None, MacOSXOSAScript('safari'), -1)
register("firefox", None, MacOSXOSAScript('firefox'), -1)
register("chrome", None, MacOSXOSAScript('chrome'), -1)
register("MacOSX", None, MacOSXOSAScript('default'), -1)
# OK, now that we know what the default preference orders for each
# platform are, allow user to override them with the BROWSER variable.
if "BROWSER" in os.environ:
_userchoices = os.environ["BROWSER"].split(os.pathsep)
_userchoices.reverse()
# Treat choices in same way as if passed into get() but do register
# and prepend to _tryorder
for cmdline in _userchoices:
if cmdline != '':
cmd = _synthesize(cmdline, -1)
if cmd[1] is None:
register(cmdline, None, GenericBrowser(cmdline), -1)
cmdline = None # to make del work if _userchoices was empty
del cmdline
del _userchoices
# what to do if _tryorder is now empty?
def main():
import getopt
usage = """Usage: %s [-n | -t] url
-n: open new window
-t: open new tab""" % sys.argv[0]
try:
opts, args = getopt.getopt(sys.argv[1:], 'ntd')
except getopt.error as msg:
print(msg, file=sys.stderr)
print(usage, file=sys.stderr)
sys.exit(1)
new_win = 0
for o, a in opts:
if o == '-n': new_win = 1
elif o == '-t': new_win = 2
if len(args) != 1:
print(usage, file=sys.stderr)
sys.exit(1)
url = args[0]
open(url, new_win)
print("\a")
if __name__ == "__main__":
main()

86
vm/Lib/__reducelib.py Normal file
View File

@@ -0,0 +1,86 @@
# Modified from code from the PyPy project:
# https://bitbucket.org/pypy/pypy/src/default/pypy/objspace/std/objectobject.py
# The MIT License
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
import copyreg
def _abstract_method_error(typ):
methods = ", ".join(sorted(typ.__abstractmethods__))
err = "Can't instantiate abstract class %s with abstract methods %s"
raise TypeError(err % (typ.__name__, methods))
def reduce_2(obj):
cls = obj.__class__
try:
getnewargs = obj.__getnewargs__
except AttributeError:
args = ()
else:
args = getnewargs()
if not isinstance(args, tuple):
raise TypeError("__getnewargs__ should return a tuple")
try:
getstate = obj.__getstate__
except AttributeError:
state = getattr(obj, "__dict__", None)
names = slotnames(cls) # not checking for list
if names is not None:
slots = {}
for name in names:
try:
value = getattr(obj, name)
except AttributeError:
pass
else:
slots[name] = value
if slots:
state = state, slots
else:
state = getstate()
listitems = iter(obj) if isinstance(obj, list) else None
dictitems = obj.iteritems() if isinstance(obj, dict) else None
newobj = copyreg.__newobj__
args2 = (cls,) + args
return newobj, args2, state, listitems, dictitems
def slotnames(cls):
if not isinstance(cls, type):
return None
try:
return cls.__dict__["__slotnames__"]
except KeyError:
pass
slotnames = copyreg._slotnames(cls)
if not isinstance(slotnames, list) and slotnames is not None:
raise TypeError("copyreg._slotnames didn't return a list or None")
return slotnames

View File

@@ -15,10 +15,18 @@ pub fn get_module_inits() -> HashMap<String, FrozenModule> {
file = "Lib/_bootstrap_external.py",
module_name = "_frozen_importlib_external",
));
modules.extend(py_compile_bytecode!(
file = "../Lib/copyreg.py",
module_name = "copyreg",
));
modules.extend(py_compile_bytecode!(
file = "Lib/__reducelib.py",
module_name = "__reducelib",
));
#[cfg(feature = "freeze-stdlib")]
{
modules.extend(py_compile_bytecode!(dir = "../Lib/",));
modules.extend(py_compile_bytecode!(dir = "../Lib/"));
}
modules

View File

@@ -3,7 +3,7 @@ use super::objlist::PyList;
use super::objproperty::PropertyBuilder;
use super::objstr::PyStringRef;
use super::objtype::{self, PyClassRef};
use crate::function::PyFuncArgs;
use crate::function::{OptionalArg, PyFuncArgs};
use crate::pyhash;
use crate::pyobject::{
IdProtocol, ItemProtocol, PyAttributes, PyContext, PyObject, PyObjectRef, PyResult, PyValue,
@@ -185,6 +185,8 @@ pub fn init(context: &PyContext) {
"__format__" => context.new_rustfunc(object_format),
"__getattribute__" => context.new_rustfunc(object_getattribute),
"__subclasshook__" => context.new_classmethod(object_subclasshook),
"__reduce__" => context.new_rustfunc(object_reduce),
"__reduce_ex__" => context.new_rustfunc(object_reduce_ex),
"__doc__" => context.new_str(object_doc.to_string()),
});
}
@@ -210,7 +212,7 @@ fn object_dict(object: PyObjectRef, vm: &VirtualMachine) -> PyResult<PyDictRef>
if let Some(ref dict) = object.dict {
Ok(dict.clone())
} else {
Err(vm.new_type_error("TypeError: no dictionary.".to_string()))
Err(vm.new_attribute_error("no dictionary.".to_string()))
}
}
@@ -229,3 +231,31 @@ fn object_getattribute(obj: PyObjectRef, name: PyStringRef, vm: &VirtualMachine)
vm.generic_getattribute(obj.clone(), name.clone())?
.ok_or_else(|| vm.new_attribute_error(format!("{} has no attribute '{}'", obj, name)))
}
fn object_reduce(obj: PyObjectRef, proto: OptionalArg<usize>, vm: &VirtualMachine) -> PyResult {
common_reduce(obj, proto.unwrap_or(0), vm)
}
fn object_reduce_ex(obj: PyObjectRef, proto: usize, vm: &VirtualMachine) -> PyResult {
let cls = obj.class();
if let Some(reduce) = objtype::class_get_attr(&cls, "__reduce__") {
let object_reduce =
objtype::class_get_attr(&vm.ctx.types.object_type, "__reduce__").unwrap();
if !reduce.is(&object_reduce) {
return vm.invoke(&reduce, vec![]);
}
}
common_reduce(obj, proto, vm)
}
fn common_reduce(obj: PyObjectRef, proto: usize, vm: &VirtualMachine) -> PyResult {
if proto >= 2 {
let reducelib = vm.import("__reducelib", &[], 0)?;
let reduce_2 = vm.get_attribute(reducelib, "reduce_2")?;
vm.invoke(&reduce_2, vec![obj])
} else {
let copyreg = vm.import("copyreg", &[], 0)?;
let reduce_ex = vm.get_attribute(copyreg, "_reduce_ex")?;
vm.invoke(&reduce_ex, vec![obj, vm.new_int(proto)])
}
}

View File

@@ -1,5 +1,5 @@
use crate::function::OptionalArg;
use crate::obj::{objbool, objsequence, objtype::PyClassRef};
use crate::obj::{objbool, objiter, objsequence, objtype::PyClassRef};
use crate::pyobject::{IdProtocol, PyClassImpl, PyIterable, PyObjectRef, PyRef, PyResult, PyValue};
use crate::vm::ReprGuard;
use crate::VirtualMachine;
@@ -13,6 +13,7 @@ struct PyDeque {
deque: RefCell<VecDeque<PyObjectRef>>,
maxlen: Cell<Option<usize>>,
}
type PyDequeRef = PyRef<PyDeque>;
impl PyValue for PyDeque {
fn class(vm: &VirtualMachine) -> PyClassRef {
@@ -337,10 +338,51 @@ impl PyDeque {
fn len(&self, _vm: &VirtualMachine) -> usize {
self.deque.borrow().len()
}
#[pymethod(name = "__iter__")]
fn iter(zelf: PyRef<Self>, _vm: &VirtualMachine) -> PyDequeIterator {
PyDequeIterator {
position: Cell::new(0),
deque: zelf,
}
}
}
#[pyclass(name = "_deque_iterator")]
#[derive(Debug)]
struct PyDequeIterator {
position: Cell<usize>,
deque: PyDequeRef,
}
impl PyValue for PyDequeIterator {
fn class(vm: &VirtualMachine) -> PyClassRef {
vm.class("_collections", "_deque_iterator")
}
}
#[pyimpl]
impl PyDequeIterator {
#[pymethod(name = "__next__")]
fn next(&self, vm: &VirtualMachine) -> PyResult {
if self.position.get() < self.deque.deque.borrow().len() {
let ret = self.deque.deque.borrow()[self.position.get()].clone();
self.position.set(self.position.get() + 1);
Ok(ret)
} else {
Err(objiter::new_stop_iteration(vm))
}
}
#[pymethod(name = "__iter__")]
fn iter(zelf: PyRef<Self>, _vm: &VirtualMachine) -> PyRef<Self> {
zelf
}
}
pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
py_module!(vm, "_collections", {
"deque" => PyDeque::make_class(&vm.ctx),
"_deque_iterator" => PyDequeIterator::make_class(&vm.ctx),
})
}

View File

@@ -106,7 +106,7 @@ pub fn get_module_inits() -> HashMap<String, StdlibInitFunc> {
modules.insert("_os".to_string(), Box::new(os::make_module));
modules.insert("socket".to_string(), Box::new(socket::make_module));
modules.insert("signal".to_string(), Box::new(signal::make_module));
modules.insert("subprocess".to_string(), Box::new(subprocess::make_module));
modules.insert("_subprocess".to_string(), Box::new(subprocess::make_module));
modules.insert("zlib".to_string(), Box::new(zlib::make_module));
}

View File

@@ -457,8 +457,26 @@ fn os_unsetenv(key: Either<PyStringRef, PyBytesRef>, vm: &VirtualMachine) -> PyR
fn _os_environ(vm: &VirtualMachine) -> PyDictRef {
let environ = vm.ctx.new_dict();
for (key, value) in env::vars() {
environ.set_item(&key, vm.new_str(value), vm).unwrap();
#[cfg(unix)]
{
use std::os::unix::ffi::OsStringExt;
for (key, value) in env::vars_os() {
environ
.set_item(
&vm.ctx.new_bytes(key.into_vec()),
vm.ctx.new_bytes(value.into_vec()),
vm,
)
.unwrap();
}
}
#[cfg(windows)]
{
for (key, value) in env::vars() {
environ
.set_item(&vm.new_str(key), vm.new_str(value), vm)
.unwrap();
}
}
environ
}

View File

@@ -19,17 +19,19 @@ use crate::vm::VirtualMachine;
#[derive(Debug)]
struct Popen {
process: RefCell<subprocess::Popen>,
args: PyObjectRef,
}
impl PyValue for Popen {
fn class(vm: &VirtualMachine) -> PyClassRef {
vm.class("subprocess", "Popen")
vm.class("_subprocess", "Popen")
}
}
type PopenRef = PyRef<Popen>;
#[derive(FromArgs)]
#[allow(dead_code)]
struct PopenArgs {
#[pyarg(positional_only)]
args: Either<PyStringRef, PyListRef>,
@@ -40,7 +42,11 @@ struct PopenArgs {
#[pyarg(positional_or_keyword, default = "None")]
stderr: Option<i64>,
#[pyarg(positional_or_keyword, default = "None")]
close_fds: Option<bool>, // TODO: use these unused options
#[pyarg(positional_or_keyword, default = "None")]
cwd: Option<PyStringRef>,
#[pyarg(positional_or_keyword, default = "None")]
start_new_session: Option<bool>,
}
impl IntoPyObject for subprocess::ExitStatus {
@@ -55,12 +61,19 @@ impl IntoPyObject for subprocess::ExitStatus {
}
}
#[cfg(windows)]
const NULL_DEVICE: &str = "nul";
#[cfg(unix)]
const NULL_DEVICE: &str = "/dev/null";
fn convert_redirection(arg: Option<i64>, vm: &VirtualMachine) -> PyResult<subprocess::Redirection> {
match arg {
Some(fd) => match fd {
-1 => Ok(subprocess::Redirection::Pipe),
-2 => panic!("TODO"),
-3 => panic!("TODO"),
-2 => Ok(subprocess::Redirection::Merge),
-3 => Ok(subprocess::Redirection::File(
File::open(NULL_DEVICE).unwrap(),
)),
fd => {
if fd < 0 {
Err(vm.new_value_error(format!("Invalid fd: {}", fd)))
@@ -92,7 +105,7 @@ impl PopenRef {
let stdin = convert_redirection(args.stdin, vm)?;
let stdout = convert_redirection(args.stdout, vm)?;
let stderr = convert_redirection(args.stderr, vm)?;
let command_list = match args.args {
let command_list = match &args.args {
Either::A(command) => vec![command.as_str().to_string()],
Either::B(command_list) => objsequence::get_elements_list(command_list.as_object())
.iter()
@@ -115,6 +128,7 @@ impl PopenRef {
Popen {
process: RefCell::new(process),
args: args.args.into_object(),
}
.into_ref_with_type(vm, cls)
}
@@ -137,7 +151,7 @@ impl PopenRef {
}
.map_err(|s| vm.new_os_error(format!("Could not start program: {}", s)))?;
if timeout.is_none() {
let timeout_expired = vm.class("subprocess", "TimeoutExpired");
let timeout_expired = vm.try_class("_subprocess", "TimeoutExpired")?;
Err(vm.new_exception(timeout_expired, "Timeout".to_string()))
} else {
Ok(())
@@ -173,18 +187,52 @@ impl PopenRef {
#[allow(clippy::type_complexity)]
fn communicate(
self,
stdin: OptionalArg<PyBytesRef>,
args: PopenCommunicateArgs,
vm: &VirtualMachine,
) -> PyResult<(Option<Vec<u8>>, Option<Vec<u8>>)> {
let bytes = match args.input {
OptionalArg::Present(ref bytes) => Some(bytes.get_value()),
OptionalArg::Missing => None,
};
self.process
.borrow_mut()
.communicate_bytes(stdin.into_option().as_ref().map(|bytes| bytes.get_value()))
.communicate_bytes(bytes)
.map_err(|err| convert_io_error(vm, err))
}
fn pid(self, _vm: &VirtualMachine) -> Option<u32> {
self.process.borrow().pid()
}
fn enter(self, _vm: &VirtualMachine) -> Self {
self
}
fn exit(
self,
_exception_type: PyObjectRef,
_exception_value: PyObjectRef,
_traceback: PyObjectRef,
_vm: &VirtualMachine,
) {
let mut process = self.process.borrow_mut();
process.stdout.take();
process.stdin.take();
process.stderr.take();
}
fn args(self, _vm: &VirtualMachine) -> PyObjectRef {
self.args.clone()
}
}
#[derive(FromArgs)]
#[allow(dead_code)]
struct PopenCommunicateArgs {
#[pyarg(positional_or_keyword, optional = true)]
input: OptionalArg<PyBytesRef>,
#[pyarg(positional_or_keyword, optional = true)]
timeout: OptionalArg<u64>,
}
pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
@@ -205,16 +253,17 @@ pub fn make_module(vm: &VirtualMachine) -> PyObjectRef {
"kill" => ctx.new_rustfunc(PopenRef::kill),
"communicate" => ctx.new_rustfunc(PopenRef::communicate),
"pid" => ctx.new_property(PopenRef::pid),
"__enter__" => ctx.new_rustfunc(PopenRef::enter),
"__exit__" => ctx.new_rustfunc(PopenRef::exit),
"args" => ctx.new_property(PopenRef::args),
});
let module = py_module!(vm, "subprocess", {
py_module!(vm, "_subprocess", {
"Popen" => popen,
"SubprocessError" => subprocess_error,
"TimeoutExpired" => timeout_expired,
"PIPE" => ctx.new_int(-1),
"STDOUT" => ctx.new_int(-2),
"DEVNULL" => ctx.new_int(-3),
});
module
})
}