Update wsgiref.py to 3.14.4 (#7825)

This commit is contained in:
Shahar Naveh
2026-05-11 18:44:05 +03:00
committed by GitHub
parent 3fbfbf53c2
commit c79aefecee
3 changed files with 70 additions and 25 deletions

View File

@@ -1,6 +1,6 @@
from unittest import mock
from test import support
from test.support import socket_helper
from test.support import socket_helper, control_characters_c0
from test.test_httpservers import NoLogRequestHandler
from unittest import TestCase
from wsgiref.util import setup_testing_defaults
@@ -149,9 +149,9 @@ class IntegrationTests(TestCase):
start_response("200 OK", ('Content-Type','text/plain'))
return ["Hello, world!"]
out, err = run_amock(validator(bad_app))
self.assertTrue(out.endswith(
self.assertEndsWith(out,
b"A server error occurred. Please contact the administrator."
))
)
self.assertEqual(
err.splitlines()[-2],
"AssertionError: Headers (('Content-Type', 'text/plain')) must"
@@ -174,9 +174,9 @@ class IntegrationTests(TestCase):
for status, exc_message in tests:
with self.subTest(status=status):
out, err = run_amock(create_bad_app(status))
self.assertTrue(out.endswith(
self.assertEndsWith(out,
b"A server error occurred. Please contact the administrator."
))
)
self.assertEqual(err.splitlines()[-2], exc_message)
def test_wsgi_input(self):
@@ -185,9 +185,9 @@ class IntegrationTests(TestCase):
s("200 OK", [("Content-Type", "text/plain; charset=utf-8")])
return [b"data"]
out, err = run_amock(validator(bad_app))
self.assertTrue(out.endswith(
self.assertEndsWith(out,
b"A server error occurred. Please contact the administrator."
))
)
self.assertEqual(
err.splitlines()[-2], "AssertionError"
)
@@ -200,7 +200,7 @@ class IntegrationTests(TestCase):
])
return [b"data"]
out, err = run_amock(validator(app))
self.assertTrue(err.endswith('"GET / HTTP/1.0" 200 4\n'))
self.assertEndsWith(err, '"GET / HTTP/1.0" 200 4\n')
ver = sys.version.split()[0].encode('ascii')
py = python_implementation().encode('ascii')
pyver = py + b"/" + ver
@@ -448,7 +448,7 @@ class UtilityTests(TestCase):
for alt in hop, hop.title(), hop.upper(), hop.lower():
self.assertFalse(util.is_hop_by_hop(alt))
@unittest.expectedFailure # TODO: RUSTPYTHON
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_filewrapper_getitem_deprecation(self):
return super().test_filewrapper_getitem_deprecation()
@@ -507,6 +507,22 @@ class HeaderTests(TestCase):
'\r\n'
)
def testRaisesControlCharacters(self):
for c0 in control_characters_c0():
with self.subTest(c0):
headers = Headers()
self.assertRaises(ValueError, headers.__setitem__, f"key{c0}", "val")
self.assertRaises(ValueError, headers.add_header, f"key{c0}", "val", param="param")
# HTAB (\x09) is allowed in values, not names.
if c0 == "\t":
headers["key"] = f"val{c0}"
headers.add_header("key", f"val{c0}")
headers.setdefault(f"key", f"val{c0}")
else:
self.assertRaises(ValueError, headers.__setitem__, "key", f"val{c0}")
self.assertRaises(ValueError, headers.add_header, "key", f"val{c0}", param="param")
self.assertRaises(ValueError, headers.add_header, "key", "val", param=f"param{c0}")
class ErrorHandler(BaseCGIHandler):
"""Simple handler subclass for testing BaseHandler"""
@@ -843,6 +859,25 @@ class HandlerTests(TestCase):
self.assertIsNotNone(h.status)
self.assertIsNotNone(h.environ)
def testRaisesControlCharacters(self):
for c0 in control_characters_c0():
with self.subTest(c0):
base = BaseHandler()
with self.assertRaises(ValueError):
base.start_response(c0, [('x', 'y')])
base = BaseHandler()
with self.assertRaises(ValueError):
base.start_response('200 OK', [(c0, 'y')])
# HTAB (\x09) is allowed in header values, but not in names.
base = BaseHandler()
if c0 != "\t":
with self.assertRaises(ValueError):
base.start_response('200 OK', [('x', c0)])
else:
base.start_response('200 OK', [('x', c0)])
if __name__ == "__main__":
unittest.main()

