From 2924fe8633c31022745677af82d76fff23c8ba54 Mon Sep 17 00:00:00 2001 From: Padraic Fanning Date: Thu, 22 Apr 2021 20:59:55 -0400 Subject: [PATCH 1/6] Add test_urllib from CPython 3.8 --- Lib/test/test_urllib.py | 1726 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 1726 insertions(+) create mode 100644 Lib/test/test_urllib.py diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py new file mode 100644 index 000000000..862668715 --- /dev/null +++ b/Lib/test/test_urllib.py @@ -0,0 +1,1726 @@ +"""Regression tests for what was in Python 2's "urllib" module""" + +import urllib.parse +import urllib.request +import urllib.error +import http.client +import email.message +import io +import unittest +from unittest.mock import patch +from test import support +import os +try: + import ssl +except ImportError: + ssl = None +import sys +import tempfile +from nturl2path import url2pathname, pathname2url + +from base64 import b64encode +import collections + + +def hexescape(char): + """Escape char as RFC 2396 specifies""" + hex_repr = hex(ord(char))[2:].upper() + if len(hex_repr) == 1: + hex_repr = "0%s" % hex_repr + return "%" + hex_repr + +# Shortcut for testing FancyURLopener +_urlopener = None + + +def urlopen(url, data=None, proxies=None): + """urlopen(url [, data]) -> open file-like object""" + global _urlopener + if proxies is not None: + opener = urllib.request.FancyURLopener(proxies=proxies) + elif not _urlopener: + opener = FancyURLopener() + _urlopener = opener + else: + opener = _urlopener + if data is None: + return opener.open(url) + else: + return opener.open(url, data) + + +def FancyURLopener(): + with support.check_warnings( + ('FancyURLopener style of invoking requests is deprecated.', + DeprecationWarning)): + return urllib.request.FancyURLopener() + + +def fakehttp(fakedata, mock_close=False): + class FakeSocket(io.BytesIO): + io_refs = 1 + + def sendall(self, data): + FakeHTTPConnection.buf = data + + def makefile(self, *args, **kwds): + self.io_refs += 1 + return self + + def read(self, amt=None): + if self.closed: + return b"" + return io.BytesIO.read(self, amt) + + def readline(self, length=None): + if self.closed: + return b"" + return io.BytesIO.readline(self, length) + + def close(self): + self.io_refs -= 1 + if self.io_refs == 0: + io.BytesIO.close(self) + + class FakeHTTPConnection(http.client.HTTPConnection): + + # buffer to store data for verification in urlopen tests. + buf = None + + def connect(self): + self.sock = FakeSocket(self.fakedata) + type(self).fakesock = self.sock + + if mock_close: + # bpo-36918: HTTPConnection destructor calls close() which calls + # flush(). Problem: flush() calls self.fp.flush() which raises + # "ValueError: I/O operation on closed file" which is logged as an + # "Exception ignored in". Override close() to silence this error. + def close(self): + pass + FakeHTTPConnection.fakedata = fakedata + + return FakeHTTPConnection + + +class FakeHTTPMixin(object): + def fakehttp(self, fakedata, mock_close=False): + fake_http_class = fakehttp(fakedata, mock_close=mock_close) + self._connection_class = http.client.HTTPConnection + http.client.HTTPConnection = fake_http_class + + def unfakehttp(self): + http.client.HTTPConnection = self._connection_class + + +class FakeFTPMixin(object): + def fakeftp(self): + class FakeFtpWrapper(object): + def __init__(self, user, passwd, host, port, dirs, timeout=None, + persistent=True): + pass + + def retrfile(self, file, type): + return io.BytesIO(), 0 + + def close(self): + pass + + self._ftpwrapper_class = urllib.request.ftpwrapper + urllib.request.ftpwrapper = FakeFtpWrapper + + def unfakeftp(self): + urllib.request.ftpwrapper = self._ftpwrapper_class + + +class urlopen_FileTests(unittest.TestCase): + """Test urlopen() opening a temporary file. + + Try to test as much functionality as possible so as to cut down on reliance + on connecting to the Net for testing. + + """ + + def setUp(self): + # Create a temp file to use for testing + self.text = bytes("test_urllib: %s\n" % self.__class__.__name__, + "ascii") + f = open(support.TESTFN, 'wb') + try: + f.write(self.text) + finally: + f.close() + self.pathname = support.TESTFN + self.returned_obj = urlopen("file:%s" % self.pathname) + + def tearDown(self): + """Shut down the open object""" + self.returned_obj.close() + os.remove(support.TESTFN) + + def test_interface(self): + # Make sure object returned by urlopen() has the specified methods + for attr in ("read", "readline", "readlines", "fileno", + "close", "info", "geturl", "getcode", "__iter__"): + self.assertTrue(hasattr(self.returned_obj, attr), + "object returned by urlopen() lacks %s attribute" % + attr) + + def test_read(self): + self.assertEqual(self.text, self.returned_obj.read()) + + def test_readline(self): + self.assertEqual(self.text, self.returned_obj.readline()) + self.assertEqual(b'', self.returned_obj.readline(), + "calling readline() after exhausting the file did not" + " return an empty string") + + def test_readlines(self): + lines_list = self.returned_obj.readlines() + self.assertEqual(len(lines_list), 1, + "readlines() returned the wrong number of lines") + self.assertEqual(lines_list[0], self.text, + "readlines() returned improper text") + + def test_fileno(self): + file_num = self.returned_obj.fileno() + self.assertIsInstance(file_num, int, "fileno() did not return an int") + self.assertEqual(os.read(file_num, len(self.text)), self.text, + "Reading on the file descriptor returned by fileno() " + "did not return the expected text") + + def test_close(self): + # Test close() by calling it here and then having it be called again + # by the tearDown() method for the test + self.returned_obj.close() + + def test_info(self): + self.assertIsInstance(self.returned_obj.info(), email.message.Message) + + def test_geturl(self): + self.assertEqual(self.returned_obj.geturl(), self.pathname) + + def test_getcode(self): + self.assertIsNone(self.returned_obj.getcode()) + + def test_iter(self): + # Test iterator + # Don't need to count number of iterations since test would fail the + # instant it returned anything beyond the first line from the + # comparison. + # Use the iterator in the usual implicit way to test for ticket #4608. + for line in self.returned_obj: + self.assertEqual(line, self.text) + + def test_relativelocalfile(self): + self.assertRaises(ValueError,urllib.request.urlopen,'./' + self.pathname) + + +class ProxyTests(unittest.TestCase): + + def setUp(self): + # Records changes to env vars + self.env = support.EnvironmentVarGuard() + # Delete all proxy related env vars + for k in list(os.environ): + if 'proxy' in k.lower(): + self.env.unset(k) + + def tearDown(self): + # Restore all proxy related env vars + self.env.__exit__() + del self.env + + def test_getproxies_environment_keep_no_proxies(self): + self.env.set('NO_PROXY', 'localhost') + proxies = urllib.request.getproxies_environment() + # getproxies_environment use lowered case truncated (no '_proxy') keys + self.assertEqual('localhost', proxies['no']) + # List of no_proxies with space. + self.env.set('NO_PROXY', 'localhost, anotherdomain.com, newdomain.com:1234') + self.assertTrue(urllib.request.proxy_bypass_environment('anotherdomain.com')) + self.assertTrue(urllib.request.proxy_bypass_environment('anotherdomain.com:8888')) + self.assertTrue(urllib.request.proxy_bypass_environment('newdomain.com:1234')) + + def test_proxy_cgi_ignore(self): + try: + self.env.set('HTTP_PROXY', 'http://somewhere:3128') + proxies = urllib.request.getproxies_environment() + self.assertEqual('http://somewhere:3128', proxies['http']) + self.env.set('REQUEST_METHOD', 'GET') + proxies = urllib.request.getproxies_environment() + self.assertNotIn('http', proxies) + finally: + self.env.unset('REQUEST_METHOD') + self.env.unset('HTTP_PROXY') + + def test_proxy_bypass_environment_host_match(self): + bypass = urllib.request.proxy_bypass_environment + self.env.set('NO_PROXY', + 'localhost, anotherdomain.com, newdomain.com:1234, .d.o.t') + self.assertTrue(bypass('localhost')) + self.assertTrue(bypass('LocalHost')) # MixedCase + self.assertTrue(bypass('LOCALHOST')) # UPPERCASE + self.assertTrue(bypass('.localhost')) + self.assertTrue(bypass('newdomain.com:1234')) + self.assertTrue(bypass('.newdomain.com:1234')) + self.assertTrue(bypass('foo.d.o.t')) # issue 29142 + self.assertTrue(bypass('d.o.t')) + self.assertTrue(bypass('anotherdomain.com:8888')) + self.assertTrue(bypass('.anotherdomain.com:8888')) + self.assertTrue(bypass('www.newdomain.com:1234')) + self.assertFalse(bypass('prelocalhost')) + self.assertFalse(bypass('newdomain.com')) # no port + self.assertFalse(bypass('newdomain.com:1235')) # wrong port + + def test_proxy_bypass_environment_always_match(self): + bypass = urllib.request.proxy_bypass_environment + self.env.set('NO_PROXY', '*') + self.assertTrue(bypass('newdomain.com')) + self.assertTrue(bypass('newdomain.com:1234')) + self.env.set('NO_PROXY', '*, anotherdomain.com') + self.assertTrue(bypass('anotherdomain.com')) + self.assertFalse(bypass('newdomain.com')) + self.assertFalse(bypass('newdomain.com:1234')) + + def test_proxy_bypass_environment_newline(self): + bypass = urllib.request.proxy_bypass_environment + self.env.set('NO_PROXY', + 'localhost, anotherdomain.com, newdomain.com:1234') + self.assertFalse(bypass('localhost\n')) + self.assertFalse(bypass('anotherdomain.com:8888\n')) + self.assertFalse(bypass('newdomain.com:1234\n')) + + +class ProxyTests_withOrderedEnv(unittest.TestCase): + + def setUp(self): + # We need to test conditions, where variable order _is_ significant + self._saved_env = os.environ + # Monkey patch os.environ, start with empty fake environment + os.environ = collections.OrderedDict() + + def tearDown(self): + os.environ = self._saved_env + + def test_getproxies_environment_prefer_lowercase(self): + # Test lowercase preference with removal + os.environ['no_proxy'] = '' + os.environ['No_Proxy'] = 'localhost' + self.assertFalse(urllib.request.proxy_bypass_environment('localhost')) + self.assertFalse(urllib.request.proxy_bypass_environment('arbitrary')) + os.environ['http_proxy'] = '' + os.environ['HTTP_PROXY'] = 'http://somewhere:3128' + proxies = urllib.request.getproxies_environment() + self.assertEqual({}, proxies) + # Test lowercase preference of proxy bypass and correct matching including ports + os.environ['no_proxy'] = 'localhost, noproxy.com, my.proxy:1234' + os.environ['No_Proxy'] = 'xyz.com' + self.assertTrue(urllib.request.proxy_bypass_environment('localhost')) + self.assertTrue(urllib.request.proxy_bypass_environment('noproxy.com:5678')) + self.assertTrue(urllib.request.proxy_bypass_environment('my.proxy:1234')) + self.assertFalse(urllib.request.proxy_bypass_environment('my.proxy')) + self.assertFalse(urllib.request.proxy_bypass_environment('arbitrary')) + # Test lowercase preference with replacement + os.environ['http_proxy'] = 'http://somewhere:3128' + os.environ['Http_Proxy'] = 'http://somewhereelse:3128' + proxies = urllib.request.getproxies_environment() + self.assertEqual('http://somewhere:3128', proxies['http']) + + +class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin): + """Test urlopen() opening a fake http connection.""" + + def check_read(self, ver): + self.fakehttp(b"HTTP/" + ver + b" 200 OK\r\n\r\nHello!") + try: + fp = urlopen("http://python.org/") + self.assertEqual(fp.readline(), b"Hello!") + self.assertEqual(fp.readline(), b"") + self.assertEqual(fp.geturl(), 'http://python.org/') + self.assertEqual(fp.getcode(), 200) + finally: + self.unfakehttp() + + def test_url_fragment(self): + # Issue #11703: geturl() omits fragments in the original URL. + url = 'http://docs.python.org/library/urllib.html#OK' + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") + try: + fp = urllib.request.urlopen(url) + self.assertEqual(fp.geturl(), url) + finally: + self.unfakehttp() + + def test_willclose(self): + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") + try: + resp = urlopen("http://www.python.org") + self.assertTrue(resp.fp.will_close) + finally: + self.unfakehttp() + + @unittest.skipUnless(ssl, "ssl module required") + def test_url_path_with_control_char_rejected(self): + for char_no in list(range(0, 0x21)) + [0x7f]: + char = chr(char_no) + schemeless_url = f"//localhost:7777/test{char}/" + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") + try: + # We explicitly test urllib.request.urlopen() instead of the top + # level 'def urlopen()' function defined in this... (quite ugly) + # test suite. They use different url opening codepaths. Plain + # urlopen uses FancyURLOpener which goes via a codepath that + # calls urllib.parse.quote() on the URL which makes all of the + # above attempts at injection within the url _path_ safe. + escaped_char_repr = repr(char).replace('\\', r'\\') + InvalidURL = http.client.InvalidURL + with self.assertRaisesRegex( + InvalidURL, f"contain control.*{escaped_char_repr}"): + urllib.request.urlopen(f"http:{schemeless_url}") + with self.assertRaisesRegex( + InvalidURL, f"contain control.*{escaped_char_repr}"): + urllib.request.urlopen(f"https:{schemeless_url}") + # This code path quotes the URL so there is no injection. + resp = urlopen(f"http:{schemeless_url}") + self.assertNotIn(char, resp.geturl()) + finally: + self.unfakehttp() + + @unittest.skipUnless(ssl, "ssl module required") + def test_url_path_with_newline_header_injection_rejected(self): + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") + host = "localhost:7777?a=1 HTTP/1.1\r\nX-injected: header\r\nTEST: 123" + schemeless_url = "//" + host + ":8080/test/?test=a" + try: + # We explicitly test urllib.request.urlopen() instead of the top + # level 'def urlopen()' function defined in this... (quite ugly) + # test suite. They use different url opening codepaths. Plain + # urlopen uses FancyURLOpener which goes via a codepath that + # calls urllib.parse.quote() on the URL which makes all of the + # above attempts at injection within the url _path_ safe. + InvalidURL = http.client.InvalidURL + with self.assertRaisesRegex( + InvalidURL, r"contain control.*\\r.*(found at least . .)"): + urllib.request.urlopen(f"http:{schemeless_url}") + with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"): + urllib.request.urlopen(f"https:{schemeless_url}") + # This code path quotes the URL so there is no injection. + resp = urlopen(f"http:{schemeless_url}") + self.assertNotIn(' ', resp.geturl()) + self.assertNotIn('\r', resp.geturl()) + self.assertNotIn('\n', resp.geturl()) + finally: + self.unfakehttp() + + @unittest.skipUnless(ssl, "ssl module required") + def test_url_host_with_control_char_rejected(self): + for char_no in list(range(0, 0x21)) + [0x7f]: + char = chr(char_no) + schemeless_url = f"//localhost{char}/test/" + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") + try: + escaped_char_repr = repr(char).replace('\\', r'\\') + InvalidURL = http.client.InvalidURL + with self.assertRaisesRegex( + InvalidURL, f"contain control.*{escaped_char_repr}"): + urlopen(f"http:{schemeless_url}") + with self.assertRaisesRegex(InvalidURL, f"contain control.*{escaped_char_repr}"): + urlopen(f"https:{schemeless_url}") + finally: + self.unfakehttp() + + @unittest.skipUnless(ssl, "ssl module required") + def test_url_host_with_newline_header_injection_rejected(self): + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") + host = "localhost\r\nX-injected: header\r\n" + schemeless_url = "//" + host + ":8080/test/?test=a" + try: + InvalidURL = http.client.InvalidURL + with self.assertRaisesRegex( + InvalidURL, r"contain control.*\\r"): + urlopen(f"http:{schemeless_url}") + with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"): + urlopen(f"https:{schemeless_url}") + finally: + self.unfakehttp() + + def test_read_0_9(self): + # "0.9" response accepted (but not "simple responses" without + # a status line) + self.check_read(b"0.9") + + def test_read_1_0(self): + self.check_read(b"1.0") + + def test_read_1_1(self): + self.check_read(b"1.1") + + def test_read_bogus(self): + # urlopen() should raise OSError for many error codes. + self.fakehttp(b'''HTTP/1.1 401 Authentication Required +Date: Wed, 02 Jan 2008 03:03:54 GMT +Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e +Connection: close +Content-Type: text/html; charset=iso-8859-1 +''', mock_close=True) + try: + self.assertRaises(OSError, urlopen, "http://python.org/") + finally: + self.unfakehttp() + + def test_invalid_redirect(self): + # urlopen() should raise OSError for many error codes. + self.fakehttp(b'''HTTP/1.1 302 Found +Date: Wed, 02 Jan 2008 03:03:54 GMT +Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e +Location: file://guidocomputer.athome.com:/python/license +Connection: close +Content-Type: text/html; charset=iso-8859-1 +''', mock_close=True) + try: + msg = "Redirection to url 'file:" + with self.assertRaisesRegex(urllib.error.HTTPError, msg): + urlopen("http://python.org/") + finally: + self.unfakehttp() + + def test_redirect_limit_independent(self): + # Ticket #12923: make sure independent requests each use their + # own retry limit. + for i in range(FancyURLopener().maxtries): + self.fakehttp(b'''HTTP/1.1 302 Found +Location: file://guidocomputer.athome.com:/python/license +Connection: close +''', mock_close=True) + try: + self.assertRaises(urllib.error.HTTPError, urlopen, + "http://something") + finally: + self.unfakehttp() + + def test_empty_socket(self): + # urlopen() raises OSError if the underlying socket does not send any + # data. (#1680230) + self.fakehttp(b'') + try: + self.assertRaises(OSError, urlopen, "http://something") + finally: + self.unfakehttp() + + def test_missing_localfile(self): + # Test for #10836 + with self.assertRaises(urllib.error.URLError) as e: + urlopen('file://localhost/a/file/which/doesnot/exists.py') + self.assertTrue(e.exception.filename) + self.assertTrue(e.exception.reason) + + def test_file_notexists(self): + fd, tmp_file = tempfile.mkstemp() + tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/') + try: + self.assertTrue(os.path.exists(tmp_file)) + with urlopen(tmp_fileurl) as fobj: + self.assertTrue(fobj) + finally: + os.close(fd) + os.unlink(tmp_file) + self.assertFalse(os.path.exists(tmp_file)) + with self.assertRaises(urllib.error.URLError): + urlopen(tmp_fileurl) + + def test_ftp_nohost(self): + test_ftp_url = 'ftp:///path' + with self.assertRaises(urllib.error.URLError) as e: + urlopen(test_ftp_url) + self.assertFalse(e.exception.filename) + self.assertTrue(e.exception.reason) + + def test_ftp_nonexisting(self): + with self.assertRaises(urllib.error.URLError) as e: + urlopen('ftp://localhost/a/file/which/doesnot/exists.py') + self.assertFalse(e.exception.filename) + self.assertTrue(e.exception.reason) + + @patch.object(urllib.request, 'MAXFTPCACHE', 0) + def test_ftp_cache_pruning(self): + self.fakeftp() + try: + urllib.request.ftpcache['test'] = urllib.request.ftpwrapper('user', 'pass', 'localhost', 21, []) + urlopen('ftp://localhost') + finally: + self.unfakeftp() + + def test_userpass_inurl(self): + self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!") + try: + fp = urlopen("http://user:pass@python.org/") + self.assertEqual(fp.readline(), b"Hello!") + self.assertEqual(fp.readline(), b"") + self.assertEqual(fp.geturl(), 'http://user:pass@python.org/') + self.assertEqual(fp.getcode(), 200) + finally: + self.unfakehttp() + + def test_userpass_inurl_w_spaces(self): + self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!") + try: + userpass = "a b:c d" + url = "http://{}@python.org/".format(userpass) + fakehttp_wrapper = http.client.HTTPConnection + authorization = ("Authorization: Basic %s\r\n" % + b64encode(userpass.encode("ASCII")).decode("ASCII")) + fp = urlopen(url) + # The authorization header must be in place + self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8")) + self.assertEqual(fp.readline(), b"Hello!") + self.assertEqual(fp.readline(), b"") + # the spaces are quoted in URL so no match + self.assertNotEqual(fp.geturl(), url) + self.assertEqual(fp.getcode(), 200) + finally: + self.unfakehttp() + + def test_URLopener_deprecation(self): + with support.check_warnings(('',DeprecationWarning)): + urllib.request.URLopener() + + @unittest.skipUnless(ssl, "ssl module required") + def test_cafile_and_context(self): + context = ssl.create_default_context() + with support.check_warnings(('', DeprecationWarning)): + with self.assertRaises(ValueError): + urllib.request.urlopen( + "https://localhost", cafile="/nonexistent/path", context=context + ) + + +class urlopen_DataTests(unittest.TestCase): + """Test urlopen() opening a data URL.""" + + def setUp(self): + # text containing URL special- and unicode-characters + self.text = "test data URLs :;,%=& \u00f6 \u00c4 " + # 2x1 pixel RGB PNG image with one black and one white pixel + self.image = ( + b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x02\x00\x00\x00' + b'\x01\x08\x02\x00\x00\x00{@\xe8\xdd\x00\x00\x00\x01sRGB\x00\xae' + b'\xce\x1c\xe9\x00\x00\x00\x0fIDAT\x08\xd7c```\xf8\xff\xff?\x00' + b'\x06\x01\x02\xfe\no/\x1e\x00\x00\x00\x00IEND\xaeB`\x82') + + self.text_url = ( + "data:text/plain;charset=UTF-8,test%20data%20URLs%20%3A%3B%2C%25%3" + "D%26%20%C3%B6%20%C3%84%20") + self.text_url_base64 = ( + "data:text/plain;charset=ISO-8859-1;base64,dGVzdCBkYXRhIFVSTHMgOjs" + "sJT0mIPYgxCA%3D") + # base64 encoded data URL that contains ignorable spaces, + # such as "\n", " ", "%0A", and "%20". + self.image_url = ( + "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgAAAAIAAAABCAIAAAB7\n" + "QOjdAAAAAXNSR0IArs4c6QAAAA9JREFUCNdj%0AYGBg%2BP//PwAGAQL%2BCm8 " + "vHgAAAABJRU5ErkJggg%3D%3D%0A%20") + + self.text_url_resp = urllib.request.urlopen(self.text_url) + self.text_url_base64_resp = urllib.request.urlopen( + self.text_url_base64) + self.image_url_resp = urllib.request.urlopen(self.image_url) + + def test_interface(self): + # Make sure object returned by urlopen() has the specified methods + for attr in ("read", "readline", "readlines", + "close", "info", "geturl", "getcode", "__iter__"): + self.assertTrue(hasattr(self.text_url_resp, attr), + "object returned by urlopen() lacks %s attribute" % + attr) + + def test_info(self): + self.assertIsInstance(self.text_url_resp.info(), email.message.Message) + self.assertEqual(self.text_url_base64_resp.info().get_params(), + [('text/plain', ''), ('charset', 'ISO-8859-1')]) + self.assertEqual(self.image_url_resp.info()['content-length'], + str(len(self.image))) + self.assertEqual(urllib.request.urlopen("data:,").info().get_params(), + [('text/plain', ''), ('charset', 'US-ASCII')]) + + def test_geturl(self): + self.assertEqual(self.text_url_resp.geturl(), self.text_url) + self.assertEqual(self.text_url_base64_resp.geturl(), + self.text_url_base64) + self.assertEqual(self.image_url_resp.geturl(), self.image_url) + + def test_read_text(self): + self.assertEqual(self.text_url_resp.read().decode( + dict(self.text_url_resp.info().get_params())['charset']), self.text) + + def test_read_text_base64(self): + self.assertEqual(self.text_url_base64_resp.read().decode( + dict(self.text_url_base64_resp.info().get_params())['charset']), + self.text) + + def test_read_image(self): + self.assertEqual(self.image_url_resp.read(), self.image) + + def test_missing_comma(self): + self.assertRaises(ValueError,urllib.request.urlopen,'data:text/plain') + + def test_invalid_base64_data(self): + # missing padding character + self.assertRaises(ValueError,urllib.request.urlopen,'data:;base64,Cg=') + + +class urlretrieve_FileTests(unittest.TestCase): + """Test urllib.urlretrieve() on local files""" + + def setUp(self): + # Create a list of temporary files. Each item in the list is a file + # name (absolute path or relative to the current working directory). + # All files in this list will be deleted in the tearDown method. Note, + # this only helps to makes sure temporary files get deleted, but it + # does nothing about trying to close files that may still be open. It + # is the responsibility of the developer to properly close files even + # when exceptional conditions occur. + self.tempFiles = [] + + # Create a temporary file. + self.registerFileForCleanUp(support.TESTFN) + self.text = b'testing urllib.urlretrieve' + try: + FILE = open(support.TESTFN, 'wb') + FILE.write(self.text) + FILE.close() + finally: + try: FILE.close() + except: pass + + def tearDown(self): + # Delete the temporary files. + for each in self.tempFiles: + try: os.remove(each) + except: pass + + def constructLocalFileUrl(self, filePath): + filePath = os.path.abspath(filePath) + try: + filePath.encode("utf-8") + except UnicodeEncodeError: + raise unittest.SkipTest("filePath is not encodable to utf8") + return "file://%s" % urllib.request.pathname2url(filePath) + + def createNewTempFile(self, data=b""): + """Creates a new temporary file containing the specified data, + registers the file for deletion during the test fixture tear down, and + returns the absolute path of the file.""" + + newFd, newFilePath = tempfile.mkstemp() + try: + self.registerFileForCleanUp(newFilePath) + newFile = os.fdopen(newFd, "wb") + newFile.write(data) + newFile.close() + finally: + try: newFile.close() + except: pass + return newFilePath + + def registerFileForCleanUp(self, fileName): + self.tempFiles.append(fileName) + + def test_basic(self): + # Make sure that a local file just gets its own location returned and + # a headers value is returned. + result = urllib.request.urlretrieve("file:%s" % support.TESTFN) + self.assertEqual(result[0], support.TESTFN) + self.assertIsInstance(result[1], email.message.Message, + "did not get an email.message.Message instance " + "as second returned value") + + def test_copy(self): + # Test that setting the filename argument works. + second_temp = "%s.2" % support.TESTFN + self.registerFileForCleanUp(second_temp) + result = urllib.request.urlretrieve(self.constructLocalFileUrl( + support.TESTFN), second_temp) + self.assertEqual(second_temp, result[0]) + self.assertTrue(os.path.exists(second_temp), "copy of the file was not " + "made") + FILE = open(second_temp, 'rb') + try: + text = FILE.read() + FILE.close() + finally: + try: FILE.close() + except: pass + self.assertEqual(self.text, text) + + def test_reporthook(self): + # Make sure that the reporthook works. + def hooktester(block_count, block_read_size, file_size, count_holder=[0]): + self.assertIsInstance(block_count, int) + self.assertIsInstance(block_read_size, int) + self.assertIsInstance(file_size, int) + self.assertEqual(block_count, count_holder[0]) + count_holder[0] = count_holder[0] + 1 + second_temp = "%s.2" % support.TESTFN + self.registerFileForCleanUp(second_temp) + urllib.request.urlretrieve( + self.constructLocalFileUrl(support.TESTFN), + second_temp, hooktester) + + def test_reporthook_0_bytes(self): + # Test on zero length file. Should call reporthook only 1 time. + report = [] + def hooktester(block_count, block_read_size, file_size, _report=report): + _report.append((block_count, block_read_size, file_size)) + srcFileName = self.createNewTempFile() + urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName), + support.TESTFN, hooktester) + self.assertEqual(len(report), 1) + self.assertEqual(report[0][2], 0) + + def test_reporthook_5_bytes(self): + # Test on 5 byte file. Should call reporthook only 2 times (once when + # the "network connection" is established and once when the block is + # read). + report = [] + def hooktester(block_count, block_read_size, file_size, _report=report): + _report.append((block_count, block_read_size, file_size)) + srcFileName = self.createNewTempFile(b"x" * 5) + urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName), + support.TESTFN, hooktester) + self.assertEqual(len(report), 2) + self.assertEqual(report[0][2], 5) + self.assertEqual(report[1][2], 5) + + def test_reporthook_8193_bytes(self): + # Test on 8193 byte file. Should call reporthook only 3 times (once + # when the "network connection" is established, once for the next 8192 + # bytes, and once for the last byte). + report = [] + def hooktester(block_count, block_read_size, file_size, _report=report): + _report.append((block_count, block_read_size, file_size)) + srcFileName = self.createNewTempFile(b"x" * 8193) + urllib.request.urlretrieve(self.constructLocalFileUrl(srcFileName), + support.TESTFN, hooktester) + self.assertEqual(len(report), 3) + self.assertEqual(report[0][2], 8193) + self.assertEqual(report[0][1], 8192) + self.assertEqual(report[1][1], 8192) + self.assertEqual(report[2][1], 8192) + + +class urlretrieve_HttpTests(unittest.TestCase, FakeHTTPMixin): + """Test urllib.urlretrieve() using fake http connections""" + + def test_short_content_raises_ContentTooShortError(self): + self.fakehttp(b'''HTTP/1.1 200 OK +Date: Wed, 02 Jan 2008 03:03:54 GMT +Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e +Connection: close +Content-Length: 100 +Content-Type: text/html; charset=iso-8859-1 + +FF +''') + + def _reporthook(par1, par2, par3): + pass + + with self.assertRaises(urllib.error.ContentTooShortError): + try: + urllib.request.urlretrieve(support.TEST_HTTP_URL, + reporthook=_reporthook) + finally: + self.unfakehttp() + + def test_short_content_raises_ContentTooShortError_without_reporthook(self): + self.fakehttp(b'''HTTP/1.1 200 OK +Date: Wed, 02 Jan 2008 03:03:54 GMT +Server: Apache/1.3.33 (Debian GNU/Linux) mod_ssl/2.8.22 OpenSSL/0.9.7e +Connection: close +Content-Length: 100 +Content-Type: text/html; charset=iso-8859-1 + +FF +''') + with self.assertRaises(urllib.error.ContentTooShortError): + try: + urllib.request.urlretrieve(support.TEST_HTTP_URL) + finally: + self.unfakehttp() + + +class QuotingTests(unittest.TestCase): + r"""Tests for urllib.quote() and urllib.quote_plus() + + According to RFC 3986 (Uniform Resource Identifiers), to escape a + character you write it as '%' + <2 character US-ASCII hex value>. + The Python code of ``'%' + hex(ord())[2:]`` escapes a + character properly. Case does not matter on the hex letters. + + The various character sets specified are: + + Reserved characters : ";/?:@&=+$," + Have special meaning in URIs and must be escaped if not being used for + their special meaning + Data characters : letters, digits, and "-_.!~*'()" + Unreserved and do not need to be escaped; can be, though, if desired + Control characters : 0x00 - 0x1F, 0x7F + Have no use in URIs so must be escaped + space : 0x20 + Must be escaped + Delimiters : '<>#%"' + Must be escaped + Unwise : "{}|\^[]`" + Must be escaped + + """ + + def test_never_quote(self): + # Make sure quote() does not quote letters, digits, and "_,.-" + do_not_quote = '' .join(["ABCDEFGHIJKLMNOPQRSTUVWXYZ", + "abcdefghijklmnopqrstuvwxyz", + "0123456789", + "_.-~"]) + result = urllib.parse.quote(do_not_quote) + self.assertEqual(do_not_quote, result, + "using quote(): %r != %r" % (do_not_quote, result)) + result = urllib.parse.quote_plus(do_not_quote) + self.assertEqual(do_not_quote, result, + "using quote_plus(): %r != %r" % (do_not_quote, result)) + + def test_default_safe(self): + # Test '/' is default value for 'safe' parameter + self.assertEqual(urllib.parse.quote.__defaults__[0], '/') + + def test_safe(self): + # Test setting 'safe' parameter does what it should do + quote_by_default = "<>" + result = urllib.parse.quote(quote_by_default, safe=quote_by_default) + self.assertEqual(quote_by_default, result, + "using quote(): %r != %r" % (quote_by_default, result)) + result = urllib.parse.quote_plus(quote_by_default, + safe=quote_by_default) + self.assertEqual(quote_by_default, result, + "using quote_plus(): %r != %r" % + (quote_by_default, result)) + # Safe expressed as bytes rather than str + result = urllib.parse.quote(quote_by_default, safe=b"<>") + self.assertEqual(quote_by_default, result, + "using quote(): %r != %r" % (quote_by_default, result)) + # "Safe" non-ASCII characters should have no effect + # (Since URIs are not allowed to have non-ASCII characters) + result = urllib.parse.quote("a\xfcb", encoding="latin-1", safe="\xfc") + expect = urllib.parse.quote("a\xfcb", encoding="latin-1", safe="") + self.assertEqual(expect, result, + "using quote(): %r != %r" % + (expect, result)) + # Same as above, but using a bytes rather than str + result = urllib.parse.quote("a\xfcb", encoding="latin-1", safe=b"\xfc") + expect = urllib.parse.quote("a\xfcb", encoding="latin-1", safe="") + self.assertEqual(expect, result, + "using quote(): %r != %r" % + (expect, result)) + + def test_default_quoting(self): + # Make sure all characters that should be quoted are by default sans + # space (separate test for that). + should_quote = [chr(num) for num in range(32)] # For 0x00 - 0x1F + should_quote.append(r'<>#%"{}|\^[]`') + should_quote.append(chr(127)) # For 0x7F + should_quote = ''.join(should_quote) + for char in should_quote: + result = urllib.parse.quote(char) + self.assertEqual(hexescape(char), result, + "using quote(): " + "%s should be escaped to %s, not %s" % + (char, hexescape(char), result)) + result = urllib.parse.quote_plus(char) + self.assertEqual(hexescape(char), result, + "using quote_plus(): " + "%s should be escapes to %s, not %s" % + (char, hexescape(char), result)) + del should_quote + partial_quote = "ab[]cd" + expected = "ab%5B%5Dcd" + result = urllib.parse.quote(partial_quote) + self.assertEqual(expected, result, + "using quote(): %r != %r" % (expected, result)) + result = urllib.parse.quote_plus(partial_quote) + self.assertEqual(expected, result, + "using quote_plus(): %r != %r" % (expected, result)) + + def test_quoting_space(self): + # Make sure quote() and quote_plus() handle spaces as specified in + # their unique way + result = urllib.parse.quote(' ') + self.assertEqual(result, hexescape(' '), + "using quote(): %r != %r" % (result, hexescape(' '))) + result = urllib.parse.quote_plus(' ') + self.assertEqual(result, '+', + "using quote_plus(): %r != +" % result) + given = "a b cd e f" + expect = given.replace(' ', hexescape(' ')) + result = urllib.parse.quote(given) + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + expect = given.replace(' ', '+') + result = urllib.parse.quote_plus(given) + self.assertEqual(expect, result, + "using quote_plus(): %r != %r" % (expect, result)) + + def test_quoting_plus(self): + self.assertEqual(urllib.parse.quote_plus('alpha+beta gamma'), + 'alpha%2Bbeta+gamma') + self.assertEqual(urllib.parse.quote_plus('alpha+beta gamma', '+'), + 'alpha+beta+gamma') + # Test with bytes + self.assertEqual(urllib.parse.quote_plus(b'alpha+beta gamma'), + 'alpha%2Bbeta+gamma') + # Test with safe bytes + self.assertEqual(urllib.parse.quote_plus('alpha+beta gamma', b'+'), + 'alpha+beta+gamma') + + def test_quote_bytes(self): + # Bytes should quote directly to percent-encoded values + given = b"\xa2\xd8ab\xff" + expect = "%A2%D8ab%FF" + result = urllib.parse.quote(given) + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + # Encoding argument should raise type error on bytes input + self.assertRaises(TypeError, urllib.parse.quote, given, + encoding="latin-1") + # quote_from_bytes should work the same + result = urllib.parse.quote_from_bytes(given) + self.assertEqual(expect, result, + "using quote_from_bytes(): %r != %r" + % (expect, result)) + + def test_quote_with_unicode(self): + # Characters in Latin-1 range, encoded by default in UTF-8 + given = "\xa2\xd8ab\xff" + expect = "%C2%A2%C3%98ab%C3%BF" + result = urllib.parse.quote(given) + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + # Characters in Latin-1 range, encoded by with None (default) + result = urllib.parse.quote(given, encoding=None, errors=None) + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + # Characters in Latin-1 range, encoded with Latin-1 + given = "\xa2\xd8ab\xff" + expect = "%A2%D8ab%FF" + result = urllib.parse.quote(given, encoding="latin-1") + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + # Characters in BMP, encoded by default in UTF-8 + given = "\u6f22\u5b57" # "Kanji" + expect = "%E6%BC%A2%E5%AD%97" + result = urllib.parse.quote(given) + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + # Characters in BMP, encoded with Latin-1 + given = "\u6f22\u5b57" + self.assertRaises(UnicodeEncodeError, urllib.parse.quote, given, + encoding="latin-1") + # Characters in BMP, encoded with Latin-1, with replace error handling + given = "\u6f22\u5b57" + expect = "%3F%3F" # "??" + result = urllib.parse.quote(given, encoding="latin-1", + errors="replace") + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + # Characters in BMP, Latin-1, with xmlcharref error handling + given = "\u6f22\u5b57" + expect = "%26%2328450%3B%26%2323383%3B" # "漢字" + result = urllib.parse.quote(given, encoding="latin-1", + errors="xmlcharrefreplace") + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + + def test_quote_plus_with_unicode(self): + # Encoding (latin-1) test for quote_plus + given = "\xa2\xd8 \xff" + expect = "%A2%D8+%FF" + result = urllib.parse.quote_plus(given, encoding="latin-1") + self.assertEqual(expect, result, + "using quote_plus(): %r != %r" % (expect, result)) + # Errors test for quote_plus + given = "ab\u6f22\u5b57 cd" + expect = "ab%3F%3F+cd" + result = urllib.parse.quote_plus(given, encoding="latin-1", + errors="replace") + self.assertEqual(expect, result, + "using quote_plus(): %r != %r" % (expect, result)) + + +class UnquotingTests(unittest.TestCase): + """Tests for unquote() and unquote_plus() + + See the doc string for quoting_Tests for details on quoting and such. + + """ + + def test_unquoting(self): + # Make sure unquoting of all ASCII values works + escape_list = [] + for num in range(128): + given = hexescape(chr(num)) + expect = chr(num) + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + result = urllib.parse.unquote_plus(given) + self.assertEqual(expect, result, + "using unquote_plus(): %r != %r" % + (expect, result)) + escape_list.append(given) + escape_string = ''.join(escape_list) + del escape_list + result = urllib.parse.unquote(escape_string) + self.assertEqual(result.count('%'), 1, + "using unquote(): not all characters escaped: " + "%s" % result) + self.assertRaises((TypeError, AttributeError), urllib.parse.unquote, None) + self.assertRaises((TypeError, AttributeError), urllib.parse.unquote, ()) + with support.check_warnings(('', BytesWarning), quiet=True): + self.assertRaises((TypeError, AttributeError), urllib.parse.unquote, b'') + + def test_unquoting_badpercent(self): + # Test unquoting on bad percent-escapes + given = '%xab' + expect = given + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, "using unquote(): %r != %r" + % (expect, result)) + given = '%x' + expect = given + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, "using unquote(): %r != %r" + % (expect, result)) + given = '%' + expect = given + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, "using unquote(): %r != %r" + % (expect, result)) + # unquote_to_bytes + given = '%xab' + expect = bytes(given, 'ascii') + result = urllib.parse.unquote_to_bytes(given) + self.assertEqual(expect, result, "using unquote_to_bytes(): %r != %r" + % (expect, result)) + given = '%x' + expect = bytes(given, 'ascii') + result = urllib.parse.unquote_to_bytes(given) + self.assertEqual(expect, result, "using unquote_to_bytes(): %r != %r" + % (expect, result)) + given = '%' + expect = bytes(given, 'ascii') + result = urllib.parse.unquote_to_bytes(given) + self.assertEqual(expect, result, "using unquote_to_bytes(): %r != %r" + % (expect, result)) + self.assertRaises((TypeError, AttributeError), urllib.parse.unquote_to_bytes, None) + self.assertRaises((TypeError, AttributeError), urllib.parse.unquote_to_bytes, ()) + + def test_unquoting_mixed_case(self): + # Test unquoting on mixed-case hex digits in the percent-escapes + given = '%Ab%eA' + expect = b'\xab\xea' + result = urllib.parse.unquote_to_bytes(given) + self.assertEqual(expect, result, + "using unquote_to_bytes(): %r != %r" + % (expect, result)) + + def test_unquoting_parts(self): + # Make sure unquoting works when have non-quoted characters + # interspersed + given = 'ab%sd' % hexescape('c') + expect = "abcd" + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, + "using quote(): %r != %r" % (expect, result)) + result = urllib.parse.unquote_plus(given) + self.assertEqual(expect, result, + "using unquote_plus(): %r != %r" % (expect, result)) + + def test_unquoting_plus(self): + # Test difference between unquote() and unquote_plus() + given = "are+there+spaces..." + expect = given + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + expect = given.replace('+', ' ') + result = urllib.parse.unquote_plus(given) + self.assertEqual(expect, result, + "using unquote_plus(): %r != %r" % (expect, result)) + + def test_unquote_to_bytes(self): + given = 'br%C3%BCckner_sapporo_20050930.doc' + expect = b'br\xc3\xbcckner_sapporo_20050930.doc' + result = urllib.parse.unquote_to_bytes(given) + self.assertEqual(expect, result, + "using unquote_to_bytes(): %r != %r" + % (expect, result)) + # Test on a string with unescaped non-ASCII characters + # (Technically an invalid URI; expect those characters to be UTF-8 + # encoded). + result = urllib.parse.unquote_to_bytes("\u6f22%C3%BC") + expect = b'\xe6\xbc\xa2\xc3\xbc' # UTF-8 for "\u6f22\u00fc" + self.assertEqual(expect, result, + "using unquote_to_bytes(): %r != %r" + % (expect, result)) + # Test with a bytes as input + given = b'%A2%D8ab%FF' + expect = b'\xa2\xd8ab\xff' + result = urllib.parse.unquote_to_bytes(given) + self.assertEqual(expect, result, + "using unquote_to_bytes(): %r != %r" + % (expect, result)) + # Test with a bytes as input, with unescaped non-ASCII bytes + # (Technically an invalid URI; expect those bytes to be preserved) + given = b'%A2\xd8ab%FF' + expect = b'\xa2\xd8ab\xff' + result = urllib.parse.unquote_to_bytes(given) + self.assertEqual(expect, result, + "using unquote_to_bytes(): %r != %r" + % (expect, result)) + + def test_unquote_with_unicode(self): + # Characters in the Latin-1 range, encoded with UTF-8 + given = 'br%C3%BCckner_sapporo_20050930.doc' + expect = 'br\u00fcckner_sapporo_20050930.doc' + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + # Characters in the Latin-1 range, encoded with None (default) + result = urllib.parse.unquote(given, encoding=None, errors=None) + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + # Characters in the Latin-1 range, encoded with Latin-1 + result = urllib.parse.unquote('br%FCckner_sapporo_20050930.doc', + encoding="latin-1") + expect = 'br\u00fcckner_sapporo_20050930.doc' + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + # Characters in BMP, encoded with UTF-8 + given = "%E6%BC%A2%E5%AD%97" + expect = "\u6f22\u5b57" # "Kanji" + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + # Decode with UTF-8, invalid sequence + given = "%F3%B1" + expect = "\ufffd" # Replacement character + result = urllib.parse.unquote(given) + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + # Decode with UTF-8, invalid sequence, replace errors + result = urllib.parse.unquote(given, errors="replace") + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + # Decode with UTF-8, invalid sequence, ignoring errors + given = "%F3%B1" + expect = "" + result = urllib.parse.unquote(given, errors="ignore") + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + # A mix of non-ASCII and percent-encoded characters, UTF-8 + result = urllib.parse.unquote("\u6f22%C3%BC") + expect = '\u6f22\u00fc' + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + # A mix of non-ASCII and percent-encoded characters, Latin-1 + # (Note, the string contains non-Latin-1-representable characters) + result = urllib.parse.unquote("\u6f22%FC", encoding="latin-1") + expect = '\u6f22\u00fc' + self.assertEqual(expect, result, + "using unquote(): %r != %r" % (expect, result)) + + def test_unquoting_with_bytes_input(self): + # Bytes not supported yet + with self.assertRaisesRegex(TypeError, 'Expected str, got bytes'): + given = b'bl\xc3\xa5b\xc3\xa6rsyltet\xc3\xb8y' + urllib.parse.unquote(given) + +class urlencode_Tests(unittest.TestCase): + """Tests for urlencode()""" + + def help_inputtype(self, given, test_type): + """Helper method for testing different input types. + + 'given' must lead to only the pairs: + * 1st, 1 + * 2nd, 2 + * 3rd, 3 + + Test cannot assume anything about order. Docs make no guarantee and + have possible dictionary input. + + """ + expect_somewhere = ["1st=1", "2nd=2", "3rd=3"] + result = urllib.parse.urlencode(given) + for expected in expect_somewhere: + self.assertIn(expected, result, + "testing %s: %s not found in %s" % + (test_type, expected, result)) + self.assertEqual(result.count('&'), 2, + "testing %s: expected 2 '&'s; got %s" % + (test_type, result.count('&'))) + amp_location = result.index('&') + on_amp_left = result[amp_location - 1] + on_amp_right = result[amp_location + 1] + self.assertTrue(on_amp_left.isdigit() and on_amp_right.isdigit(), + "testing %s: '&' not located in proper place in %s" % + (test_type, result)) + self.assertEqual(len(result), (5 * 3) + 2, #5 chars per thing and amps + "testing %s: " + "unexpected number of characters: %s != %s" % + (test_type, len(result), (5 * 3) + 2)) + + def test_using_mapping(self): + # Test passing in a mapping object as an argument. + self.help_inputtype({"1st":'1', "2nd":'2', "3rd":'3'}, + "using dict as input type") + + def test_using_sequence(self): + # Test passing in a sequence of two-item sequences as an argument. + self.help_inputtype([('1st', '1'), ('2nd', '2'), ('3rd', '3')], + "using sequence of two-item tuples as input") + + def test_quoting(self): + # Make sure keys and values are quoted using quote_plus() + given = {"&":"="} + expect = "%s=%s" % (hexescape('&'), hexescape('=')) + result = urllib.parse.urlencode(given) + self.assertEqual(expect, result) + given = {"key name":"A bunch of pluses"} + expect = "key+name=A+bunch+of+pluses" + result = urllib.parse.urlencode(given) + self.assertEqual(expect, result) + + def test_doseq(self): + # Test that passing True for 'doseq' parameter works correctly + given = {'sequence':['1', '2', '3']} + expect = "sequence=%s" % urllib.parse.quote_plus(str(['1', '2', '3'])) + result = urllib.parse.urlencode(given) + self.assertEqual(expect, result) + result = urllib.parse.urlencode(given, True) + for value in given["sequence"]: + expect = "sequence=%s" % value + self.assertIn(expect, result) + self.assertEqual(result.count('&'), 2, + "Expected 2 '&'s, got %s" % result.count('&')) + + def test_empty_sequence(self): + self.assertEqual("", urllib.parse.urlencode({})) + self.assertEqual("", urllib.parse.urlencode([])) + + def test_nonstring_values(self): + self.assertEqual("a=1", urllib.parse.urlencode({"a": 1})) + self.assertEqual("a=None", urllib.parse.urlencode({"a": None})) + + def test_nonstring_seq_values(self): + self.assertEqual("a=1&a=2", urllib.parse.urlencode({"a": [1, 2]}, True)) + self.assertEqual("a=None&a=a", + urllib.parse.urlencode({"a": [None, "a"]}, True)) + data = collections.OrderedDict([("a", 1), ("b", 1)]) + self.assertEqual("a=a&a=b", + urllib.parse.urlencode({"a": data}, True)) + + def test_urlencode_encoding(self): + # ASCII encoding. Expect %3F with errors="replace' + given = (('\u00a0', '\u00c1'),) + expect = '%3F=%3F' + result = urllib.parse.urlencode(given, encoding="ASCII", errors="replace") + self.assertEqual(expect, result) + + # Default is UTF-8 encoding. + given = (('\u00a0', '\u00c1'),) + expect = '%C2%A0=%C3%81' + result = urllib.parse.urlencode(given) + self.assertEqual(expect, result) + + # Latin-1 encoding. + given = (('\u00a0', '\u00c1'),) + expect = '%A0=%C1' + result = urllib.parse.urlencode(given, encoding="latin-1") + self.assertEqual(expect, result) + + def test_urlencode_encoding_doseq(self): + # ASCII Encoding. Expect %3F with errors="replace' + given = (('\u00a0', '\u00c1'),) + expect = '%3F=%3F' + result = urllib.parse.urlencode(given, doseq=True, + encoding="ASCII", errors="replace") + self.assertEqual(expect, result) + + # ASCII Encoding. On a sequence of values. + given = (("\u00a0", (1, "\u00c1")),) + expect = '%3F=1&%3F=%3F' + result = urllib.parse.urlencode(given, True, + encoding="ASCII", errors="replace") + self.assertEqual(expect, result) + + # Utf-8 + given = (("\u00a0", "\u00c1"),) + expect = '%C2%A0=%C3%81' + result = urllib.parse.urlencode(given, True) + self.assertEqual(expect, result) + + given = (("\u00a0", (42, "\u00c1")),) + expect = '%C2%A0=42&%C2%A0=%C3%81' + result = urllib.parse.urlencode(given, True) + self.assertEqual(expect, result) + + # latin-1 + given = (("\u00a0", "\u00c1"),) + expect = '%A0=%C1' + result = urllib.parse.urlencode(given, True, encoding="latin-1") + self.assertEqual(expect, result) + + given = (("\u00a0", (42, "\u00c1")),) + expect = '%A0=42&%A0=%C1' + result = urllib.parse.urlencode(given, True, encoding="latin-1") + self.assertEqual(expect, result) + + def test_urlencode_bytes(self): + given = ((b'\xa0\x24', b'\xc1\x24'),) + expect = '%A0%24=%C1%24' + result = urllib.parse.urlencode(given) + self.assertEqual(expect, result) + result = urllib.parse.urlencode(given, True) + self.assertEqual(expect, result) + + # Sequence of values + given = ((b'\xa0\x24', (42, b'\xc1\x24')),) + expect = '%A0%24=42&%A0%24=%C1%24' + result = urllib.parse.urlencode(given, True) + self.assertEqual(expect, result) + + def test_urlencode_encoding_safe_parameter(self): + + # Send '$' (\x24) as safe character + # Default utf-8 encoding + + given = ((b'\xa0\x24', b'\xc1\x24'),) + result = urllib.parse.urlencode(given, safe=":$") + expect = '%A0$=%C1$' + self.assertEqual(expect, result) + + given = ((b'\xa0\x24', b'\xc1\x24'),) + result = urllib.parse.urlencode(given, doseq=True, safe=":$") + expect = '%A0$=%C1$' + self.assertEqual(expect, result) + + # Safe parameter in sequence + given = ((b'\xa0\x24', (b'\xc1\x24', 0xd, 42)),) + expect = '%A0$=%C1$&%A0$=13&%A0$=42' + result = urllib.parse.urlencode(given, True, safe=":$") + self.assertEqual(expect, result) + + # Test all above in latin-1 encoding + + given = ((b'\xa0\x24', b'\xc1\x24'),) + result = urllib.parse.urlencode(given, safe=":$", + encoding="latin-1") + expect = '%A0$=%C1$' + self.assertEqual(expect, result) + + given = ((b'\xa0\x24', b'\xc1\x24'),) + expect = '%A0$=%C1$' + result = urllib.parse.urlencode(given, doseq=True, safe=":$", + encoding="latin-1") + + given = ((b'\xa0\x24', (b'\xc1\x24', 0xd, 42)),) + expect = '%A0$=%C1$&%A0$=13&%A0$=42' + result = urllib.parse.urlencode(given, True, safe=":$", + encoding="latin-1") + self.assertEqual(expect, result) + +class Pathname_Tests(unittest.TestCase): + """Test pathname2url() and url2pathname()""" + + def test_basic(self): + # Make sure simple tests pass + expected_path = os.path.join("parts", "of", "a", "path") + expected_url = "parts/of/a/path" + result = urllib.request.pathname2url(expected_path) + self.assertEqual(expected_url, result, + "pathname2url() failed; %s != %s" % + (result, expected_url)) + result = urllib.request.url2pathname(expected_url) + self.assertEqual(expected_path, result, + "url2pathame() failed; %s != %s" % + (result, expected_path)) + + def test_quoting(self): + # Test automatic quoting and unquoting works for pathnam2url() and + # url2pathname() respectively + given = os.path.join("needs", "quot=ing", "here") + expect = "needs/%s/here" % urllib.parse.quote("quot=ing") + result = urllib.request.pathname2url(given) + self.assertEqual(expect, result, + "pathname2url() failed; %s != %s" % + (expect, result)) + expect = given + result = urllib.request.url2pathname(result) + self.assertEqual(expect, result, + "url2pathname() failed; %s != %s" % + (expect, result)) + given = os.path.join("make sure", "using_quote") + expect = "%s/using_quote" % urllib.parse.quote("make sure") + result = urllib.request.pathname2url(given) + self.assertEqual(expect, result, + "pathname2url() failed; %s != %s" % + (expect, result)) + given = "make+sure/using_unquote" + expect = os.path.join("make+sure", "using_unquote") + result = urllib.request.url2pathname(given) + self.assertEqual(expect, result, + "url2pathname() failed; %s != %s" % + (expect, result)) + + @unittest.skipUnless(sys.platform == 'win32', + 'test specific to the urllib.url2path function.') + def test_ntpath(self): + given = ('/C:/', '///C:/', '/C|//') + expect = 'C:\\' + for url in given: + result = urllib.request.url2pathname(url) + self.assertEqual(expect, result, + 'urllib.request..url2pathname() failed; %s != %s' % + (expect, result)) + given = '///C|/path' + expect = 'C:\\path' + result = urllib.request.url2pathname(given) + self.assertEqual(expect, result, + 'urllib.request.url2pathname() failed; %s != %s' % + (expect, result)) + +class Utility_Tests(unittest.TestCase): + """Testcase to test the various utility functions in the urllib.""" + + def test_thishost(self): + """Test the urllib.request.thishost utility function returns a tuple""" + self.assertIsInstance(urllib.request.thishost(), tuple) + + +class URLopener_Tests(FakeHTTPMixin, unittest.TestCase): + """Testcase to test the open method of URLopener class.""" + + def test_quoted_open(self): + class DummyURLopener(urllib.request.URLopener): + def open_spam(self, url): + return url + with support.check_warnings( + ('DummyURLopener style of invoking requests is deprecated.', + DeprecationWarning)): + self.assertEqual(DummyURLopener().open( + 'spam://example/ /'),'//example/%20/') + + # test the safe characters are not quoted by urlopen + self.assertEqual(DummyURLopener().open( + "spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"), + "//c:|windows%/:=&?~#+!$,;'@()*[]|/path/") + + @support.ignore_warnings(category=DeprecationWarning) + def test_urlopener_retrieve_file(self): + with support.temp_dir() as tmpdir: + fd, tmpfile = tempfile.mkstemp(dir=tmpdir) + os.close(fd) + fileurl = "file:" + urllib.request.pathname2url(tmpfile) + filename, _ = urllib.request.URLopener().retrieve(fileurl) + # Some buildbots have TEMP folder that uses a lowercase drive letter. + self.assertEqual(os.path.normcase(filename), os.path.normcase(tmpfile)) + + @support.ignore_warnings(category=DeprecationWarning) + def test_urlopener_retrieve_remote(self): + url = "http://www.python.org/file.txt" + self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!") + self.addCleanup(self.unfakehttp) + filename, _ = urllib.request.URLopener().retrieve(url) + self.assertEqual(os.path.splitext(filename)[1], ".txt") + + @support.ignore_warnings(category=DeprecationWarning) + def test_local_file_open(self): + # bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme + class DummyURLopener(urllib.request.URLopener): + def open_local_file(self, url): + return url + for url in ('local_file://example', 'local-file://example'): + self.assertRaises(OSError, urllib.request.urlopen, url) + self.assertRaises(OSError, urllib.request.URLopener().open, url) + self.assertRaises(OSError, urllib.request.URLopener().retrieve, url) + self.assertRaises(OSError, DummyURLopener().open, url) + self.assertRaises(OSError, DummyURLopener().retrieve, url) + + +# Just commented them out. +# Can't really tell why keep failing in windows and sparc. +# Everywhere else they work ok, but on those machines, sometimes +# fail in one of the tests, sometimes in other. I have a linux, and +# the tests go ok. +# If anybody has one of the problematic environments, please help! +# . Facundo +# +# def server(evt): +# import socket, time +# serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) +# serv.settimeout(3) +# serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) +# serv.bind(("", 9093)) +# serv.listen() +# try: +# conn, addr = serv.accept() +# conn.send("1 Hola mundo\n") +# cantdata = 0 +# while cantdata < 13: +# data = conn.recv(13-cantdata) +# cantdata += len(data) +# time.sleep(.3) +# conn.send("2 No more lines\n") +# conn.close() +# except socket.timeout: +# pass +# finally: +# serv.close() +# evt.set() +# +# class FTPWrapperTests(unittest.TestCase): +# +# def setUp(self): +# import ftplib, time, threading +# ftplib.FTP.port = 9093 +# self.evt = threading.Event() +# threading.Thread(target=server, args=(self.evt,)).start() +# time.sleep(.1) +# +# def tearDown(self): +# self.evt.wait() +# +# def testBasic(self): +# # connects +# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, []) +# ftp.close() +# +# def testTimeoutNone(self): +# # global default timeout is ignored +# import socket +# self.assertIsNone(socket.getdefaulttimeout()) +# socket.setdefaulttimeout(30) +# try: +# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, []) +# finally: +# socket.setdefaulttimeout(None) +# self.assertEqual(ftp.ftp.sock.gettimeout(), 30) +# ftp.close() +# +# def testTimeoutDefault(self): +# # global default timeout is used +# import socket +# self.assertIsNone(socket.getdefaulttimeout()) +# socket.setdefaulttimeout(30) +# try: +# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, []) +# finally: +# socket.setdefaulttimeout(None) +# self.assertEqual(ftp.ftp.sock.gettimeout(), 30) +# ftp.close() +# +# def testTimeoutValue(self): +# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [], +# timeout=30) +# self.assertEqual(ftp.ftp.sock.gettimeout(), 30) +# ftp.close() + + +class RequestTests(unittest.TestCase): + """Unit tests for urllib.request.Request.""" + + def test_default_values(self): + Request = urllib.request.Request + request = Request("http://www.python.org") + self.assertEqual(request.get_method(), 'GET') + request = Request("http://www.python.org", {}) + self.assertEqual(request.get_method(), 'POST') + + def test_with_method_arg(self): + Request = urllib.request.Request + request = Request("http://www.python.org", method='HEAD') + self.assertEqual(request.method, 'HEAD') + self.assertEqual(request.get_method(), 'HEAD') + request = Request("http://www.python.org", {}, method='HEAD') + self.assertEqual(request.method, 'HEAD') + self.assertEqual(request.get_method(), 'HEAD') + request = Request("http://www.python.org", method='GET') + self.assertEqual(request.get_method(), 'GET') + request.method = 'HEAD' + self.assertEqual(request.get_method(), 'HEAD') + + +class URL2PathNameTests(unittest.TestCase): + + def test_converting_drive_letter(self): + self.assertEqual(url2pathname("///C|"), 'C:') + self.assertEqual(url2pathname("///C:"), 'C:') + self.assertEqual(url2pathname("///C|/"), 'C:\\') + + def test_converting_when_no_drive_letter(self): + # cannot end a raw string in \ + self.assertEqual(url2pathname("///C/test/"), r'\\\C\test' '\\') + self.assertEqual(url2pathname("////C/test/"), r'\\C\test' '\\') + + def test_simple_compare(self): + self.assertEqual(url2pathname("///C|/foo/bar/spam.foo"), + r'C:\foo\bar\spam.foo') + + def test_non_ascii_drive_letter(self): + self.assertRaises(IOError, url2pathname, "///\u00e8|/") + + def test_roundtrip_url2pathname(self): + list_of_paths = ['C:', + r'\\\C\test\\', + r'C:\foo\bar\spam.foo' + ] + for path in list_of_paths: + self.assertEqual(url2pathname(pathname2url(path)), path) + +class PathName2URLTests(unittest.TestCase): + + def test_converting_drive_letter(self): + self.assertEqual(pathname2url("C:"), '///C:') + self.assertEqual(pathname2url("C:\\"), '///C:') + + def test_converting_when_no_drive_letter(self): + self.assertEqual(pathname2url(r"\\\folder\test" "\\"), + '/////folder/test/') + self.assertEqual(pathname2url(r"\\folder\test" "\\"), + '////folder/test/') + self.assertEqual(pathname2url(r"\folder\test" "\\"), + '/folder/test/') + + def test_simple_compare(self): + self.assertEqual(pathname2url(r'C:\foo\bar\spam.foo'), + "///C:/foo/bar/spam.foo" ) + + def test_long_drive_letter(self): + self.assertRaises(IOError, pathname2url, "XX:\\") + + def test_roundtrip_pathname2url(self): + list_of_paths = ['///C:', + '/////folder/test/', + '///C:/foo/bar/spam.foo'] + for path in list_of_paths: + self.assertEqual(pathname2url(url2pathname(path)), path) + +if __name__ == '__main__': + unittest.main() From 91c58e8b1f3e27cb3eceb31ce134f9c8d645bd7a Mon Sep 17 00:00:00 2001 From: Padraic Fanning Date: Wed, 2 Jun 2021 18:18:21 -0400 Subject: [PATCH 2/6] Update urllib to CPython 3.8.10 --- Lib/urllib/parse.py | 32 +++++++++++++++++++++++++++----- Lib/urllib/request.py | 13 +++++++++++-- 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/Lib/urllib/parse.py b/Lib/urllib/parse.py index e2b6f133e..f0d9d4d80 100644 --- a/Lib/urllib/parse.py +++ b/Lib/urllib/parse.py @@ -77,6 +77,9 @@ scheme_chars = ('abcdefghijklmnopqrstuvwxyz' '0123456789' '+-.') +# Unsafe bytes to be removed per WHATWG spec +_UNSAFE_URL_BYTES_TO_REMOVE = ['\t', '\r', '\n'] + # XXX: Consider replacing with functools.lru_cache MAX_CACHE_SIZE = 20 _parse_cache = {} @@ -414,6 +417,11 @@ def _checknetloc(netloc): raise ValueError("netloc '" + netloc + "' contains invalid " + "characters under NFKC normalization") +def _remove_unsafe_bytes_from_url(url): + for b in _UNSAFE_URL_BYTES_TO_REMOVE: + url = url.replace(b, "") + return url + def urlsplit(url, scheme='', allow_fragments=True): """Parse a URL into 5 components: :///?# @@ -421,6 +429,8 @@ def urlsplit(url, scheme='', allow_fragments=True): Note that we don't break the components up in smaller bits (e.g. netloc is a single string) and we don't expand % escapes.""" url, scheme, _coerce_result = _coerce_args(url, scheme) + url = _remove_unsafe_bytes_from_url(url) + scheme = _remove_unsafe_bytes_from_url(scheme) allow_fragments = bool(allow_fragments) key = url, scheme, allow_fragments, type(url), type(scheme) cached = _parse_cache.get(key, None) @@ -631,6 +641,8 @@ def unquote(string, encoding='utf-8', errors='replace'): unquote('abc%20def') -> 'abc def'. """ + if isinstance(string, bytes): + raise TypeError('Expected str, got bytes') if '%' not in string: string.split return string @@ -648,7 +660,7 @@ def unquote(string, encoding='utf-8', errors='replace'): def parse_qs(qs, keep_blank_values=False, strict_parsing=False, - encoding='utf-8', errors='replace', max_num_fields=None): + encoding='utf-8', errors='replace', max_num_fields=None, separator='&'): """Parse a query given as a string argument. Arguments: @@ -672,12 +684,15 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False, max_num_fields: int. If set, then throws a ValueError if there are more than n fields read by parse_qsl(). + separator: str. The symbol to use for separating the query arguments. + Defaults to &. + Returns a dictionary. """ parsed_result = {} pairs = parse_qsl(qs, keep_blank_values, strict_parsing, encoding=encoding, errors=errors, - max_num_fields=max_num_fields) + max_num_fields=max_num_fields, separator=separator) for name, value in pairs: if name in parsed_result: parsed_result[name].append(value) @@ -687,7 +702,7 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False, def parse_qsl(qs, keep_blank_values=False, strict_parsing=False, - encoding='utf-8', errors='replace', max_num_fields=None): + encoding='utf-8', errors='replace', max_num_fields=None, separator='&'): """Parse a query given as a string argument. Arguments: @@ -710,19 +725,26 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False, max_num_fields: int. If set, then throws a ValueError if there are more than n fields read by parse_qsl(). + separator: str. The symbol to use for separating the query arguments. + Defaults to &. + Returns a list, as G-d intended. """ qs, _coerce_result = _coerce_args(qs) + separator, _ = _coerce_args(separator) + + if not separator or (not isinstance(separator, (str, bytes))): + raise ValueError("Separator must be of type string or bytes.") # If max_num_fields is defined then check that the number of fields # is less than max_num_fields. This prevents a memory exhaustion DOS # attack via post bodies with many fields. if max_num_fields is not None: - num_fields = 1 + qs.count('&') + qs.count(';') + num_fields = 1 + qs.count(separator) if max_num_fields < num_fields: raise ValueError('Max number of fields exceeded') - pairs = [s2 for s1 in qs.split('&') for s2 in s1.split(';')] + pairs = [s1 for s1 in qs.split(separator)] r = [] for name_value in pairs: if not name_value and not strict_parsing: diff --git a/Lib/urllib/request.py b/Lib/urllib/request.py index e44073886..5f67077fb 100644 --- a/Lib/urllib/request.py +++ b/Lib/urllib/request.py @@ -779,7 +779,11 @@ def _parse_proxy(proxy): raise ValueError("proxy URL with no authority: %r" % proxy) # We have an authority, so for RFC 3986-compliant URLs (by ss 3. # and 3.3.), path is empty or starts with '/' - end = r_scheme.find("/", 2) + if '@' in r_scheme: + host_separator = r_scheme.find('@') + end = r_scheme.find("/", host_separator) + else: + end = r_scheme.find("/", 2) if end == -1: end = None authority = r_scheme[2:end] @@ -947,7 +951,7 @@ class AbstractBasicAuthHandler: # (single quotes are a violation of the RFC, but appear in the wild) rx = re.compile('(?:^|,)' # start of the string or ',' '[ \t]*' # optional whitespaces - '([^ \t]+)' # scheme like "Basic" + '([^ \t,]+)' # scheme like "Basic" '[ \t]+' # mandatory whitespaces # realm=xxx # realm='xxx' @@ -2604,6 +2608,11 @@ def _proxy_bypass_macosx_sysconf(host, proxy_settings): mask = 8 * (m.group(1).count('.') + 1) else: mask = int(mask[1:]) + + if mask < 0 or mask > 32: + # System libraries ignore invalid prefix lengths + continue + mask = 32 - mask if (hostIP >> mask) == (base >> mask): From 0aa36ff0b053dd711f8d3a4f9771de3032917e9f Mon Sep 17 00:00:00 2001 From: Padraic Fanning Date: Wed, 2 Jun 2021 18:22:27 -0400 Subject: [PATCH 3/6] Mark erroring/failing tests --- Lib/test/test_urllib.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/Lib/test/test_urllib.py b/Lib/test/test_urllib.py index 862668715..c47b3b007 100644 --- a/Lib/test/test_urllib.py +++ b/Lib/test/test_urllib.py @@ -360,6 +360,8 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin): finally: self.unfakehttp() + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(ssl, "ssl module required") def test_url_path_with_control_char_rejected(self): for char_no in list(range(0, 0x21)) + [0x7f]: @@ -387,6 +389,8 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin): finally: self.unfakehttp() + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(ssl, "ssl module required") def test_url_path_with_newline_header_injection_rejected(self): self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") @@ -413,6 +417,8 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin): finally: self.unfakehttp() + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(ssl, "ssl module required") def test_url_host_with_control_char_rejected(self): for char_no in list(range(0, 0x21)) + [0x7f]: @@ -430,6 +436,8 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin): finally: self.unfakehttp() + # TODO: RUSTPYTHON + @unittest.expectedFailure @unittest.skipUnless(ssl, "ssl module required") def test_url_host_with_newline_header_injection_rejected(self): self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.") @@ -508,6 +516,8 @@ Connection: close finally: self.unfakehttp() + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_missing_localfile(self): # Test for #10836 with self.assertRaises(urllib.error.URLError) as e: @@ -515,6 +525,8 @@ Connection: close self.assertTrue(e.exception.filename) self.assertTrue(e.exception.reason) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_file_notexists(self): fd, tmp_file = tempfile.mkstemp() tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/') @@ -529,6 +541,8 @@ Connection: close with self.assertRaises(urllib.error.URLError): urlopen(tmp_fileurl) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_ftp_nohost(self): test_ftp_url = 'ftp:///path' with self.assertRaises(urllib.error.URLError) as e: @@ -536,6 +550,8 @@ Connection: close self.assertFalse(e.exception.filename) self.assertTrue(e.exception.reason) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_ftp_nonexisting(self): with self.assertRaises(urllib.error.URLError) as e: urlopen('ftp://localhost/a/file/which/doesnot/exists.py') @@ -595,6 +611,7 @@ Connection: close ) +@unittest.skip("TODO: RUSTPYTHON, error in setUp(); ValueError: error decoding base64: Invalid byte 32, offset 95.") class urlopen_DataTests(unittest.TestCase): """Test urlopen() opening a data URL.""" @@ -1509,6 +1526,8 @@ class Pathname_Tests(unittest.TestCase): class Utility_Tests(unittest.TestCase): """Testcase to test the various utility functions in the urllib.""" + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_thishost(self): """Test the urllib.request.thishost utility function returns a tuple""" self.assertIsInstance(urllib.request.thishost(), tuple) From 18fb3dc93c56138ce22b958db80a593d901719ba Mon Sep 17 00:00:00 2001 From: Padraic Fanning Date: Wed, 2 Jun 2021 19:18:40 -0400 Subject: [PATCH 4/6] Update test_cgi to CPython 3.8.10 --- Lib/test/test_cgi.py | 43 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_cgi.py b/Lib/test/test_cgi.py index 205697cb2..df6bc7c3b 100644 --- a/Lib/test/test_cgi.py +++ b/Lib/test/test_cgi.py @@ -53,12 +53,9 @@ parse_strict_test_cases = [ ("", ValueError("bad query field: ''")), ("&", ValueError("bad query field: ''")), ("&&", ValueError("bad query field: ''")), - (";", ValueError("bad query field: ''")), - (";&;", ValueError("bad query field: ''")), # Should the next few really be valid? ("=", {}), ("=&=", {}), - ("=;=", {}), # This rest seem to make sense ("=a", {'': ['a']}), ("&=a", ValueError("bad query field: ''")), @@ -73,8 +70,6 @@ parse_strict_test_cases = [ ("a=a+b&b=b+c", {'a': ['a b'], 'b': ['b c']}), ("a=a+b&a=b+a", {'a': ['a b', 'b a']}), ("x=1&y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}), - ("x=1;y=2.0&z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}), - ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}), ("Hbc5161168c542333633315dee1182227:key_store_seqid=400006&cuyer=r&view=bustomer&order_id=0bb2e248638833d48cb7fed300000f1b&expire=964546263&lobale=en-US&kid=130003.300038&ss=env", {'Hbc5161168c542333633315dee1182227:key_store_seqid': ['400006'], 'cuyer': ['r'], @@ -128,6 +123,20 @@ class CgiTests(unittest.TestCase): 'file': [b'Testing 123.\n'], 'title': ['']} self.assertEqual(result, expected) + def test_parse_multipart_without_content_length(self): + POSTDATA = '''--JfISa01 +Content-Disposition: form-data; name="submit-name" + +just a string + +--JfISa01-- +''' + fp = BytesIO(POSTDATA.encode('latin1')) + env = {'boundary': 'JfISa01'.encode('latin1')} + result = cgi.parse_multipart(fp, env) + expected = {'submit-name': ['just a string\n']} + self.assertEqual(result, expected) + # TODO RUSTPYTHON - see https://github.com/RustPython/RustPython/issues/935 @unittest.expectedFailure def test_parse_multipart_invalid_encoding(self): @@ -189,6 +198,30 @@ Content-Length: 3 else: self.assertEqual(fs.getvalue(key), expect_val[0]) + def test_separator(self): + parse_semicolon = [ + ("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}), + ("x=1;y=2.0;z=2-3.%2b0", {'x': ['1'], 'y': ['2.0'], 'z': ['2-3.+0']}), + (";", ValueError("bad query field: ''")), + (";;", ValueError("bad query field: ''")), + ("=;a", ValueError("bad query field: 'a'")), + (";b=a", ValueError("bad query field: ''")), + ("b;=a", ValueError("bad query field: 'b'")), + ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}), + ("a=a+b;a=b+a", {'a': ['a b', 'b a']}), + ] + for orig, expect in parse_semicolon: + env = {'QUERY_STRING': orig} + fs = cgi.FieldStorage(separator=';', environ=env) + if isinstance(expect, dict): + for key in expect.keys(): + expect_val = expect[key] + self.assertIn(key, fs) + if len(expect_val) > 1: + self.assertEqual(fs.getvalue(key), expect_val) + else: + self.assertEqual(fs.getvalue(key), expect_val[0]) + def test_log(self): cgi.log("Testing") From cf96a42332e6ae9045857918cb4acc888c4ddd0b Mon Sep 17 00:00:00 2001 From: Padraic Fanning Date: Wed, 2 Jun 2021 19:21:21 -0400 Subject: [PATCH 5/6] Mark erroring tests in test_cgi --- Lib/test/test_cgi.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Lib/test/test_cgi.py b/Lib/test/test_cgi.py index df6bc7c3b..e65553af6 100644 --- a/Lib/test/test_cgi.py +++ b/Lib/test/test_cgi.py @@ -123,6 +123,8 @@ class CgiTests(unittest.TestCase): 'file': [b'Testing 123.\n'], 'title': ['']} self.assertEqual(result, expected) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_parse_multipart_without_content_length(self): POSTDATA = '''--JfISa01 Content-Disposition: form-data; name="submit-name" @@ -198,6 +200,8 @@ Content-Length: 3 else: self.assertEqual(fs.getvalue(key), expect_val[0]) + # TODO: RUSTPYTHON + @unittest.expectedFailure def test_separator(self): parse_semicolon = [ ("x=1;y=2.0", {'x': ['1'], 'y': ['2.0']}), From 4b5117c80c8d4b116b7fe7e0228da575cd562b0f Mon Sep 17 00:00:00 2001 From: Padraic Fanning Date: Wed, 2 Jun 2021 19:22:51 -0400 Subject: [PATCH 6/6] Update test_urlparse to CPython 3.8.10 --- Lib/test/test_urlparse.py | 120 +++++++++++++++++++++++++++++++------- 1 file changed, 98 insertions(+), 22 deletions(-) diff --git a/Lib/test/test_urlparse.py b/Lib/test/test_urlparse.py index 61234b764..60bd01db1 100644 --- a/Lib/test/test_urlparse.py +++ b/Lib/test/test_urlparse.py @@ -32,16 +32,10 @@ parse_qsl_test_cases = [ (b"&a=b", [(b'a', b'b')]), (b"a=a+b&b=b+c", [(b'a', b'a b'), (b'b', b'b c')]), (b"a=1&a=2", [(b'a', b'1'), (b'a', b'2')]), - (";", []), - (";;", []), - (";a=b", [('a', 'b')]), - ("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]), - ("a=1;a=2", [('a', '1'), ('a', '2')]), - (b";", []), - (b";;", []), - (b";a=b", [(b'a', b'b')]), - (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]), - (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]), + (";a=b", [(';a', 'b')]), + ("a=a+b;b=b+c", [('a', 'a b;b=b c')]), + (b";a=b", [(b';a', b'b')]), + (b"a=a+b;b=b+c", [(b'a', b'a b;b=b c')]), ] # Each parse_qs testcase is a two-tuple that contains @@ -68,16 +62,10 @@ parse_qs_test_cases = [ (b"&a=b", {b'a': [b'b']}), (b"a=a+b&b=b+c", {b'a': [b'a b'], b'b': [b'b c']}), (b"a=1&a=2", {b'a': [b'1', b'2']}), - (";", {}), - (";;", {}), - (";a=b", {'a': ['b']}), - ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}), - ("a=1;a=2", {'a': ['1', '2']}), - (b";", {}), - (b";;", {}), - (b";a=b", {b'a': [b'b']}), - (b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}), - (b"a=1;a=2", {b'a': [b'1', b'2']}), + (";a=b", {';a': ['b']}), + ("a=a+b;b=b+c", {'a': ['a b;b=b c']}), + (b";a=b", {b';a': [b'b']}), + (b"a=a+b;b=b+c", {b'a':[ b'a b;b=b c']}), ] class UrlParseTestCase(unittest.TestCase): @@ -624,6 +612,54 @@ class UrlParseTestCase(unittest.TestCase): with self.assertRaisesRegex(ValueError, "out of range"): p.port + def test_urlsplit_remove_unsafe_bytes(self): + # Remove ASCII tabs and newlines from input, for http common case scenario. + url = "h\nttp://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment" + p = urllib.parse.urlsplit(url) + self.assertEqual(p.scheme, "http") + self.assertEqual(p.netloc, "www.python.org") + self.assertEqual(p.path, "/javascript:alert('msg')/") + self.assertEqual(p.query, "query=something") + self.assertEqual(p.fragment, "fragment") + self.assertEqual(p.username, None) + self.assertEqual(p.password, None) + self.assertEqual(p.hostname, "www.python.org") + self.assertEqual(p.port, None) + self.assertEqual(p.geturl(), "http://www.python.org/javascript:alert('msg')/?query=something#fragment") + + # Remove ASCII tabs and newlines from input as bytes, for http common case scenario. + url = b"h\nttp://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment" + p = urllib.parse.urlsplit(url) + self.assertEqual(p.scheme, b"http") + self.assertEqual(p.netloc, b"www.python.org") + self.assertEqual(p.path, b"/javascript:alert('msg')/") + self.assertEqual(p.query, b"query=something") + self.assertEqual(p.fragment, b"fragment") + self.assertEqual(p.username, None) + self.assertEqual(p.password, None) + self.assertEqual(p.hostname, b"www.python.org") + self.assertEqual(p.port, None) + self.assertEqual(p.geturl(), b"http://www.python.org/javascript:alert('msg')/?query=something#fragment") + + # any scheme + url = "x-new-scheme\t://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment" + p = urllib.parse.urlsplit(url) + self.assertEqual(p.geturl(), "x-new-scheme://www.python.org/javascript:alert('msg')/?query=something#fragment") + + # Remove ASCII tabs and newlines from input as bytes, any scheme. + url = b"x-new-scheme\t://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment" + p = urllib.parse.urlsplit(url) + self.assertEqual(p.geturl(), b"x-new-scheme://www.python.org/javascript:alert('msg')/?query=something#fragment") + + # Unsafe bytes is not returned from urlparse cache. + # scheme is stored after parsing, sending an scheme with unsafe bytes *will not* return an unsafe scheme + url = "https://www.python\n.org\t/java\nscript:\talert('msg\r\n')/?query\n=\tsomething#frag\nment" + scheme = "htt\nps" + for _ in range(2): + p = urllib.parse.urlsplit(url, scheme=scheme) + self.assertEqual(p.scheme, "https") + self.assertEqual(p.geturl(), "https://www.python.org/javascript:alert('msg')/?query=something#fragment") + def test_attributes_bad_port(self): """Check handling of invalid ports.""" for bytes in (False, True): @@ -884,10 +920,50 @@ class UrlParseTestCase(unittest.TestCase): def test_parse_qsl_max_num_fields(self): with self.assertRaises(ValueError): urllib.parse.parse_qs('&'.join(['a=a']*11), max_num_fields=10) - with self.assertRaises(ValueError): - urllib.parse.parse_qs(';'.join(['a=a']*11), max_num_fields=10) urllib.parse.parse_qs('&'.join(['a=a']*10), max_num_fields=10) + def test_parse_qs_separator(self): + parse_qs_semicolon_cases = [ + (";", {}), + (";;", {}), + (";a=b", {'a': ['b']}), + ("a=a+b;b=b+c", {'a': ['a b'], 'b': ['b c']}), + ("a=1;a=2", {'a': ['1', '2']}), + (b";", {}), + (b";;", {}), + (b";a=b", {b'a': [b'b']}), + (b"a=a+b;b=b+c", {b'a': [b'a b'], b'b': [b'b c']}), + (b"a=1;a=2", {b'a': [b'1', b'2']}), + ] + for orig, expect in parse_qs_semicolon_cases: + with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"): + result = urllib.parse.parse_qs(orig, separator=';') + self.assertEqual(result, expect, "Error parsing %r" % orig) + result_bytes = urllib.parse.parse_qs(orig, separator=b';') + self.assertEqual(result_bytes, expect, "Error parsing %r" % orig) + + + def test_parse_qsl_separator(self): + parse_qsl_semicolon_cases = [ + (";", []), + (";;", []), + (";a=b", [('a', 'b')]), + ("a=a+b;b=b+c", [('a', 'a b'), ('b', 'b c')]), + ("a=1;a=2", [('a', '1'), ('a', '2')]), + (b";", []), + (b";;", []), + (b";a=b", [(b'a', b'b')]), + (b"a=a+b;b=b+c", [(b'a', b'a b'), (b'b', b'b c')]), + (b"a=1;a=2", [(b'a', b'1'), (b'a', b'2')]), + ] + for orig, expect in parse_qsl_semicolon_cases: + with self.subTest(f"Original: {orig!r}, Expected: {expect!r}"): + result = urllib.parse.parse_qsl(orig, separator=';') + self.assertEqual(result, expect, "Error parsing %r" % orig) + result_bytes = urllib.parse.parse_qsl(orig, separator=b';') + self.assertEqual(result_bytes, expect, "Error parsing %r" % orig) + + def test_urlencode_sequences(self): # Other tests incidentally urlencode things; test non-covered cases: # Sequence and object values.