mirror of
https://github.com/RustPython/RustPython.git
synced 2026-06-02 19:39:49 +09:00
Merge pull request #2678 from fanninpm/test-urllib
Add test_urllib from CPython 3.8
This commit is contained in:
@@ -53,12 +53,9 @@ parse_strict_test_cases = [
|
||||
("", ValueError("bad query field: ''")),
|
||||
("&", ValueError("bad query field: ''")),
|
||||
("&&", ValueError("bad query field: ''")),
|
||||
(";", ValueError("bad query field: ''")),
|
||||
(";&;", ValueError("bad query field: ''")),
|
||||
# Should the next few really be valid?
|
||||
("=", {}),
|
||||
("=&=", {}),
|
||||
("=;=", {}),
|
||||
# The rest seem to make sense
|
||||
("=a", {'': ['a']}),
|
||||
("&=a", ValueError("bad query field: ''")),
|
||||
@@ -73,8 +70,6 @@ parse_strict_test_cases = [
|
||||
("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}),
|
||||
("a=a+b&a=b+a", {'a': ['a b', 'b a']}),
|
||||
("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
|
||||
("x=1;y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
|
||||
("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
|
||||
("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b&expire=964546263&lobale=en-US&kid=130003.300038&ss=env",
|
||||
{'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'],
|
||||
'cuyer': ['r'],
|
||||
@@ -128,6 +123,22 @@ class CgiTests(unittest.TestCase):
|
||||
'file': [b'Testing 123.\n'], 'title': ['']}
|
||||
self.assertEqual(result, expected)
|
||||
|
||||
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_parse_multipart_without_content_length(self):
    """Parse a multipart body whose single part has no Content-Length.

    The body holds one form field, "submit-name", and the boundary is
    handed to cgi.parse_multipart() as latin-1 encoded bytes.
    """
    body = '''--JfISa01
Content-Disposition: form-data; name="submit-name"

just a string

--JfISa01--
'''
    # parse_multipart() reads from a binary stream, so encode the text first.
    stream = BytesIO(body.encode('latin1'))
    pdict = {'boundary': 'JfISa01'.encode('latin1')}
    parsed = cgi.parse_multipart(stream, pdict)
    self.assertEqual(parsed, {'submit-name': ['just a string\n']})
|
||||
|
||||
# TODO RUSTPYTHON - see https://github.com/RustPython/RustPython/issues/935
|
||||
@unittest.expectedFailure
|
||||
def test_parse_multipart_invalid_encoding(self):
|
||||
@@ -189,6 +200,32 @@ Content-Length: 3
|
||||
else:
|
||||
self.assertEqual(fs.getvalue(key), expect_val[0])
|
||||
|
||||
# TODO: RUSTPYTHON
@unittest.expectedFailure
def test_separator(self):
    """Build FieldStorage objects from query strings split on ';'.

    Each case pairs a QUERY_STRING with its expectation.  For dict
    expectations every key must be present in the storage and
    getvalue() must return the list itself (several values) or the
    lone element (one value).
    """
    semicolon_cases = [
        ("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}),
        ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}),
        (";", ValueError("bad query field: ''")),
        (";;", ValueError("bad query field: ''")),
        ("=;a", ValueError("bad query field: 'a'")),
        (";b=a", ValueError("bad query field: ''")),
        ("b;=a", ValueError("bad query field: 'b'")),
        ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
        ("a=a+b;a=b+a", {'a': ['a b', 'b a']}),
    ]
    for query, wanted in semicolon_cases:
        storage = cgi.FieldStorage(separator=';',
                                   environ={'QUERY_STRING': query})
        if not isinstance(wanted, dict):
            # NOTE: the ValueError expectations are never asserted on;
            # for those cases the storage is only constructed.
            continue
        for field in wanted.keys():
            values = wanted[field]
            self.assertIn(field, storage)
            # getvalue() yields a list only when a field repeats.
            target = values if len(values) > 1 else values[0]
            self.assertEqual(storage.getvalue(field), target)
|
||||
|
||||
def test_log(self):
|
||||
cgi.log("Testing")
|
||||
|
||||
|
||||
1745
Lib/test/test_urllib.py
Normal file
1745
Lib/test/test_urllib.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -32,16 +32,10 @@ parse_qsl_test_cases = [
|
||||
(b"&a=b", [(b'a', b'b')]),
|
||||
(b"a=a+b&b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
|
||||
(b"a=1&a=2", [(b'a', b'1'), (b'a', b'2')]),
|
||||
(";", []),
|
||||
(";;", []),
|
||||
(";a=b", [('a', 'b')]),
|
||||
("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]),
|
||||
("a=1;a=2", [('a', '1'), ('a', '2')]),
|
||||
(b";", []),
|
||||
(b";;", []),
|
||||
(b";a=b", [(b'a', b'b')]),
|
||||
(b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
|
||||
(b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
|
||||
(";a=b", [(';a', 'b')]),
|
||||
("a=a+b;b=b+c", [('a', 'a b;b=b c')]),
|
||||
(b";a=b", [(b';a', b'b')]),
|
||||
(b"a=a+b;b=b+c", [(b'a', b'a b;b=b c')]),
|
||||
]
|
||||
|
||||
# Each parse_qs testcase is a two-tuple that contains
|
||||
@@ -68,16 +62,10 @@ parse_qs_test_cases = [
|
||||
(b"&a=b", {b'a': [b'b']}),
|
||||
(b"a=a+b&b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
|
||||
(b"a=1&a=2", {b'a': [b'1', b'2']}),
|
||||
(";", {}),
|
||||
(";;", {}),
|
||||
(";a=b", {'a': ['b']}),
|
||||
("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
|
||||
("a=1;a=2", {'a': ['1', '2']}),
|
||||
(b";", {}),
|
||||
(b";;", {}),
|
||||
(b";a=b", {b'a': [b'b']}),
|
||||
(b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
|
||||
(b"a=1;a=2", {b'a': [b'1', b'2']}),
|
||||
(";a=b", {';a': ['b']}),
|
||||
("a=a+b;b=b+c", {'a': ['a b;b=b c']}),
|
||||
(b";a=b", {b';a': [b'b']}),
|
||||
(b"a=a+b;b=b+c", {b'a':[ b'a b;b=b c']}),
|
||||
]
|
||||
|
||||
class UrlParseTestCase(unittest.TestCase):
|
||||
@@ -624,6 +612,54 @@ class UrlParseTestCase(unittest.TestCase):
|
||||
with self.assertRaisesRegex(ValueError, "out of range"):
|
||||
p.port
|
||||
|
||||
def test_urlsplit_remove_unsafe_bytes(self):
    """Check that urlsplit() drops ASCII tab, CR and LF from its input.

    Exercises str and bytes URLs, the http scheme and an arbitrary
    scheme, and a default ``scheme`` argument that itself contains a
    newline.
    """

    def verify(parts, fields, full_url):
        # Compare the listed attributes in order, then the geturl()
        # round trip.
        for attr, value in fields:
            self.assertEqual(getattr(parts, attr), value)
        self.assertEqual(parts.geturl(), full_url)

    # Remove ASCII tabs and newlines from input, for http common case scenario.
    tainted = "h\nttp://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment"
    verify(
        urllib.parse.urlsplit(tainted),
        [
            ("scheme", "http"),
            ("netloc", "www.python.org"),
            ("path", "/javascript:alert('msg')/"),
            ("query", "query=something"),
            ("fragment", "fragment"),
            ("username", None),
            ("password", None),
            ("hostname", "www.python.org"),
            ("port", None),
        ],
        "http://www.python.org/javascript:alert('msg')/?query=something#fragment",
    )

    # Remove ASCII tabs and newlines from input as bytes, for http common case scenario.
    tainted = b"h\nttp://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment"
    verify(
        urllib.parse.urlsplit(tainted),
        [
            ("scheme", b"http"),
            ("netloc", b"www.python.org"),
            ("path", b"/javascript:alert('msg')/"),
            ("query", b"query=something"),
            ("fragment", b"fragment"),
            ("username", None),
            ("password", None),
            ("hostname", b"www.python.org"),
            ("port", None),
        ],
        b"http://www.python.org/javascript:alert('msg')/?query=something#fragment",
    )

    # Any scheme, str input.
    tainted = "x-new-scheme\t://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment"
    verify(
        urllib.parse.urlsplit(tainted),
        [],
        "x-new-scheme://www.python.org/javascript:alert('msg')/?query=something#fragment",
    )

    # Remove ASCII tabs and newlines from input as bytes, any scheme.
    tainted = b"x-new-scheme\t://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment"
    verify(
        urllib.parse.urlsplit(tainted),
        [],
        b"x-new-scheme://www.python.org/javascript:alert('msg')/?query=something#fragment",
    )

    # Unsafe bytes are not returned from the urlparse cache.
    # The scheme is stored after parsing; sending a scheme with unsafe
    # bytes *will not* return an unsafe scheme.  The second iteration
    # repeats the identical call.
    tainted = "https://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment"
    tainted_scheme = "htt\nps"
    for _ in range(2):
        verify(
            urllib.parse.urlsplit(tainted, scheme=tainted_scheme),
            [("scheme", "https")],
            "https://www.python.org/javascript:alert('msg')/?query=something#fragment",
        )
|
||||
|
||||
def test_attributes_bad_port(self):
|
||||
"""Check handling of invalid ports."""
|
||||
for bytes in (False, True):
|
||||
@@ -884,10 +920,50 @@ class UrlParseTestCase(unittest.TestCase):
|
||||
def test_parse_qsl_max_num_fields(self):
|
||||
with self.assertRaises(ValueError):
|
||||
urllib.parse.parse_qs('&'.join(['a=a']*11), max_num_fields=10)
|
||||
with self.assertRaises(ValueError):
|
||||
urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10)
|
||||
urllib.parse.parse_qs('&'.join(['a=a']*10), max_num_fields=10)
|
||||
|
||||
def test_parse_qs_separator(self):
    """parse_qs() splits on ';' when it is passed as the separator.

    Every case is parsed twice, first with a str separator and then
    with a bytes separator, and both results must equal the same
    expected dictionary.
    """
    semicolon_cases = [
        (";", {}),
        (";;", {}),
        (";a=b", {'a': ['b']}),
        ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}),
        ("a=1;a=2", {'a': ['1', '2']}),
        (b";", {}),
        (b";;", {}),
        (b";a=b", {b'a': [b'b']}),
        (b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}),
        (b"a=1;a=2", {b'a': [b'1', b'2']}),
    ]
    for query, wanted in semicolon_cases:
        with self.subTest(f"Original: {query!r}, Expected: {wanted!r}"):
            for sep in (';', b';'):
                parsed = urllib.parse.parse_qs(query, separator=sep)
                self.assertEqual(parsed, wanted, "Error parsing %r" % query)
|
||||
|
||||
|
||||
def test_parse_qsl_separator(self):
    """parse_qsl() splits on ';' when it is passed as the separator.

    Every case is parsed twice, first with a str separator and then
    with a bytes separator, and both results must equal the same
    expected list of (name, value) pairs.
    """
    semicolon_cases = [
        (";", []),
        (";;", []),
        (";a=b", [('a', 'b')]),
        ("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]),
        ("a=1;a=2", [('a', '1'), ('a', '2')]),
        (b";", []),
        (b";;", []),
        (b";a=b", [(b'a', b'b')]),
        (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]),
        (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]),
    ]
    for query, wanted in semicolon_cases:
        with self.subTest(f"Original: {query!r}, Expected: {wanted!r}"):
            for sep in (';', b';'):
                parsed = urllib.parse.parse_qsl(query, separator=sep)
                self.assertEqual(parsed, wanted, "Error parsing %r" % query)
|
||||
|
||||
|
||||
def test_urlencode_sequences(self):
|
||||
# Other tests incidentally urlencode things; test non-covered cases:
|
||||
# Sequence and object values.
|
||||
|
||||
@@ -77,6 +77,9 @@ scheme_chars = ('abcdefghijklmnopqrstuvwxyz'
|
||||
'0123456789'
|
||||
'+-.')
|
||||
|
||||
# Unsafe bytes to be removed per WHATWG spec
|
||||
_UNSAFE_URL_BYTES_TO_REMOVE = ['\t', '\r', '\n']
|
||||
|
||||
# XXX: Consider replacing with functools.lru_cache
|
||||
MAX_CACHE_SIZE = 20
|
||||
_parse_cache = {}
|
||||
@@ -414,6 +417,11 @@ def _checknetloc(netloc):
|
||||
raise ValueError("netloc '" + netloc + "' contains invalid " +
|
||||
"characters under NFKC normalization")
|
||||
|
||||
def _remove_unsafe_bytes_from_url(url):
    """Return *url* with each entry of _UNSAFE_URL_BYTES_TO_REMOVE deleted.

    Every listed substring is replaced by the empty string, one entry
    after another, and the resulting string is returned.
    """
    cleaned = url
    for unsafe in _UNSAFE_URL_BYTES_TO_REMOVE:
        cleaned = cleaned.replace(unsafe, "")
    return cleaned
|
||||
|
||||
def urlsplit(url, scheme='', allow_fragments=True):
|
||||
"""Parse a URL into 5 components:
|
||||
<scheme>://<netloc>/<path>?<query>#<fragment>
|
||||
@@ -421,6 +429,8 @@ def urlsplit(url, scheme='', allow_fragments=True):
|
||||
Note that we don't break the components up in smaller bits
|
||||
(e.g. netloc is a single string) and we don't expand % escapes."""
|
||||
url, scheme, _coerce_result = _coerce_args(url, scheme)
|
||||
url = _remove_unsafe_bytes_from_url(url)
|
||||
scheme = _remove_unsafe_bytes_from_url(scheme)
|
||||
allow_fragments = bool(allow_fragments)
|
||||
key = url, scheme, allow_fragments, type(url), type(scheme)
|
||||
cached = _parse_cache.get(key, None)
|
||||
@@ -631,6 +641,8 @@ def unquote(string, encoding='utf-8', errors='replace'):
|
||||
|
||||
unquote('abc%20def') -> 'abc def'.
|
||||
"""
|
||||
if isinstance(string, bytes):
|
||||
raise TypeError('Expected str, got bytes')
|
||||
if '%' not in string:
|
||||
string.split
|
||||
return string
|
||||
@@ -648,7 +660,7 @@ def unquote(string, encoding='utf-8', errors='replace'):
|
||||
|
||||
|
||||
def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
|
||||
encoding='utf-8', errors='replace', max_num_fields=None):
|
||||
encoding='utf-8', errors='replace', max_num_fields=None, separator='&'):
|
||||
"""Parse a query given as a string argument.
|
||||
|
||||
Arguments:
|
||||
@@ -672,12 +684,15 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
|
||||
max_num_fields: int. If set, then throws a ValueError if there
|
||||
are more than n fields read by parse_qsl().
|
||||
|
||||
separator: str. The symbol to use for separating the query arguments.
|
||||
Defaults to &.
|
||||
|
||||
Returns a dictionary.
|
||||
"""
|
||||
parsed_result = {}
|
||||
pairs = parse_qsl(qs, keep_blank_values, strict_parsing,
|
||||
encoding=encoding, errors=errors,
|
||||
max_num_fields=max_num_fields)
|
||||
max_num_fields=max_num_fields, separator=separator)
|
||||
for name, value in pairs:
|
||||
if name in parsed_result:
|
||||
parsed_result[name].append(value)
|
||||
@@ -687,7 +702,7 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
|
||||
|
||||
|
||||
def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
|
||||
encoding='utf-8', errors='replace', max_num_fields=None):
|
||||
encoding='utf-8', errors='replace', max_num_fields=None, separator='&'):
|
||||
"""Parse a query given as a string argument.
|
||||
|
||||
Arguments:
|
||||
@@ -710,19 +725,26 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
|
||||
max_num_fields: int. If set, then throws a ValueError
|
||||
if there are more than n fields read by parse_qsl().
|
||||
|
||||
separator: str. The symbol to use for separating the query arguments.
|
||||
Defaults to &.
|
||||
|
||||
Returns a list, as G-d intended.
|
||||
"""
|
||||
qs, _coerce_result = _coerce_args(qs)
|
||||
separator, _ = _coerce_args(separator)
|
||||
|
||||
if not separator or (not isinstance(separator, (str, bytes))):
|
||||
raise ValueError("Separator must be of type string or bytes.")
|
||||
|
||||
# If max_num_fields is defined then check that the number of fields
|
||||
# is less than max_num_fields. This prevents a memory exhaustion DOS
|
||||
# attack via post bodies with many fields.
|
||||
if max_num_fields is not None:
|
||||
num_fields = 1 + qs.count('&') + qs.count(';')
|
||||
num_fields = 1 + qs.count(separator)
|
||||
if max_num_fields < num_fields:
|
||||
raise ValueError('Max number of fields exceeded')
|
||||
|
||||
pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')]
|
||||
pairs = [s1 for s1 in qs.split(separator)]
|
||||
r = []
|
||||
for name_value in pairs:
|
||||
if not name_value and not strict_parsing:
|
||||
|
||||
@@ -779,7 +779,11 @@ def _parse_proxy(proxy):
|
||||
raise ValueError("proxy URL with no authority: %r" % proxy)
|
||||
# We have an authority, so for RFC 3986-compliant URLs (by ss 3.
|
||||
# and 3.3.), path is empty or starts with '/'
|
||||
end = r_scheme.find("/", 2)
|
||||
if '@' in r_scheme:
|
||||
host_separator = r_scheme.find('@')
|
||||
end = r_scheme.find("/", host_separator)
|
||||
else:
|
||||
end = r_scheme.find("/", 2)
|
||||
if end == -1:
|
||||
end = None
|
||||
authority = r_scheme[2:end]
|
||||
@@ -947,7 +951,7 @@ class AbstractBasicAuthHandler:
|
||||
# (single quotes are a violation of the RFC, but appear in the wild)
|
||||
rx = re.compile('(?:^|,)' # start of the string or ','
|
||||
'[ \t]*' # optional whitespaces
|
||||
'([^ \t]+)' # scheme like "Basic"
|
||||
'([^ \t,]+)' # scheme like "Basic"
|
||||
'[ \t]+' # mandatory whitespaces
|
||||
# realm=xxx
|
||||
# realm='xxx'
|
||||
@@ -2604,6 +2608,11 @@ def _proxy_bypass_macosx_sysconf(host, proxy_settings):
|
||||
mask = 8 * (m.group(1).count('.') + 1)
|
||||
else:
|
||||
mask = int(mask[1:])
|
||||
|
||||
if mask < 0 or mask > 32:
|
||||
# System libraries ignore invalid prefix lengths
|
||||
continue
|
||||
|
||||
mask = 32 - mask
|
||||
|
||||
if (hostIP >> mask) == (base >> mask):
|
||||
|
||||
Reference in New Issue
Block a user