forked from Rust-related/RustPython
Update cgi.py from Python 3.11
This commit is contained in:
46
Lib/cgi.py
vendored
46
Lib/cgi.py
vendored
@@ -13,6 +13,11 @@
|
||||
|
||||
This module defines a number of utilities for use by CGI scripts
|
||||
written in Python.
|
||||
|
||||
The global variable maxlen can be set to an integer indicating the maximum size
|
||||
of a POST request. POST requests larger than this size will result in a
|
||||
ValueError being raised during parsing. The default value of this variable is 0,
|
||||
meaning the request size is unlimited.
|
||||
"""
|
||||
|
||||
# History
|
||||
@@ -41,12 +46,16 @@ from email.message import Message
|
||||
import html
|
||||
import locale
|
||||
import tempfile
|
||||
import warnings
|
||||
|
||||
__all__ = ["MiniFieldStorage", "FieldStorage", "parse", "parse_multipart",
|
||||
"parse_header", "test", "print_exception", "print_environ",
|
||||
"print_form", "print_directory", "print_arguments",
|
||||
"print_environ_usage"]
|
||||
|
||||
|
||||
warnings._deprecated(__name__, remove=(3,13))
|
||||
|
||||
# Logging support
|
||||
# ===============
|
||||
|
||||
@@ -77,9 +86,11 @@ def initlog(*allargs):
|
||||
|
||||
"""
|
||||
global log, logfile, logfp
|
||||
warnings.warn("cgi.log() is deprecated as of 3.10. Use logging instead",
|
||||
DeprecationWarning, stacklevel=2)
|
||||
if logfile and not logfp:
|
||||
try:
|
||||
logfp = open(logfile, "a")
|
||||
logfp = open(logfile, "a", encoding="locale")
|
||||
except OSError:
|
||||
pass
|
||||
if not logfp:
|
||||
@@ -115,7 +126,8 @@ log = initlog # The current logging function
|
||||
# 0 ==> unlimited input
|
||||
maxlen = 0
|
||||
|
||||
def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
|
||||
def parse(fp=None, environ=os.environ, keep_blank_values=0,
|
||||
strict_parsing=0, separator='&'):
|
||||
"""Parse a query in the environment or from a file (default stdin)
|
||||
|
||||
Arguments, all optional:
|
||||
@@ -134,6 +146,9 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
|
||||
strict_parsing: flag indicating what to do with parsing errors.
|
||||
If false (the default), errors are silently ignored.
|
||||
If true, errors raise a ValueError exception.
|
||||
|
||||
separator: str. The symbol to use for separating the query arguments.
|
||||
Defaults to &.
|
||||
"""
|
||||
if fp is None:
|
||||
fp = sys.stdin
|
||||
@@ -154,7 +169,7 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
|
||||
if environ['REQUEST_METHOD'] == 'POST':
|
||||
ctype, pdict = parse_header(environ['CONTENT_TYPE'])
|
||||
if ctype == 'multipart/form-data':
|
||||
return parse_multipart(fp, pdict)
|
||||
return parse_multipart(fp, pdict, separator=separator)
|
||||
elif ctype == 'application/x-www-form-urlencoded':
|
||||
clength = int(environ['CONTENT_LENGTH'])
|
||||
if maxlen and clength > maxlen:
|
||||
@@ -178,10 +193,10 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0):
|
||||
qs = ""
|
||||
environ['QUERY_STRING'] = qs # XXX Shouldn't, really
|
||||
return urllib.parse.parse_qs(qs, keep_blank_values, strict_parsing,
|
||||
encoding=encoding)
|
||||
encoding=encoding, separator=separator)
|
||||
|
||||
|
||||
def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"):
|
||||
def parse_multipart(fp, pdict, encoding="utf-8", errors="replace", separator='&'):
|
||||
"""Parse multipart input.
|
||||
|
||||
Arguments:
|
||||
@@ -194,15 +209,18 @@ def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"):
|
||||
value is a list of values for that field. For non-file fields, the value
|
||||
is a list of strings.
|
||||
"""
|
||||
# RFC 2026, Section 5.1 : The "multipart" boundary delimiters are always
|
||||
# RFC 2046, Section 5.1 : The "multipart" boundary delimiters are always
|
||||
# represented as 7bit US-ASCII.
|
||||
boundary = pdict['boundary'].decode('ascii')
|
||||
ctype = "multipart/form-data; boundary={}".format(boundary)
|
||||
headers = Message()
|
||||
headers.set_type(ctype)
|
||||
headers['Content-Length'] = pdict['CONTENT-LENGTH']
|
||||
try:
|
||||
headers['Content-Length'] = pdict['CONTENT-LENGTH']
|
||||
except KeyError:
|
||||
pass
|
||||
fs = FieldStorage(fp, headers=headers, encoding=encoding, errors=errors,
|
||||
environ={'REQUEST_METHOD': 'POST'})
|
||||
environ={'REQUEST_METHOD': 'POST'}, separator=separator)
|
||||
return {k: fs.getlist(k) for k in fs}
|
||||
|
||||
def _parseparam(s):
|
||||
@@ -312,7 +330,7 @@ class FieldStorage:
|
||||
def __init__(self, fp=None, headers=None, outerboundary=b'',
|
||||
environ=os.environ, keep_blank_values=0, strict_parsing=0,
|
||||
limit=None, encoding='utf-8', errors='replace',
|
||||
max_num_fields=None):
|
||||
max_num_fields=None, separator='&'):
|
||||
"""Constructor. Read multipart/* until last part.
|
||||
|
||||
Arguments, all optional:
|
||||
@@ -360,6 +378,7 @@ class FieldStorage:
|
||||
self.keep_blank_values = keep_blank_values
|
||||
self.strict_parsing = strict_parsing
|
||||
self.max_num_fields = max_num_fields
|
||||
self.separator = separator
|
||||
if 'REQUEST_METHOD' in environ:
|
||||
method = environ['REQUEST_METHOD'].upper()
|
||||
self.qs_on_post = None
|
||||
@@ -586,7 +605,7 @@ class FieldStorage:
|
||||
query = urllib.parse.parse_qsl(
|
||||
qs, self.keep_blank_values, self.strict_parsing,
|
||||
encoding=self.encoding, errors=self.errors,
|
||||
max_num_fields=self.max_num_fields)
|
||||
max_num_fields=self.max_num_fields, separator=self.separator)
|
||||
self.list = [MiniFieldStorage(key, value) for key, value in query]
|
||||
self.skip_lines()
|
||||
|
||||
@@ -602,7 +621,7 @@ class FieldStorage:
|
||||
query = urllib.parse.parse_qsl(
|
||||
self.qs_on_post, self.keep_blank_values, self.strict_parsing,
|
||||
encoding=self.encoding, errors=self.errors,
|
||||
max_num_fields=self.max_num_fields)
|
||||
max_num_fields=self.max_num_fields, separator=self.separator)
|
||||
self.list.extend(MiniFieldStorage(key, value) for key, value in query)
|
||||
|
||||
klass = self.FieldStorageClass or self.__class__
|
||||
@@ -646,7 +665,7 @@ class FieldStorage:
|
||||
else self.limit - self.bytes_read
|
||||
part = klass(self.fp, headers, ib, environ, keep_blank_values,
|
||||
strict_parsing, limit,
|
||||
self.encoding, self.errors, max_num_fields)
|
||||
self.encoding, self.errors, max_num_fields, self.separator)
|
||||
|
||||
if max_num_fields is not None:
|
||||
max_num_fields -= 1
|
||||
@@ -736,7 +755,8 @@ class FieldStorage:
|
||||
last_line_lfend = True
|
||||
_read = 0
|
||||
while 1:
|
||||
if self.limit is not None and _read >= self.limit:
|
||||
|
||||
if self.limit is not None and 0 <= self.limit <= _read:
|
||||
break
|
||||
line = self.fp.readline(1<<16) # bytes
|
||||
self.bytes_read += len(line)
|
||||
|
||||
19
Lib/test/test_cgi.py
vendored
19
Lib/test/test_cgi.py
vendored
@@ -1,4 +1,3 @@
|
||||
import cgi
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
@@ -6,6 +5,10 @@ import unittest
|
||||
from collections import namedtuple
|
||||
from io import StringIO, BytesIO
|
||||
from test import support
|
||||
from test.support import warnings_helper
|
||||
|
||||
cgi = warnings_helper.import_deprecated("cgi")
|
||||
|
||||
|
||||
class HackedSysModule:
|
||||
# The regression test will have real values in sys.argv, which
|
||||
@@ -50,7 +53,7 @@ def do_test(buf, method):
|
||||
return ComparableException(err)
|
||||
|
||||
parse_strict_test_cases = [
|
||||
("", ValueError("bad query field: ''")),
|
||||
("", {}),
|
||||
("&", ValueError("bad query field: ''")),
|
||||
("&&", ValueError("bad query field: ''")),
|
||||
# Should the next few really be valid?
|
||||
@@ -123,8 +126,6 @@ class CgiTests(unittest.TestCase):
|
||||
'file': [b'Testing 123.\n'], 'title': ['']}
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_parse_multipart_without_content_length(self):
|
||||
POSTDATA = '''--JfISa01
|
||||
Content-Disposition: form-data; name="submit-name"
|
||||
@@ -174,6 +175,8 @@ Content-Length: 3
|
||||
fs = cgi.FieldStorage(headers={'content-type':'text/plain'})
|
||||
self.assertRaises(TypeError, bool, fs)
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_strict(self):
|
||||
for orig, expect in parse_strict_test_cases:
|
||||
# Test basic parsing
|
||||
@@ -200,8 +203,6 @@ Content-Length: 3
|
||||
else:
|
||||
self.assertEqual(fs.getvalue(key), expect_val[0])
|
||||
|
||||
# TODO: RUSTPYTHON
|
||||
@unittest.expectedFailure
|
||||
def test_separator(self):
|
||||
parse_semicolon = [
|
||||
("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}),
|
||||
@@ -226,6 +227,7 @@ Content-Length: 3
|
||||
else:
|
||||
self.assertEqual(fs.getvalue(key), expect_val[0])
|
||||
|
||||
@warnings_helper.ignore_warnings(category=DeprecationWarning)
|
||||
def test_log(self):
|
||||
cgi.log("Testing")
|
||||
|
||||
@@ -578,8 +580,9 @@ this is the content of the fake file
|
||||
("form-data", {"name": "files", "filename": 'fo"o;bar'}))
|
||||
|
||||
def test_all(self):
|
||||
not_exported = {"logfile", "logfp", "initlog", "dolog", "nolog",
|
||||
"closelog", "log", "maxlen", "valid_boundary"}
|
||||
not_exported = {
|
||||
"logfile", "logfp", "initlog", "dolog", "nolog", "closelog", "log",
|
||||
"maxlen", "valid_boundary"}
|
||||
support.check__all__(self, cgi, not_exported=not_exported)
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user