Update http from 3.14.3 (#7137)

* Update `http` from 3.14.3

* Reapply patch

* Update `test/certdata` from 3.14.3

* Revert "Update `test/certdata` from 3.14.3"

This reverts commit fa8fb388b3.

* Update `test_httpservers.py`

* Reapply long test patch

* Mark failing tests

* Skip flaky test

* Allow password to be None

* Unmark passing test

* Fix error message

* Clippy

---------

Co-authored-by: Jeong, YunWon <69878+youknowone@users.noreply.github.com>
This commit is contained in:
Shahar Naveh
2026-02-27 06:44:37 +01:00
committed by GitHub
parent 684001ffa2
commit 4c2537010d
10 changed files with 395 additions and 91 deletions

41
Lib/http/__init__.py vendored
View File

@@ -54,8 +54,9 @@ class HTTPStatus:
CONTINUE = 100, 'Continue', 'Request received, please continue'
SWITCHING_PROTOCOLS = (101, 'Switching Protocols',
'Switching to new protocol; obey Upgrade header')
PROCESSING = 102, 'Processing'
EARLY_HINTS = 103, 'Early Hints'
PROCESSING = 102, 'Processing', 'Server is processing the request'
EARLY_HINTS = (103, 'Early Hints',
'Headers sent to prepare for the response')
# success
OK = 200, 'OK', 'Request fulfilled, document follows'
@@ -67,9 +68,11 @@ class HTTPStatus:
NO_CONTENT = 204, 'No Content', 'Request fulfilled, nothing follows'
RESET_CONTENT = 205, 'Reset Content', 'Clear input form for further input'
PARTIAL_CONTENT = 206, 'Partial Content', 'Partial content follows'
MULTI_STATUS = 207, 'Multi-Status'
ALREADY_REPORTED = 208, 'Already Reported'
IM_USED = 226, 'IM Used'
MULTI_STATUS = (207, 'Multi-Status',
'Response contains multiple statuses in the body')
ALREADY_REPORTED = (208, 'Already Reported',
'Operation has already been reported')
IM_USED = 226, 'IM Used', 'Request completed using instance manipulations'
# redirection
MULTIPLE_CHOICES = (300, 'Multiple Choices',
@@ -128,15 +131,19 @@ class HTTPStatus:
EXPECTATION_FAILED = (417, 'Expectation Failed',
'Expect condition could not be satisfied')
IM_A_TEAPOT = (418, 'I\'m a Teapot',
'Server refuses to brew coffee because it is a teapot.')
'Server refuses to brew coffee because it is a teapot')
MISDIRECTED_REQUEST = (421, 'Misdirected Request',
'Server is not able to produce a response')
UNPROCESSABLE_CONTENT = 422, 'Unprocessable Content'
UNPROCESSABLE_CONTENT = (422, 'Unprocessable Content',
'Server is not able to process the contained instructions')
UNPROCESSABLE_ENTITY = UNPROCESSABLE_CONTENT
LOCKED = 423, 'Locked'
FAILED_DEPENDENCY = 424, 'Failed Dependency'
TOO_EARLY = 425, 'Too Early'
UPGRADE_REQUIRED = 426, 'Upgrade Required'
LOCKED = 423, 'Locked', 'Resource of a method is locked'
FAILED_DEPENDENCY = (424, 'Failed Dependency',
'Dependent action of the request failed')
TOO_EARLY = (425, 'Too Early',
'Server refuses to process a request that might be replayed')
UPGRADE_REQUIRED = (426, 'Upgrade Required',
'Server refuses to perform the request using the current protocol')
PRECONDITION_REQUIRED = (428, 'Precondition Required',
'The origin server requires the request to be conditional')
TOO_MANY_REQUESTS = (429, 'Too Many Requests',
@@ -164,10 +171,14 @@ class HTTPStatus:
'The gateway server did not receive a timely response')
HTTP_VERSION_NOT_SUPPORTED = (505, 'HTTP Version Not Supported',
'Cannot fulfill request')
VARIANT_ALSO_NEGOTIATES = 506, 'Variant Also Negotiates'
INSUFFICIENT_STORAGE = 507, 'Insufficient Storage'
LOOP_DETECTED = 508, 'Loop Detected'
NOT_EXTENDED = 510, 'Not Extended'
VARIANT_ALSO_NEGOTIATES = (506, 'Variant Also Negotiates',
'Server has an internal configuration error')
INSUFFICIENT_STORAGE = (507, 'Insufficient Storage',
'Server is not able to store the representation')
LOOP_DETECTED = (508, 'Loop Detected',
'Server encountered an infinite loop while processing a request')
NOT_EXTENDED = (510, 'Not Extended',
'Request does not meet the resource access policy')
NETWORK_AUTHENTICATION_REQUIRED = (511,
'Network Authentication Required',
'The client needs to authenticate to gain network access')

10
Lib/http/client.py vendored
View File

@@ -1047,7 +1047,7 @@ class HTTPConnection:
response.close()
def send(self, data):
"""Send `data' to the server.
"""Send 'data' to the server.
``data`` can be a string object, a bytes object, an array object, a
file-like object that supports a .read() method, or an iterable object.
"""
@@ -1159,10 +1159,10 @@ class HTTPConnection:
skip_accept_encoding=False):
"""Send a request to the server.
`method' specifies an HTTP request method, e.g. 'GET'.
`url' specifies the object being requested, e.g. '/index.html'.
`skip_host' if True does not add automatically a 'Host:' header
`skip_accept_encoding' if True does not add automatically an
'method' specifies an HTTP request method, e.g. 'GET'.
'url' specifies the object being requested, e.g. '/index.html'.
'skip_host' if True does not add automatically a 'Host:' header
'skip_accept_encoding' if True does not add automatically an
'Accept-Encoding:' header
"""

View File

@@ -1987,7 +1987,7 @@ class MozillaCookieJar(FileCookieJar):
This class differs from CookieJar only in the format it uses to save and
load cookies to and from a file. This class uses the Mozilla/Netscape
`cookies.txt' format. curl and lynx use this file format, too.
'cookies.txt' format. curl and lynx use this file format, too.
Don't expect cookies saved while the browser is running to be noticed by
the browser (in fact, Mozilla on unix will overwrite your saved cookies if

33
Lib/http/cookies.py vendored
View File

@@ -87,9 +87,9 @@ within a string. Escaped quotation marks, nested semicolons, and other
such trickeries do not confuse it.
>>> C = cookies.SimpleCookie()
>>> C.load('keebler="E=everybody; L=\\"Loves\\"; fudge=\\012;";')
>>> C.load('keebler="E=everybody; L=\\"Loves\\"; fudge=;";')
>>> print(C)
Set-Cookie: keebler="E=everybody; L=\"Loves\"; fudge=\012;"
Set-Cookie: keebler="E=everybody; L=\"Loves\"; fudge=;"
Each element of the Cookie also supports all of the RFC 2109
Cookie attributes. Here's an example which sets the Path
@@ -170,6 +170,15 @@ _Translator.update({
})
_is_legal_key = re.compile('[%s]+' % re.escape(_LegalChars)).fullmatch
_control_character_re = re.compile(r'[\x00-\x1F\x7F]')
def _has_control_character(*val):
"""Detects control characters within a value.
Supports any type, as header values can be any type.
"""
return any(_control_character_re.search(str(v)) for v in val)
def _quote(str):
r"""Quote a string for use in a cookie header.
@@ -264,17 +273,19 @@ class Morsel(dict):
"httponly" : "HttpOnly",
"version" : "Version",
"samesite" : "SameSite",
"partitioned": "Partitioned",
}
_flags = {'secure', 'httponly'}
_reserved_defaults = dict.fromkeys(_reserved, "")
_flags = {'secure', 'httponly', 'partitioned'}
def __init__(self):
# Set defaults
self._key = self._value = self._coded_value = None
# Set default attributes
for key in self._reserved:
dict.__setitem__(self, key, "")
dict.update(self, self._reserved_defaults)
@property
def key(self):
@@ -292,12 +303,16 @@ class Morsel(dict):
K = K.lower()
if not K in self._reserved:
raise CookieError("Invalid attribute %r" % (K,))
if _has_control_character(K, V):
raise CookieError(f"Control characters are not allowed in cookies {K!r} {V!r}")
dict.__setitem__(self, K, V)
def setdefault(self, key, val=None):
key = key.lower()
if key not in self._reserved:
raise CookieError("Invalid attribute %r" % (key,))
if _has_control_character(key, val):
raise CookieError("Control characters are not allowed in cookies %r %r" % (key, val,))
return dict.setdefault(self, key, val)
def __eq__(self, morsel):
@@ -333,6 +348,9 @@ class Morsel(dict):
raise CookieError('Attempt to set a reserved key %r' % (key,))
if not _is_legal_key(key):
raise CookieError('Illegal key %r' % (key,))
if _has_control_character(key, val, coded_val):
raise CookieError(
"Control characters are not allowed in cookies %r %r %r" % (key, val, coded_val,))
# It's a good key, so save it.
self._key = key
@@ -486,7 +504,10 @@ class BaseCookie(dict):
result = []
items = sorted(self.items())
for key, value in items:
result.append(value.output(attrs, header))
value_output = value.output(attrs, header)
if _has_control_character(value_output):
raise CookieError("Control characters are not allowed in cookies")
result.append(value_output)
return sep.join(result)
__str__ = output