View File

@@ -1,7 +1,7 @@
"""Base classes for server/gateway implementations"""
from .util import FileWrapper, guess_scheme, is_hop_by_hop
from .headers import Headers
from .headers import Headers, _name_disallowed_re
import sys, os, time
@@ -249,6 +249,8 @@ class BaseHandler:
return self.write
def _validate_status(self, status):
if _name_disallowed_re.search(status):
raise ValueError("Control characters are not allowed in status")
if len(status) < 4:
raise AssertionError("Status must be at least 4 characters")
if not status[:3].isdigit():

View File

@@ -1,14 +1,19 @@
"""Manage HTTP Response Headers
Much of this module is red-handedly pilfered from email.message in the stdlib,
so portions are Copyright (C) 2001,2002 Python Software Foundation, and were
so portions are Copyright (C) 2001 Python Software Foundation, and were
written by Barry Warsaw.
"""
# Regular expression that matches `special' characters in parameters, the
# Regular expression that matches 'special' characters in parameters, the
# existence of which force quoting of the parameter value.
import re
tspecials = re.compile(r'[ \(\)<>@,;:\\"/\[\]\?=]')
# Disallowed characters for headers and values.
# HTAB (\x09) is allowed in header values, but
# not in header names. (RFC 9110 Section 5.5)
_name_disallowed_re = re.compile(r'[\x00-\x1F\x7F]')
_value_disallowed_re = re.compile(r'[\x00-\x08\x0A-\x1F\x7F]')
def _formatparam(param, value=None, quote=1):
"""Convenience function to format and return a key=value pair.
@@ -35,12 +40,15 @@ class Headers:
self._headers = headers
if __debug__:
for k, v in headers:
self._convert_string_type(k)
self._convert_string_type(v)
self._convert_string_type(k, name=True)
self._convert_string_type(v, name=False)
def _convert_string_type(self, value):
def _convert_string_type(self, value, *, name):
"""Convert/check value type."""
if type(value) is str:
regex = (_name_disallowed_re if name else _value_disallowed_re)
if regex.search(value):
raise ValueError("Control characters not allowed in headers")
return value
raise AssertionError("Header names/values must be"
" of type str (got {0})".format(repr(value)))
@@ -53,14 +61,14 @@ class Headers:
"""Set the value of a header."""
del self[name]
self._headers.append(
(self._convert_string_type(name), self._convert_string_type(val)))
(self._convert_string_type(name, name=True), self._convert_string_type(val, name=False)))
def __delitem__(self,name):
"""Delete all occurrences of a header, if present.
Does *not* raise an exception if the header is missing.
"""
name = self._convert_string_type(name.lower())
name = self._convert_string_type(name.lower(), name=True)
self._headers[:] = [kv for kv in self._headers if kv[0].lower() != name]
def __getitem__(self,name):
@@ -87,13 +95,13 @@ class Headers:
fields deleted and re-inserted are always appended to the header list.
If no fields exist with the given name, returns an empty list.
"""
name = self._convert_string_type(name.lower())
name = self._convert_string_type(name.lower(), name=True)
return [kv[1] for kv in self._headers if kv[0].lower()==name]
def get(self,name,default=None):
"""Get the first header value for 'name', or return 'default'"""
name = self._convert_string_type(name.lower())
name = self._convert_string_type(name.lower(), name=True)
for k,v in self._headers:
if k.lower()==name:
return v
@@ -148,8 +156,8 @@ class Headers:
and value 'value'."""
result = self.get(name)
if result is None:
self._headers.append((self._convert_string_type(name),
self._convert_string_type(value)))
self._headers.append((self._convert_string_type(name, name=True),
self._convert_string_type(value, name=False)))
return value
else:
return result
@@ -172,13 +180,13 @@ class Headers:
"""
parts = []
if _value is not None:
_value = self._convert_string_type(_value)
_value = self._convert_string_type(_value, name=False)
parts.append(_value)
for k, v in _params.items():
k = self._convert_string_type(k)
k = self._convert_string_type(k, name=True)
if v is None:
parts.append(k.replace('_', '-'))
else:
v = self._convert_string_type(v)
v = self._convert_string_type(v, name=False)
parts.append(_formatparam(k.replace('_', '-'), v))
self._headers.append((self._convert_string_type(_name), "; ".join(parts)))
self._headers.append((self._convert_string_type(_name, name=True), "; ".join(parts)))