From 3150b4ded6b92c9dd2dc8cfcc542e784c95a7b92 Mon Sep 17 00:00:00 2001 From: DimitrisJim Date: Mon, 12 Jun 2023 00:08:49 +0300 Subject: [PATCH] Update cgi.py from Python 3.11 --- Lib/cgi.py | 46 +++++++++++++++++++++++++++++++------------- Lib/test/test_cgi.py | 19 ++++++++++-------- 2 files changed, 44 insertions(+), 21 deletions(-) diff --git a/Lib/cgi.py b/Lib/cgi.py index c22c71b387..8787567be7 100755 --- a/Lib/cgi.py +++ b/Lib/cgi.py @@ -13,6 +13,11 @@ This module defines a number of utilities for use by CGI scripts written in Python. + +The global variable maxlen can be set to an integer indicating the maximum size +of a POST request. POST requests larger than this size will result in a +ValueError being raised during parsing. The default value of this variable is 0, +meaning the request size is unlimited. """ # History @@ -41,12 +46,16 @@ from email.message import Message import html import locale import tempfile +import warnings __all__ = ["MiniFieldStorage", "FieldStorage", "parse", "parse_multipart", "parse_header", "test", "print_exception", "print_environ", "print_form", "print_directory", "print_arguments", "print_environ_usage"] + +warnings._deprecated(__name__, remove=(3,13)) + # Logging support # =============== @@ -77,9 +86,11 @@ def initlog(*allargs): """ global log, logfile, logfp + warnings.warn("cgi.log() is deprecated as of 3.10. Use logging instead", + DeprecationWarning, stacklevel=2) if logfile and not logfp: try: - logfp = open(logfile, "a") + logfp = open(logfile, "a", encoding="locale") except OSError: pass if not logfp: @@ -115,7 +126,8 @@ log = initlog # The current logging function # 0 ==> unlimited input maxlen = 0 -def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0): +def parse(fp=None, environ=os.environ, keep_blank_values=0, + strict_parsing=0, separator='&'): """Parse a query in the environment or from a file (default stdin) Arguments, all optional: @@ -134,6 +146,9 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0): strict_parsing: flag indicating what to do with parsing errors. If false (the default), errors are silently ignored. If true, errors raise a ValueError exception. + + separator: str. The symbol to use for separating the query arguments. + Defaults to &. """ if fp is None: fp = sys.stdin @@ -154,7 +169,7 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0): if environ['REQUEST_METHOD'] == 'POST': ctype, pdict = parse_header(environ['CONTENT_TYPE']) if ctype == 'multipart/form-data': - return parse_multipart(fp, pdict) + return parse_multipart(fp, pdict, separator=separator) elif ctype == 'application/x-www-form-urlencoded': clength = int(environ['CONTENT_LENGTH']) if maxlen and clength > maxlen: @@ -178,10 +193,10 @@ def parse(fp=None, environ=os.environ, keep_blank_values=0, strict_parsing=0): qs = "" environ['QUERY_STRING'] = qs # XXX Shouldn't, really return urllib.parse.parse_qs(qs, keep_blank_values, strict_parsing, - encoding=encoding) + encoding=encoding, separator=separator) -def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"): +def parse_multipart(fp, pdict, encoding="utf-8", errors="replace", separator='&'): """Parse multipart input. Arguments: @@ -194,15 +209,18 @@ def parse_multipart(fp, pdict, encoding="utf-8", errors="replace"): value is a list of values for that field. For non-file fields, the value is a list of strings. """ - # RFC 2026, Section 5.1 : The "multipart" boundary delimiters are always + # RFC 2046, Section 5.1 : The "multipart" boundary delimiters are always # represented as 7bit US-ASCII. boundary = pdict['boundary'].decode('ascii') ctype = "multipart/form-data; boundary={}".format(boundary) headers = Message() headers.set_type(ctype) - headers['Content-Length'] = pdict['CONTENT-LENGTH'] + try: + headers['Content-Length'] = pdict['CONTENT-LENGTH'] + except KeyError: + pass fs = FieldStorage(fp, headers=headers, encoding=encoding, errors=errors, - environ={'REQUEST_METHOD': 'POST'}) + environ={'REQUEST_METHOD': 'POST'}, separator=separator) return {k: fs.getlist(k) for k in fs} def _parseparam(s): @@ -312,7 +330,7 @@ class FieldStorage: def __init__(self, fp=None, headers=None, outerboundary=b'', environ=os.environ, keep_blank_values=0, strict_parsing=0, limit=None, encoding='utf-8', errors='replace', - max_num_fields=None): + max_num_fields=None, separator='&'): """Constructor. Read multipart/* until last part. Arguments, all optional: @@ -360,6 +378,7 @@ class FieldStorage: self.keep_blank_values = keep_blank_values self.strict_parsing = strict_parsing self.max_num_fields = max_num_fields + self.separator = separator if 'REQUEST_METHOD' in environ: method = environ['REQUEST_METHOD'].upper() self.qs_on_post = None @@ -586,7 +605,7 @@ class FieldStorage: query = urllib.parse.parse_qsl( qs, self.keep_blank_values, self.strict_parsing, encoding=self.encoding, errors=self.errors, - max_num_fields=self.max_num_fields) + max_num_fields=self.max_num_fields, separator=self.separator) self.list = [MiniFieldStorage(key, value) for key, value in query] self.skip_lines() @@ -602,7 +621,7 @@ class FieldStorage: query = urllib.parse.parse_qsl( self.qs_on_post, self.keep_blank_values, self.strict_parsing, encoding=self.encoding, errors=self.errors, - max_num_fields=self.max_num_fields) + max_num_fields=self.max_num_fields, separator=self.separator) self.list.extend(MiniFieldStorage(key, value) for key, value in query) klass = self.FieldStorageClass or self.__class__ @@ -646,7 +665,7 @@ class FieldStorage: else self.limit - self.bytes_read part = klass(self.fp, headers, ib, environ, keep_blank_values, strict_parsing, limit, - self.encoding, self.errors, max_num_fields) + self.encoding, self.errors, max_num_fields, self.separator) if max_num_fields is not None: max_num_fields -= 1 @@ -736,7 +755,8 @@ class FieldStorage: last_line_lfend = True _read = 0 while 1: - if self.limit is not None and _read >= self.limit: + + if self.limit is not None and 0 <= self.limit <= _read: break line = self.fp.readline(1<<16) # bytes self.bytes_read += len(line) diff --git a/Lib/test/test_cgi.py b/Lib/test/test_cgi.py index cc736572b0..43164cff31 100644 --- a/Lib/test/test_cgi.py +++ b/Lib/test/test_cgi.py @@ -1,4 +1,3 @@ -import cgi import os import sys import tempfile @@ -6,6 +5,10 @@ import unittest from collections import namedtuple from io import StringIO, BytesIO from test import support +from test.support import warnings_helper + +cgi = warnings_helper.import_deprecated("cgi") + class HackedSysModule: # The regression test will have real values in sys.argv, which @@ -50,7 +53,7 @@ def do_test(buf, method): return ComparableException(err) parse_strict_test_cases = [ - ("", ValueError("bad query field: ''")), + ("", {}), ("&", ValueError("bad query field: ''")), ("&&", ValueError("bad query field: ''")), # Should the next few really be valid? @@ -123,8 +126,6 @@ class CgiTests(unittest.TestCase): 'file': [b'Testing 123.\n'], 'title': ['']} self.assertEqual(result, expected) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_parse_multipart_without_content_length(self): POSTDATA = '''--JfISa01 Content-Disposition: form-data; name="submit-name" @@ -174,6 +175,8 @@ Content-Length: 3 fs = cgi.FieldStorage(headers={'content-type':'text/plain'}) self.assertRaises(TypeError, bool, fs) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_strict(self): for orig, expect in parse_strict_test_cases: # Test basic parsing @@ -200,8 +203,6 @@ Content-Length: 3 else: self.assertEqual(fs.getvalue(key), expect_val[0]) - # TODO: RUSTPYTHON - @unittest.expectedFailure def test_separator(self): parse_semicolon = [ ("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}), @@ -226,6 +227,7 @@ Content-Length: 3 else: self.assertEqual(fs.getvalue(key), expect_val[0]) + @warnings_helper.ignore_warnings(category=DeprecationWarning) def test_log(self): cgi.log("Testing") @@ -578,8 +580,9 @@ this is the content of the fake file ("form-data", {"name": "files", "filename": 'fo"o;bar'})) def test_all(self): - not_exported = {"logfile", "logfp", "initlog", "dolog", "nolog", - "closelog", "log", "maxlen", "valid_boundary"} + not_exported = { + "logfile", "logfp", "initlog", "dolog", "nolog", "closelog", "log", + "maxlen", "valid_boundary"} support.check__all__(self, cgi, not_exported=not_exported)