112
Lib/http/server.py vendored
View File

@@ -83,8 +83,10 @@ XXX To do:
__version__ = "0.6"
__all__ = [
"HTTPServer", "ThreadingHTTPServer", "BaseHTTPRequestHandler",
"SimpleHTTPRequestHandler", "CGIHTTPRequestHandler",
"HTTPServer", "ThreadingHTTPServer",
"HTTPSServer", "ThreadingHTTPSServer",
"BaseHTTPRequestHandler", "SimpleHTTPRequestHandler",
"CGIHTTPRequestHandler",
]
import copy
@@ -99,7 +101,7 @@ import os
import posixpath
import select
import shutil
import socket # For gethostbyaddr()
import socket
import socketserver
import sys
import time
@@ -114,6 +116,11 @@ DEFAULT_ERROR_MESSAGE = """\
<html lang="en">
<head>
<meta charset="utf-8">
<style type="text/css">
:root {
color-scheme: light dark;
}
</style>
<title>Error response</title>
</head>
<body>
@@ -133,7 +140,8 @@ _MIN_READ_BUF_SIZE = 1 << 20
class HTTPServer(socketserver.TCPServer):
allow_reuse_address = 1 # Seems to make sense in testing environment
allow_reuse_address = True # Seems to make sense in testing environment
allow_reuse_port = False
def server_bind(self):
"""Override server_bind to store the server name."""
@@ -147,6 +155,47 @@ class ThreadingHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
daemon_threads = True
class HTTPSServer(HTTPServer):
def __init__(self, server_address, RequestHandlerClass,
bind_and_activate=True, *, certfile, keyfile=None,
password=None, alpn_protocols=None):
try:
import ssl
except ImportError:
raise RuntimeError("SSL module is missing; "
"HTTPS support is unavailable")
self.ssl = ssl
self.certfile = certfile
self.keyfile = keyfile
self.password = password
# Support by default HTTP/1.1
self.alpn_protocols = (
["http/1.1"] if alpn_protocols is None else alpn_protocols
)
super().__init__(server_address,
RequestHandlerClass,
bind_and_activate)
def server_activate(self):
"""Wrap the socket in SSLSocket."""
super().server_activate()
context = self._create_context()
self.socket = context.wrap_socket(self.socket, server_side=True)
def _create_context(self):
"""Create a secure SSL context."""
context = self.ssl.create_default_context(self.ssl.Purpose.CLIENT_AUTH)
context.load_cert_chain(self.certfile, self.keyfile, self.password)
context.set_alpn_protocols(self.alpn_protocols)
return context
class ThreadingHTTPSServer(socketserver.ThreadingMixIn, HTTPSServer):
daemon_threads = True
class BaseHTTPRequestHandler(socketserver.StreamRequestHandler):
"""HTTP request handler base class.
@@ -817,6 +866,7 @@ class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
r.append('<html lang="en">')
r.append('<head>')
r.append(f'<meta charset="{enc}">')
r.append('<style type="text/css">\n:root {\ncolor-scheme: light dark;\n}\n</style>')
r.append(f'<title>{title}</title>\n</head>')
r.append(f'<body>\n<h1>{title}</h1>')
r.append('<hr>\n<ul>')
@@ -1281,7 +1331,8 @@ def _get_best_family(*address):
def test(HandlerClass=BaseHTTPRequestHandler,
ServerClass=ThreadingHTTPServer,
protocol="HTTP/1.0", port=8000, bind=None):
protocol="HTTP/1.0", port=8000, bind=None,
tls_cert=None, tls_key=None, tls_password=None):
"""Test the HTTP request handler class.
This runs an HTTP server on port 8000 (or the port argument).
@@ -1289,12 +1340,20 @@ def test(HandlerClass=BaseHTTPRequestHandler,
"""
ServerClass.address_family, addr = _get_best_family(bind, port)
HandlerClass.protocol_version = protocol
with ServerClass(addr, HandlerClass) as httpd:
if tls_cert:
server = ServerClass(addr, HandlerClass, certfile=tls_cert,
keyfile=tls_key, password=tls_password)
else:
server = ServerClass(addr, HandlerClass)
with server as httpd:
host, port = httpd.socket.getsockname()[:2]
url_host = f'[{host}]' if ':' in host else host
protocol = 'HTTPS' if tls_cert else 'HTTP'
print(
f"Serving HTTP on {host} port {port} "
f"(http://{url_host}:{port}/) ..."
f"Serving {protocol} on {host} port {port} "
f"({protocol.lower()}://{url_host}:{port}/) ..."
)
try:
httpd.serve_forever()
@@ -1306,7 +1365,7 @@ if __name__ == '__main__':
import argparse
import contextlib
parser = argparse.ArgumentParser()
parser = argparse.ArgumentParser(color=True)
parser.add_argument('--cgi', action='store_true',
help='run as CGI server')
parser.add_argument('-b', '--bind', metavar='ADDRESS',
@@ -1319,17 +1378,38 @@ if __name__ == '__main__':
default='HTTP/1.0',
help='conform to this HTTP version '
'(default: %(default)s)')
parser.add_argument('--tls-cert', metavar='PATH',
help='path to the TLS certificate chain file')
parser.add_argument('--tls-key', metavar='PATH',
help='path to the TLS key file')
parser.add_argument('--tls-password-file', metavar='PATH',
help='path to the password file for the TLS key')
parser.add_argument('port', default=8000, type=int, nargs='?',
help='bind to this port '
'(default: %(default)s)')
args = parser.parse_args()
if not args.tls_cert and args.tls_key:
parser.error("--tls-key requires --tls-cert to be set")
tls_key_password = None
if args.tls_password_file:
if not args.tls_cert:
parser.error("--tls-password-file requires --tls-cert to be set")
try:
with open(args.tls_password_file, "r", encoding="utf-8") as f:
tls_key_password = f.read().strip()
except OSError as e:
parser.error(f"Failed to read TLS password file: {e}")
if args.cgi:
handler_class = CGIHTTPRequestHandler
else:
handler_class = SimpleHTTPRequestHandler
# ensure dual-stack is not disabled; ref #38907
class DualStackServer(ThreadingHTTPServer):
class DualStackServerMixin:
def server_bind(self):
# suppress exception when protocol is IPv4
@@ -1342,10 +1422,20 @@ if __name__ == '__main__':
self.RequestHandlerClass(request, client_address, self,
directory=args.directory)
class HTTPDualStackServer(DualStackServerMixin, ThreadingHTTPServer):
pass
class HTTPSDualStackServer(DualStackServerMixin, ThreadingHTTPSServer):
pass
ServerClass = HTTPSDualStackServer if args.tls_cert else HTTPDualStackServer
test(
HandlerClass=handler_class,
ServerClass=DualStackServer,
ServerClass=ServerClass,
port=args.port,
bind=args.bind,
protocol=args.protocol,
tls_cert=args.tls_cert,
tls_key=args.tls_key,
tls_password=tls_key_password,
)

View File

@@ -7,7 +7,6 @@ import re
from test import support
from test.support import os_helper
from test.support import warnings_helper
from test.support.testcase import ExtraAssertions
import time
import unittest
import urllib.request
@@ -222,11 +221,19 @@ class HeaderTests(unittest.TestCase):
expected = [[("expires", "01 Jan 2040 22:23:32 GMT"), ("version", "0")]]
self.assertEqual(parse_ns_headers([hdr]), expected)
def test_join_header_words(self):
joined = join_header_words([[("foo", None), ("bar", "baz")]])
self.assertEqual(joined, "foo; bar=baz")
self.assertEqual(join_header_words([[]]), "")
@support.subTests('src,expected', [
([[("foo", None), ("bar", "baz")]], "foo; bar=baz"),
(([]), ""),
(([[]]), ""),
(([[("a", "_")]]), "a=_"),
(([[("a", ";")]]), 'a=";"'),
([[("n", None), ("foo", "foo;_")], [("bar", "foo_bar")]],
'n; foo="foo;_", bar=foo_bar'),
([[("n", "m"), ("foo", None)], [("bar", "foo_bar")]],
'n=m; foo, bar=foo_bar'),
])
def test_join_header_words(self, src, expected):
self.assertEqual(join_header_words(src), expected)
@support.subTests('arg,expect', [
("foo", [[("foo", None)]]),
@@ -1427,7 +1434,7 @@ class CookieTests(unittest.TestCase):
self.assertIsNone(cookie.expires)
class LWPCookieTests(unittest.TestCase, ExtraAssertions):
class LWPCookieTests(unittest.TestCase):
# Tests taken from libwww-perl, with a few modifications and additions.
def test_netscape_example_1(self):

View File

@@ -6,10 +6,9 @@ import doctest
from http import cookies
import pickle
from test import support
from test.support.testcase import ExtraAssertions
class CookieTests(unittest.TestCase, ExtraAssertions):
class CookieTests(unittest.TestCase):
def test_basic(self):
cases = [
@@ -18,10 +17,10 @@ class CookieTests(unittest.TestCase, ExtraAssertions):
'repr': "<SimpleCookie: chips='ahoy' vienna='finger'>",
'output': 'Set-Cookie: chips=ahoy\nSet-Cookie: vienna=finger'},
{'data': 'keebler="E=mc2; L=\\"Loves\\"; fudge=\\012;"',
'dict': {'keebler' : 'E=mc2; L="Loves"; fudge=\012;'},
'repr': '''<SimpleCookie: keebler='E=mc2; L="Loves"; fudge=\\n;'>''',
'output': 'Set-Cookie: keebler="E=mc2; L=\\"Loves\\"; fudge=\\012;"'},
{'data': 'keebler="E=mc2; L=\\"Loves\\"; fudge=;"',
'dict': {'keebler' : 'E=mc2; L="Loves"; fudge=;'},
'repr': '''<SimpleCookie: keebler='E=mc2; L="Loves"; fudge=;'>''',
'output': 'Set-Cookie: keebler="E=mc2; L=\\"Loves\\"; fudge=;"'},
# Check illegal cookies that have an '=' char in an unquoted value
{'data': 'keebler=E=mc2',
@@ -132,8 +131,8 @@ class CookieTests(unittest.TestCase, ExtraAssertions):
@support.requires_resource('cpu')
def test_unquote_large(self):
#n = 10**6
n = 10**4 # XXX: RUSTPYTHON; This takes more than 10 minutes to run. lower to 4
# n = 10**6
n = 10**4 # XXX: RUSTPYTHON; This takes more than 10 minutes to run. lower to 4
for encoded in r'\\', r'\134':
with self.subTest(encoded):
data = 'a="b=' + encoded*n + ';"'
@@ -207,6 +206,14 @@ class CookieTests(unittest.TestCase, ExtraAssertions):
self.assertEqual(C.output(),
'Set-Cookie: Customer="WILE_E_COYOTE"; HttpOnly; Secure')
def test_set_secure_httponly_partitioned_attrs(self):
C = cookies.SimpleCookie('Customer="WILE_E_COYOTE"')
C['Customer']['secure'] = True
C['Customer']['httponly'] = True
C['Customer']['partitioned'] = True
self.assertEqual(C.output(),
'Set-Cookie: Customer="WILE_E_COYOTE"; HttpOnly; Partitioned; Secure')
def test_samesite_attrs(self):
samesite_values = ['Strict', 'Lax', 'strict', 'lax']
for val in samesite_values:
@@ -565,6 +572,50 @@ class MorselTests(unittest.TestCase):
r'Set-Cookie: key=coded_val; '
r'expires=\w+, \d+ \w+ \d+ \d+:\d+:\d+ \w+')
def test_control_characters(self):
for c0 in support.control_characters_c0():
morsel = cookies.Morsel()
# .__setitem__()
with self.assertRaises(cookies.CookieError):
morsel[c0] = "val"
with self.assertRaises(cookies.CookieError):
morsel["path"] = c0
# .setdefault()
with self.assertRaises(cookies.CookieError):
morsel.setdefault("path", c0)
with self.assertRaises(cookies.CookieError):
morsel.setdefault(c0, "val")
# .set()
with self.assertRaises(cookies.CookieError):
morsel.set(c0, "val", "coded-value")
with self.assertRaises(cookies.CookieError):
morsel.set("path", c0, "coded-value")
with self.assertRaises(cookies.CookieError):
morsel.set("path", "val", c0)
def test_control_characters_output(self):
# Tests that even if the internals of Morsel are modified
# that a call to .output() has control character safeguards.
for c0 in support.control_characters_c0():
morsel = cookies.Morsel()
morsel.set("key", "value", "coded-value")
morsel._key = c0 # Override private variable.
cookie = cookies.SimpleCookie()
cookie["cookie"] = morsel
with self.assertRaises(cookies.CookieError):
cookie.output()
morsel = cookies.Morsel()
morsel.set("key", "value", "coded-value")
morsel._coded_value = c0 # Override private variable.
cookie = cookies.SimpleCookie()
cookie["cookie"] = morsel
with self.assertRaises(cookies.CookieError):
cookie.output()
def load_tests(loader, tests, pattern):
tests.addTest(doctest.DocTestSuite(cookies))

View File

@@ -16,7 +16,6 @@ TestCase = unittest.TestCase
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support.testcase import ExtraAssertions
support.requires_working_socket(module=True)
@@ -135,7 +134,7 @@ class FakeSocketHTTPConnection(client.HTTPConnection):
def create_connection(self, *pos, **kw):
return FakeSocket(*self.fake_socket_args)
class HeaderTests(TestCase, ExtraAssertions):
class HeaderTests(TestCase):
def test_auto_headers(self):
# Some headers are added automatically, but should not be added by
# .request() if they are explicitly set.
@@ -538,7 +537,7 @@ class TransferEncodingTest(TestCase):
return b''.join(body)
class BasicTest(TestCase, ExtraAssertions):
class BasicTest(TestCase):
def test_dir_with_added_behavior_on_status(self):
# see issue40084
self.assertTrue({'description', 'name', 'phrase', 'value'} <= set(dir(HTTPStatus(404))))
@@ -595,8 +594,9 @@ class BasicTest(TestCase, ExtraAssertions):
CONTINUE = 100, 'Continue', 'Request received, please continue'
SWITCHING_PROTOCOLS = (101, 'Switching Protocols',
'Switching to new protocol; obey Upgrade header')
PROCESSING = 102, 'Processing'
EARLY_HINTS = 103, 'Early Hints'
PROCESSING = 102, 'Processing', 'Server is processing the request'
EARLY_HINTS = (103, 'Early Hints',
'Headers sent to prepare for the response')
# success
OK = 200, 'OK', 'Request fulfilled, document follows'
CREATED = 201, 'Created', 'Document created, URL follows'
@@ -607,9 +607,11 @@ class BasicTest(TestCase, ExtraAssertions):
NO_CONTENT = 204, 'No Content', 'Request fulfilled, nothing follows'
RESET_CONTENT = 205, 'Reset Content', 'Clear input form for further input'
PARTIAL_CONTENT = 206, 'Partial Content', 'Partial content follows'
MULTI_STATUS = 207, 'Multi-Status'
ALREADY_REPORTED = 208, 'Already Reported'
IM_USED = 226, 'IM Used'
MULTI_STATUS = (207, 'Multi-Status',
'Response contains multiple statuses in the body')
ALREADY_REPORTED = (208, 'Already Reported',
'Operation has already been reported')
IM_USED = 226, 'IM Used', 'Request completed using instance manipulations'
# redirection
MULTIPLE_CHOICES = (300, 'Multiple Choices',
'Object has several resources -- see URI list')
@@ -666,15 +668,19 @@ class BasicTest(TestCase, ExtraAssertions):
EXPECTATION_FAILED = (417, 'Expectation Failed',
'Expect condition could not be satisfied')
IM_A_TEAPOT = (418, 'I\'m a Teapot',
'Server refuses to brew coffee because it is a teapot.')
'Server refuses to brew coffee because it is a teapot')
MISDIRECTED_REQUEST = (421, 'Misdirected Request',
'Server is not able to produce a response')
UNPROCESSABLE_CONTENT = 422, 'Unprocessable Content'
UNPROCESSABLE_CONTENT = (422, 'Unprocessable Content',
'Server is not able to process the contained instructions')
UNPROCESSABLE_ENTITY = UNPROCESSABLE_CONTENT
LOCKED = 423, 'Locked'
FAILED_DEPENDENCY = 424, 'Failed Dependency'
TOO_EARLY = 425, 'Too Early'
UPGRADE_REQUIRED = 426, 'Upgrade Required'
LOCKED = 423, 'Locked', 'Resource of a method is locked'
FAILED_DEPENDENCY = (424, 'Failed Dependency',
'Dependent action of the request failed')
TOO_EARLY = (425, 'Too Early',
'Server refuses to process a request that might be replayed')
UPGRADE_REQUIRED = (426, 'Upgrade Required',
'Server refuses to perform the request using the current protocol')
PRECONDITION_REQUIRED = (428, 'Precondition Required',
'The origin server requires the request to be conditional')
TOO_MANY_REQUESTS = (429, 'Too Many Requests',
@@ -701,10 +707,14 @@ class BasicTest(TestCase, ExtraAssertions):
'The gateway server did not receive a timely response')
HTTP_VERSION_NOT_SUPPORTED = (505, 'HTTP Version Not Supported',
'Cannot fulfill request')
VARIANT_ALSO_NEGOTIATES = 506, 'Variant Also Negotiates'
INSUFFICIENT_STORAGE = 507, 'Insufficient Storage'
LOOP_DETECTED = 508, 'Loop Detected'
NOT_EXTENDED = 510, 'Not Extended'
VARIANT_ALSO_NEGOTIATES = (506, 'Variant Also Negotiates',
'Server has an internal configuration error')
INSUFFICIENT_STORAGE = (507, 'Insufficient Storage',
'Server is not able to store the representation')
LOOP_DETECTED = (508, 'Loop Detected',
'Server encountered an infinite loop while processing a request')
NOT_EXTENDED = (510, 'Not Extended',
'Request does not meet the resource access policy')
NETWORK_AUTHENTICATION_REQUIRED = (511,
'Network Authentication Required',
'The client needs to authenticate to gain network access')
@@ -1560,7 +1570,7 @@ class BasicTest(TestCase, ExtraAssertions):
conn.putrequest('GET', '/☃')
class ExtendedReadTest(TestCase, ExtraAssertions):
class ExtendedReadTest(TestCase):
"""
Test peek(), read1(), readline()
"""
@@ -1753,7 +1763,7 @@ class Readliner:
raise
class OfflineTest(TestCase, ExtraAssertions):
class OfflineTest(TestCase):
def test_all(self):
# Documented objects defined in the module should be in __all__
expected = {"responses"} # Allowlist documented dict() object
@@ -2111,6 +2121,7 @@ class HTTPSTest(TestCase):
self.addCleanup(resp.close)
self.assertEqual(resp.status, 404)
@unittest.skip("TODO: RUSTPYTHON; Flaky on CI")
def test_local_bad_hostname(self):
# The (valid) cert doesn't validate the HTTPS hostname
import ssl
@@ -2156,10 +2167,11 @@ class HTTPSTest(TestCase):
self.assertEqual(h, c.host)
self.assertEqual(p, c.port)
@unittest.expectedFailure # TODO: RUSTPYTHON
def test_tls13_pha(self):
import ssl
if not ssl.HAS_TLSv1_3:
self.skipTest('TLS 1.3 support required')
if not ssl.HAS_TLSv1_3 or not ssl.HAS_PHA:
self.skipTest('TLS 1.3 PHA support required')
# just check status of PHA flag
h = client.HTTPSConnection('localhost', 443)
self.assertTrue(h._context.post_handshake_auth)
@@ -2307,7 +2319,7 @@ class HTTPResponseTest(TestCase):
header = self.resp.getheader('No-Such-Header',default=42)
self.assertEqual(header, 42)
class TunnelTests(TestCase, ExtraAssertions):
class TunnelTests(TestCase):
def setUp(self):
response_text = (
'HTTP/1.1 200 OK\r\n\r\n' # Reply to CONNECT

View File

@@ -4,7 +4,7 @@ Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
"""
from collections import OrderedDict
from http.server import BaseHTTPRequestHandler, HTTPServer, \
from http.server import BaseHTTPRequestHandler, HTTPServer, HTTPSServer, \
SimpleHTTPRequestHandler, CGIHTTPRequestHandler
from http import server, HTTPStatus
@@ -32,9 +32,13 @@ from io import BytesIO, StringIO
import unittest
from test import support
from test.support import (
is_apple, os_helper, requires_subprocess, threading_helper
is_apple, import_helper, os_helper, requires_subprocess, threading_helper
)
from test.support.testcase import ExtraAssertions
try:
import ssl
except ImportError:
ssl = None
support.requires_working_socket(module=True)
@@ -47,14 +51,49 @@ class NoLogRequestHandler:
return ''
class DummyRequestHandler(NoLogRequestHandler, SimpleHTTPRequestHandler):
pass
def create_https_server(
certfile,
keyfile=None,
password=None,
*,
address=('localhost', 0),
request_handler=DummyRequestHandler,
):
return HTTPSServer(
address, request_handler,
certfile=certfile, keyfile=keyfile, password=password
)
class TestSSLDisabled(unittest.TestCase):
def test_https_server_raises_runtime_error(self):
with import_helper.isolated_modules():
sys.modules['ssl'] = None
certfile = certdata_file("keycert.pem")
with self.assertRaises(RuntimeError):
create_https_server(certfile)
class TestServerThread(threading.Thread):
def __init__(self, test_object, request_handler):
def __init__(self, test_object, request_handler, tls=None):
threading.Thread.__init__(self)
self.request_handler = request_handler
self.test_object = test_object
self.tls = tls
def run(self):
self.server = HTTPServer(('localhost', 0), self.request_handler)
if self.tls:
certfile, keyfile, password = self.tls
self.server = create_https_server(
certfile, keyfile, password,
request_handler=self.request_handler,
)
else:
self.server = HTTPServer(('localhost', 0), self.request_handler)
self.test_object.HOST, self.test_object.PORT = self.server.socket.getsockname()
self.test_object.server_started.set()
self.test_object = None
@@ -68,12 +107,16 @@ class TestServerThread(threading.Thread):
self.join()
class BaseTestCase(unittest.TestCase, ExtraAssertions):
class BaseTestCase(unittest.TestCase):
# Optional tuple (certfile, keyfile, password) to use for HTTPS servers.
tls = None
def setUp(self):
self._threads = threading_helper.threading_setup()
os.environ = os_helper.EnvironmentVarGuard()
self.server_started = threading.Event()
self.thread = TestServerThread(self, self.request_handler)
self.thread = TestServerThread(self, self.request_handler, self.tls)
self.thread.start()
self.server_started.wait()
@@ -355,6 +398,74 @@ class HTTP09ServerTestCase(BaseTestCase):
self.assertEqual(res, b'')
def certdata_file(*path):
return os.path.join(os.path.dirname(__file__), "certdata", *path)
@unittest.skipIf(ssl is None, "requires ssl")
class BaseHTTPSServerTestCase(BaseTestCase):
CERTFILE = certdata_file("keycert.pem")
ONLYCERT = certdata_file("ssl_cert.pem")
ONLYKEY = certdata_file("ssl_key.pem")
CERTFILE_PROTECTED = certdata_file("keycert.passwd.pem")
ONLYKEY_PROTECTED = certdata_file("ssl_key.passwd.pem")
EMPTYCERT = certdata_file("nullcert.pem")
BADCERT = certdata_file("badcert.pem")
KEY_PASSWORD = "somepass"
BADPASSWORD = "badpass"
tls = (ONLYCERT, ONLYKEY, None) # values by default
request_handler = DummyRequestHandler
def test_get(self):
response = self.request('/')
self.assertEqual(response.status, HTTPStatus.OK)
def request(self, uri, method='GET', body=None, headers={}):
context = ssl._create_unverified_context()
self.connection = http.client.HTTPSConnection(
self.HOST, self.PORT, context=context
)
self.connection.request(method, uri, body, headers)
return self.connection.getresponse()
def test_valid_certdata(self):
valid_certdata= [
(self.CERTFILE, None, None),
(self.CERTFILE, self.CERTFILE, None),
(self.CERTFILE_PROTECTED, None, self.KEY_PASSWORD),
(self.ONLYCERT, self.ONLYKEY_PROTECTED, self.KEY_PASSWORD),
]
for certfile, keyfile, password in valid_certdata:
with self.subTest(
certfile=certfile, keyfile=keyfile, password=password
):
server = create_https_server(certfile, keyfile, password)
self.assertIsInstance(server, HTTPSServer)
server.server_close()
def test_invalid_certdata(self):
invalid_certdata = [
(self.BADCERT, None, None),
(self.EMPTYCERT, None, None),
(self.ONLYCERT, None, None),
(self.ONLYKEY, None, None),
(self.ONLYKEY, self.ONLYCERT, None),
(self.CERTFILE_PROTECTED, None, self.BADPASSWORD),
# TODO: test the next case and add same case to test_ssl (We
# specify a cert and a password-protected file, but no password):
# (self.CERTFILE_PROTECTED, None, None),
# see issue #132102
]
for certfile, keyfile, password in invalid_certdata:
with self.subTest(
certfile=certfile, keyfile=keyfile, password=password
):
with self.assertRaises(ssl.SSLError):
create_https_server(certfile, keyfile, password)
class RequestHandlerLoggingTestCase(BaseTestCase):
class request_handler(BaseHTTPRequestHandler):
protocol_version = 'HTTP/1.1'
@@ -1153,7 +1264,7 @@ class AuditableBytesIO:
return len(self.datas)
class BaseHTTPRequestHandlerTestCase(unittest.TestCase, ExtraAssertions):
class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
"""Test the functionality of the BaseHTTPServer.
Test the support for the Expect 100-continue header.

View File

@@ -2018,25 +2018,26 @@ mod _ssl {
) -> PyResult<(Option<String>, Option<PyObjectRef>)> {
match password {
OptionalArg::Present(p) => {
// Try string first
if vm.is_none(p) {
return Ok((None, None));
}
// Try string
if let Ok(pwd_str) = PyUtf8StrRef::try_from_object(vm, p.clone()) {
Ok((Some(pwd_str.as_str().to_owned()), None))
}
// Try bytes-like
else if let Ok(pwd_bytes_like) = ArgBytesLike::try_from_object(vm, p.clone())
{
let pwd = String::from_utf8(pwd_bytes_like.borrow_buf().to_vec()).map_err(
|_| vm.new_type_error("password bytes must be valid UTF-8".to_owned()),
)?;
let pwd = String::from_utf8(pwd_bytes_like.borrow_buf().to_vec())
.map_err(|_| vm.new_type_error("password bytes must be valid UTF-8"))?;
Ok((Some(pwd), None))
}
// Try callable
else if p.is_callable() {
Ok((None, Some(p.clone())))
} else {
Err(vm.new_type_error(
"password should be a string, bytes, or callable".to_owned(),
))
Err(vm.new_type_error("password should be a string or callable"))
}
}
_ => Ok((None, None)),