Update urllib from v3.14.2

This commit is contained in:
Jeong YunWon
2026-01-24 22:11:53 +09:00
parent 77add04d3d
commit 448658e49d
10 changed files with 1233 additions and 1271 deletions

View File

@@ -16,6 +16,14 @@ class BaseRobotTest:
bad = []
site_maps = None
def __init_subclass__(cls):
super().__init_subclass__()
# Remove tests that do nothing.
if not cls.good:
cls.test_good_urls = None
if not cls.bad:
cls.test_bad_urls = None
def setUp(self):
lines = io.StringIO(self.robots_txt).readlines()
self.parser = urllib.robotparser.RobotFileParser()
@@ -231,9 +239,16 @@ class DisallowQueryStringTest(BaseRobotTest, unittest.TestCase):
robots_txt = """\
User-agent: *
Disallow: /some/path?name=value
Disallow: /another/path?
Disallow: /yet/one/path?name=value&more
"""
good = ['/some/path']
bad = ['/some/path?name=value']
# URLs the parser is expected to allow (checked by test_good_urls).
good = ['/some/path', '/some/path?',
        '/some/path%3Fname=value', '/some/path?name%3Dvalue',
        '/another/path', '/another/path%3F',
        '/yet/one/path?name=value%26more']
# URLs the parser is expected to block (checked by test_bad_urls).
# Every entry must be followed by a comma: adjacent string literals are
# concatenated implicitly, which would silently merge two URLs into one
# bogus test case and leave both real cases untested.
bad = ['/some/path?name=value',
       '/another/path?', '/another/path?name=value',
       '/yet/one/path?name=value&more']
class UseFirstUserAgentWildcardTest(BaseRobotTest, unittest.TestCase):
@@ -249,19 +264,79 @@ Disallow: /another/path
bad = ['/some/path']
class EmptyQueryStringTest(BaseRobotTest, unittest.TestCase):
# normalize the URL first (#17403)
class PercentEncodingTest(BaseRobotTest, unittest.TestCase):
robots_txt = """\
User-agent: *
Allow: /some/path?
Disallow: /another/path?
"""
good = ['/some/path?']
bad = ['/another/path?']
@unittest.expectedFailure # TODO: RUSTPYTHON; self.assertFalse(self.parser.can_fetch(agent, url))\nAssertionError: True is not false
def test_bad_urls(self):
super().test_bad_urls()
Disallow: /a1/Z-._~ # unreserved characters
Disallow: /a2/%5A%2D%2E%5F%7E # percent-encoded unreserved characters
Disallow: /u1/%F0%9F%90%8D # percent-encoded ASCII Unicode character
Disallow: /u2/%f0%9f%90%8d
Disallow: /u3/\U0001f40d # raw non-ASCII Unicode character
Disallow: /v1/%F0 # percent-encoded non-ASCII octet
Disallow: /v2/%f0
Disallow: /v3/\udcf0 # raw non-ASCII octet
Disallow: /p1%xy # raw percent
Disallow: /p2%
Disallow: /p3%25xy # percent-encoded percent
Disallow: /p4%2525xy # double percent-encoded percent
Disallow: /john%20smith # space
Disallow: /john doe
Disallow: /trailingspace%20
Disallow: /question%3Fq=v # not query
Disallow: /hash%23f # not fragment
Disallow: /dollar%24
Disallow: /asterisk%2A
Disallow: /sub/dir
Disallow: /slash%2F
Disallow: /query/question?q=%3F
Disallow: /query/raw/question?q=?
Disallow: /query/eq?q%3Dv
Disallow: /query/amp?q=v%26a
"""
good = [
'/u1/%F0', '/u1/%f0',
'/u2/%F0', '/u2/%f0',
'/u3/%F0', '/u3/%f0',
'/p1%2525xy', '/p2%f0', '/p3%2525xy', '/p4%xy', '/p4%25xy',
'/question?q=v',
'/dollar', '/asterisk',
'/query/eq?q=v',
'/query/amp?q=v&a',
]
bad = [
'/a1/Z-._~', '/a1/%5A%2D%2E%5F%7E',
'/a2/Z-._~', '/a2/%5A%2D%2E%5F%7E',
'/u1/%F0%9F%90%8D', '/u1/%f0%9f%90%8d', '/u1/\U0001f40d',
'/u2/%F0%9F%90%8D', '/u2/%f0%9f%90%8d', '/u2/\U0001f40d',
'/u3/%F0%9F%90%8D', '/u3/%f0%9f%90%8d', '/u3/\U0001f40d',
'/v1/%F0', '/v1/%f0', '/v1/\udcf0', '/v1/\U0001f40d',
'/v2/%F0', '/v2/%f0', '/v2/\udcf0', '/v2/\U0001f40d',
'/v3/%F0', '/v3/%f0', '/v3/\udcf0', '/v3/\U0001f40d',
'/p1%xy', '/p1%25xy',
'/p2%', '/p2%25', '/p2%2525', '/p2%xy',
'/p3%xy', '/p3%25xy',
'/p4%2525xy',
'/john%20smith', '/john smith',
'/john%20doe', '/john doe',
'/trailingspace%20', '/trailingspace ',
'/question%3Fq=v',
'/hash#f', '/hash%23f',
'/dollar$', '/dollar%24',
'/asterisk*', '/asterisk%2A',
'/sub/dir', '/sub%2Fdir',
'/slash%2F', '/slash/',
'/query/question?q=?', '/query/question?q=%3F',
'/query/raw/question?q=?', '/query/raw/question?q=%3F',
'/query/eq?q%3Dv',
'/query/amp?q=v%26a',
]
# other reserved characters
for c in ":/#[]@!$&'()*+,;=":
robots_txt += f'Disallow: /raw{c}\nDisallow: /pc%{ord(c):02X}\n'
bad.append(f'/raw{c}')
bad.append(f'/raw%{ord(c):02X}')
bad.append(f'/pc{c}')
bad.append(f'/pc%{ord(c):02X}')
class DefaultEntryTest(BaseRequestRateTest, unittest.TestCase):
@@ -303,22 +378,17 @@ Disallow: /cyberworld/map/\
self.assertEqual(str(self.parser), self.expected_output)
class RobotHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_error(403, "Forbidden access")
def log_message(self, format, *args):
pass
class PasswordProtectedSiteTestCase(unittest.TestCase):
@unittest.skipUnless(
support.has_socket_support,
"Socket server requires working socket."
)
class BaseLocalNetworkTestCase:
def setUp(self):
# clear _opener global variable
self.addCleanup(urllib.request.urlcleanup)
self.server = HTTPServer((socket_helper.HOST, 0), RobotHandler)
self.server = HTTPServer((socket_helper.HOST, 0), self.RobotHandler)
self.t = threading.Thread(
name='HTTPServer serving',
@@ -335,6 +405,57 @@ class PasswordProtectedSiteTestCase(unittest.TestCase):
self.t.join()
self.server.server_close()
SAMPLE_ROBOTS_TXT = b'''\
User-agent: test_robotparser
Disallow: /utf8/\xf0\x9f\x90\x8d
Disallow: /non-utf8/\xf0
Disallow: //[spam]/path
'''
class LocalNetworkTestCase(BaseLocalNetworkTestCase, unittest.TestCase):
class RobotHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.end_headers()
self.wfile.write(SAMPLE_ROBOTS_TXT)
def log_message(self, format, *args):
pass
@threading_helper.reap_threads
def testRead(self):
# Test that reading a weird robots.txt doesn't fail.
addr = self.server.server_address
url = f'http://{socket_helper.HOST}:{addr[1]}'
robots_url = url + '/robots.txt'
parser = urllib.robotparser.RobotFileParser()
parser.set_url(robots_url)
parser.read()
# And it can even interpret the weird paths in some reasonable way.
agent = 'test_robotparser'
self.assertTrue(parser.can_fetch(agent, robots_url))
self.assertTrue(parser.can_fetch(agent, url + '/utf8/'))
self.assertFalse(parser.can_fetch(agent, url + '/utf8/\U0001f40d'))
self.assertFalse(parser.can_fetch(agent, url + '/utf8/%F0%9F%90%8D'))
self.assertFalse(parser.can_fetch(agent, url + '/utf8/\U0001f40d'))
self.assertTrue(parser.can_fetch(agent, url + '/non-utf8/'))
self.assertFalse(parser.can_fetch(agent, url + '/non-utf8/%F0'))
self.assertFalse(parser.can_fetch(agent, url + '/non-utf8/\U0001f40d'))
self.assertFalse(parser.can_fetch(agent, url + '/%2F[spam]/path'))
class PasswordProtectedSiteTestCase(BaseLocalNetworkTestCase, unittest.TestCase):
class RobotHandler(BaseHTTPRequestHandler):
def do_GET(self):
self.send_error(403, "Forbidden access")
def log_message(self, format, *args):
pass
@threading_helper.reap_threads
def testPasswordProtectedSite(self):
addr = self.server.server_address
@@ -346,6 +467,7 @@ class PasswordProtectedSiteTestCase(unittest.TestCase):
self.assertFalse(parser.can_fetch("*", robots_url))
@support.requires_working_socket()
class NetworkTestCase(unittest.TestCase):
base_url = 'http://www.pythontest.net/'

View File

@@ -7,13 +7,11 @@ import http.client
import email.message
import io
import unittest
from unittest.mock import patch
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import warnings_helper
from test.support.testcase import ExtraAssertions
import os
import socket
try:
import ssl
except ImportError:
@@ -21,7 +19,6 @@ except ImportError:
import sys
import tempfile
from base64 import b64encode
import collections
@@ -36,32 +33,6 @@ def hexescape(char):
hex_repr = "0%s" % hex_repr
return "%" + hex_repr
# Shortcut for testing FancyURLopener
_urlopener = None
def urlopen(url, data=None, proxies=None):
"""urlopen(url [, data]) -> open file-like object"""
global _urlopener
if proxies is not None:
opener = urllib.request.FancyURLopener(proxies=proxies)
elif not _urlopener:
opener = FancyURLopener()
_urlopener = opener
else:
opener = _urlopener
if data is None:
return opener.open(url)
else:
return opener.open(url, data)
def FancyURLopener():
with warnings_helper.check_warnings(
('FancyURLopener style of invoking requests is deprecated.',
DeprecationWarning)):
return urllib.request.FancyURLopener()
def fakehttp(fakedata, mock_close=False):
class FakeSocket(io.BytesIO):
@@ -120,27 +91,7 @@ class FakeHTTPMixin(object):
http.client.HTTPConnection = self._connection_class
class FakeFTPMixin(object):
def fakeftp(self):
class FakeFtpWrapper(object):
def __init__(self, user, passwd, host, port, dirs, timeout=None,
persistent=True):
pass
def retrfile(self, file, type):
return io.BytesIO(), 0
def close(self):
pass
self._ftpwrapper_class = urllib.request.ftpwrapper
urllib.request.ftpwrapper = FakeFtpWrapper
def unfakeftp(self):
urllib.request.ftpwrapper = self._ftpwrapper_class
class urlopen_FileTests(unittest.TestCase, ExtraAssertions):
class urlopen_FileTests(unittest.TestCase):
"""Test urlopen() opening a temporary file.
Try to test as much functionality as possible so as to cut down on reliance
@@ -159,7 +110,7 @@ class urlopen_FileTests(unittest.TestCase, ExtraAssertions):
f.close()
self.pathname = os_helper.TESTFN
self.quoted_pathname = urllib.parse.quote(os.fsencode(self.pathname))
self.returned_obj = urlopen("file:%s" % self.quoted_pathname)
self.returned_obj = urllib.request.urlopen("file:%s" % self.quoted_pathname)
def tearDown(self):
"""Shut down the open object"""
@@ -204,7 +155,7 @@ class urlopen_FileTests(unittest.TestCase, ExtraAssertions):
self.assertIsInstance(self.returned_obj.headers, email.message.Message)
def test_url(self):
self.assertEqual(self.returned_obj.url, self.quoted_pathname)
self.assertEqual(self.returned_obj.url, "file:" + self.quoted_pathname)
def test_status(self):
self.assertIsNone(self.returned_obj.status)
@@ -213,7 +164,7 @@ class urlopen_FileTests(unittest.TestCase, ExtraAssertions):
self.assertIsInstance(self.returned_obj.info(), email.message.Message)
def test_geturl(self):
self.assertEqual(self.returned_obj.geturl(), self.quoted_pathname)
self.assertEqual(self.returned_obj.geturl(), "file:" + self.quoted_pathname)
def test_getcode(self):
self.assertIsNone(self.returned_obj.getcode())
@@ -230,6 +181,16 @@ class urlopen_FileTests(unittest.TestCase, ExtraAssertions):
def test_relativelocalfile(self):
self.assertRaises(ValueError,urllib.request.urlopen,'./' + self.pathname)
def test_remote_authority(self):
# Test for GH-90812.
url = 'file://pythontest.net/foo/bar'
with self.assertRaises(urllib.error.URLError) as e:
urllib.request.urlopen(url)
if os.name == 'nt':
self.assertEqual(e.exception.filename, r'\\pythontest.net\foo\bar')
else:
self.assertEqual(e.exception.reason, 'file:// scheme is supported only on localhost')
class ProxyTests(unittest.TestCase):
@@ -338,13 +299,13 @@ class ProxyTests_withOrderedEnv(unittest.TestCase):
self.assertEqual('http://somewhere:3128', proxies['http'])
class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin):
"""Test urlopen() opening a fake http connection."""
def check_read(self, ver):
self.fakehttp(b"HTTP/" + ver + b" 200 OK\r\n\r\nHello!")
try:
fp = urlopen("http://python.org/")
fp = urllib.request.urlopen("http://python.org/")
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
self.assertEqual(fp.geturl(), 'http://python.org/')
@@ -365,8 +326,8 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
def test_willclose(self):
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!")
try:
resp = urlopen("http://www.python.org")
self.assertTrue(resp.fp.will_close)
resp = urllib.request.urlopen("http://www.python.org")
self.assertTrue(resp.will_close)
finally:
self.unfakehttp()
@@ -391,9 +352,6 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
with self.assertRaisesRegex(
InvalidURL, f"contain control.*{escaped_char_repr}"):
urllib.request.urlopen(f"https:{schemeless_url}")
# This code path quotes the URL so there is no injection.
resp = urlopen(f"http:{schemeless_url}")
self.assertNotIn(char, resp.geturl())
finally:
self.unfakehttp()
@@ -415,11 +373,6 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
urllib.request.urlopen(f"http:{schemeless_url}")
with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"):
urllib.request.urlopen(f"https:{schemeless_url}")
# This code path quotes the URL so there is no injection.
resp = urlopen(f"http:{schemeless_url}")
self.assertNotIn(' ', resp.geturl())
self.assertNotIn('\r', resp.geturl())
self.assertNotIn('\n', resp.geturl())
finally:
self.unfakehttp()
@@ -434,9 +387,9 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
InvalidURL = http.client.InvalidURL
with self.assertRaisesRegex(
InvalidURL, f"contain control.*{escaped_char_repr}"):
urlopen(f"http:{schemeless_url}")
urllib.request.urlopen(f"http:{schemeless_url}")
with self.assertRaisesRegex(InvalidURL, f"contain control.*{escaped_char_repr}"):
urlopen(f"https:{schemeless_url}")
urllib.request.urlopen(f"https:{schemeless_url}")
finally:
self.unfakehttp()
@@ -449,9 +402,9 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
InvalidURL = http.client.InvalidURL
with self.assertRaisesRegex(
InvalidURL, r"contain control.*\\r"):
urlopen(f"http:{schemeless_url}")
urllib.request.urlopen(f"http:{schemeless_url}")
with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"):
urlopen(f"https:{schemeless_url}")
urllib.request.urlopen(f"https:{schemeless_url}")
finally:
self.unfakehttp()
@@ -501,7 +454,7 @@ Content-Type: text/html; charset=iso-8859-1
def test_redirect_limit_independent(self):
# Ticket #12923: make sure independent requests each use their
# own retry limit.
for i in range(FancyURLopener().maxtries):
for i in range(urllib.request.HTTPRedirectHandler.max_redirections):
self.fakehttp(b'''HTTP/1.1 302 Found
Location: file://guidocomputer.athome.com:/python/license
Connection: close
@@ -518,89 +471,49 @@ Connection: close
# data. (#1680230)
self.fakehttp(b'')
try:
self.assertRaises(OSError, urlopen, "http://something")
self.assertRaises(OSError, urllib.request.urlopen, "http://something")
finally:
self.unfakehttp()
def test_missing_localfile(self):
# Test for #10836
with self.assertRaises(urllib.error.URLError) as e:
urlopen('file://localhost/a/file/which/doesnot/exists.py')
urllib.request.urlopen('file://localhost/a/file/which/doesnot/exists.py')
self.assertTrue(e.exception.filename)
self.assertTrue(e.exception.reason)
def test_file_notexists(self):
fd, tmp_file = tempfile.mkstemp()
tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/')
tmp_file_canon_url = urllib.request.pathname2url(tmp_file, add_scheme=True)
parsed = urllib.parse.urlsplit(tmp_file_canon_url)
tmp_fileurl = parsed._replace(netloc='localhost').geturl()
try:
self.assertTrue(os.path.exists(tmp_file))
with urlopen(tmp_fileurl) as fobj:
with urllib.request.urlopen(tmp_fileurl) as fobj:
self.assertTrue(fobj)
self.assertEqual(fobj.url, tmp_file_canon_url)
finally:
os.close(fd)
os.unlink(tmp_file)
self.assertFalse(os.path.exists(tmp_file))
with self.assertRaises(urllib.error.URLError):
urlopen(tmp_fileurl)
urllib.request.urlopen(tmp_fileurl)
def test_ftp_nohost(self):
test_ftp_url = 'ftp:///path'
with self.assertRaises(urllib.error.URLError) as e:
urlopen(test_ftp_url)
urllib.request.urlopen(test_ftp_url)
self.assertFalse(e.exception.filename)
self.assertTrue(e.exception.reason)
def test_ftp_nonexisting(self):
with self.assertRaises(urllib.error.URLError) as e:
urlopen('ftp://localhost/a/file/which/doesnot/exists.py')
urllib.request.urlopen('ftp://localhost/a/file/which/doesnot/exists.py')
self.assertFalse(e.exception.filename)
self.assertTrue(e.exception.reason)
@patch.object(urllib.request, 'MAXFTPCACHE', 0)
def test_ftp_cache_pruning(self):
self.fakeftp()
try:
urllib.request.ftpcache['test'] = urllib.request.ftpwrapper('user', 'pass', 'localhost', 21, [])
urlopen('ftp://localhost')
finally:
self.unfakeftp()
def test_userpass_inurl(self):
self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!")
try:
fp = urlopen("http://user:pass@python.org/")
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
self.assertEqual(fp.geturl(), 'http://user:pass@python.org/')
self.assertEqual(fp.getcode(), 200)
finally:
self.unfakehttp()
def test_userpass_inurl_w_spaces(self):
self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!")
try:
userpass = "a b:c d"
url = "http://{}@python.org/".format(userpass)
fakehttp_wrapper = http.client.HTTPConnection
authorization = ("Authorization: Basic %s\r\n" %
b64encode(userpass.encode("ASCII")).decode("ASCII"))
fp = urlopen(url)
# The authorization header must be in place
self.assertIn(authorization, fakehttp_wrapper.buf.decode("UTF-8"))
self.assertEqual(fp.readline(), b"Hello!")
self.assertEqual(fp.readline(), b"")
# the spaces are quoted in URL so no match
self.assertNotEqual(fp.geturl(), url)
self.assertEqual(fp.getcode(), 200)
finally:
self.unfakehttp()
def test_URLopener_deprecation(self):
with warnings_helper.check_warnings(('',DeprecationWarning)):
urllib.request.URLopener()
class urlopen_DataTests(unittest.TestCase, ExtraAssertions):
class urlopen_DataTests(unittest.TestCase):
"""Test urlopen() opening a data URL."""
def setUp(self):
@@ -713,7 +626,7 @@ class urlretrieve_FileTests(unittest.TestCase):
def constructLocalFileUrl(self, filePath):
filePath = os.path.abspath(filePath)
return "file://%s" % urllib.request.pathname2url(filePath)
return urllib.request.pathname2url(filePath, add_scheme=True)
def createNewTempFile(self, data=b""):
"""Creates a new temporary file containing the specified data,
@@ -1518,6 +1431,32 @@ class Pathname_Tests(unittest.TestCase):
"url2pathname() failed; %s != %s" %
(expect, result))
def test_pathname2url(self):
# Test cases common to Windows and POSIX.
fn = urllib.request.pathname2url
sep = os.path.sep
self.assertEqual(fn(''), '')
self.assertEqual(fn(sep), '///')
self.assertEqual(fn('a'), 'a')
self.assertEqual(fn(f'a{sep}b.c'), 'a/b.c')
self.assertEqual(fn(f'{sep}a{sep}b.c'), '///a/b.c')
self.assertEqual(fn(f'{sep}a{sep}b%#c'), '///a/b%25%23c')
def test_pathname2url_add_scheme(self):
sep = os.path.sep
subtests = [
('', 'file:'),
(sep, 'file:///'),
('a', 'file:a'),
(f'a{sep}b.c', 'file:a/b.c'),
(f'{sep}a{sep}b.c', 'file:///a/b.c'),
(f'{sep}a{sep}b%#c', 'file:///a/b%25%23c'),
]
for path, expected_url in subtests:
with self.subTest(path=path):
self.assertEqual(
urllib.request.pathname2url(path, add_scheme=True), expected_url)
@unittest.skipUnless(sys.platform == 'win32',
'test specific to Windows pathnames.')
def test_pathname2url_win(self):
@@ -1527,16 +1466,18 @@ class Pathname_Tests(unittest.TestCase):
self.assertEqual(fn('\\\\?\\unc\\server\\share\\dir'), '//server/share/dir')
self.assertEqual(fn("C:"), '///C:')
self.assertEqual(fn("C:\\"), '///C:/')
self.assertEqual(fn('c:\\a\\b.c'), '///c:/a/b.c')
self.assertEqual(fn('C:\\a\\b.c'), '///C:/a/b.c')
self.assertEqual(fn('C:\\a\\b.c\\'), '///C:/a/b.c/')
self.assertEqual(fn('C:\\a\\\\b.c'), '///C:/a//b.c')
self.assertEqual(fn('C:\\a\\b%#c'), '///C:/a/b%25%23c')
self.assertEqual(fn('C:\\a\\b\xe9'), '///C:/a/b%C3%A9')
self.assertEqual(fn('C:\\foo\\bar\\spam.foo'), "///C:/foo/bar/spam.foo")
# Long drive letter
self.assertRaises(IOError, fn, "XX:\\")
# NTFS alternate data streams
self.assertEqual(fn('C:\\foo:bar'), '///C:/foo%3Abar')
self.assertEqual(fn('foo:bar'), 'foo%3Abar')
# No drive letter
self.assertEqual(fn("\\folder\\test\\"), '/folder/test/')
self.assertEqual(fn("\\folder\\test\\"), '///folder/test/')
self.assertEqual(fn("\\\\folder\\test\\"), '//folder/test/')
self.assertEqual(fn("\\\\\\folder\\test\\"), '///folder/test/')
self.assertEqual(fn('\\\\some\\share\\'), '//some/share/')
@@ -1549,7 +1490,7 @@ class Pathname_Tests(unittest.TestCase):
self.assertEqual(fn('//?/unc/server/share/dir'), '//server/share/dir')
# Round-tripping
urls = ['///C:',
'/folder/test/',
'///folder/test/',
'///C:/foo/bar/spam.foo']
for url in urls:
self.assertEqual(fn(urllib.request.url2pathname(url)), url)
@@ -1558,12 +1499,9 @@ class Pathname_Tests(unittest.TestCase):
'test specific to POSIX pathnames')
def test_pathname2url_posix(self):
fn = urllib.request.pathname2url
self.assertEqual(fn('/'), '/')
self.assertEqual(fn('/a/b.c'), '/a/b.c')
self.assertEqual(fn('//a/b.c'), '////a/b.c')
self.assertEqual(fn('///a/b.c'), '/////a/b.c')
self.assertEqual(fn('////a/b.c'), '//////a/b.c')
self.assertEqual(fn('/a/b%#c'), '/a/b%25%23c')
@unittest.skipUnless(os_helper.FS_NONASCII, 'need os_helper.FS_NONASCII')
def test_pathname2url_nonascii(self):
@@ -1572,11 +1510,90 @@ class Pathname_Tests(unittest.TestCase):
url = urllib.parse.quote(os_helper.FS_NONASCII, encoding=encoding, errors=errors)
self.assertEqual(urllib.request.pathname2url(os_helper.FS_NONASCII), url)
def test_url2pathname(self):
# Test cases common to Windows and POSIX.
fn = urllib.request.url2pathname
sep = os.path.sep
self.assertEqual(fn(''), '')
self.assertEqual(fn('/'), f'{sep}')
self.assertEqual(fn('///'), f'{sep}')
self.assertEqual(fn('////'), f'{sep}{sep}')
self.assertEqual(fn('foo'), 'foo')
self.assertEqual(fn('foo/bar'), f'foo{sep}bar')
self.assertEqual(fn('/foo/bar'), f'{sep}foo{sep}bar')
self.assertEqual(fn('//localhost/foo/bar'), f'{sep}foo{sep}bar')
self.assertEqual(fn('///foo/bar'), f'{sep}foo{sep}bar')
self.assertEqual(fn('////foo/bar'), f'{sep}{sep}foo{sep}bar')
self.assertEqual(fn('data:blah'), 'data:blah')
self.assertEqual(fn('data://blah'), f'data:{sep}{sep}blah')
self.assertEqual(fn('foo?bar'), 'foo')
self.assertEqual(fn('foo#bar'), 'foo')
self.assertEqual(fn('foo?bar=baz'), 'foo')
self.assertEqual(fn('foo?bar#baz'), 'foo')
self.assertEqual(fn('foo%3Fbar'), 'foo?bar')
self.assertEqual(fn('foo%23bar'), 'foo#bar')
self.assertEqual(fn('foo%3Fbar%3Dbaz'), 'foo?bar=baz')
self.assertEqual(fn('foo%3Fbar%23baz'), 'foo?bar#baz')
def test_url2pathname_require_scheme(self):
sep = os.path.sep
subtests = [
('file:', ''),
('FILE:', ''),
('FiLe:', ''),
('file:/', f'{sep}'),
('file:///', f'{sep}'),
('file:////', f'{sep}{sep}'),
('file:foo', 'foo'),
('file:foo/bar', f'foo{sep}bar'),
('file:/foo/bar', f'{sep}foo{sep}bar'),
('file://localhost/foo/bar', f'{sep}foo{sep}bar'),
('file:///foo/bar', f'{sep}foo{sep}bar'),
('file:////foo/bar', f'{sep}{sep}foo{sep}bar'),
('file:data:blah', 'data:blah'),
('file:data://blah', f'data:{sep}{sep}blah'),
]
for url, expected_path in subtests:
with self.subTest(url=url):
self.assertEqual(
urllib.request.url2pathname(url, require_scheme=True),
expected_path)
def test_url2pathname_require_scheme_errors(self):
subtests = [
'',
':',
'foo',
'http:foo',
'localfile:foo',
'data:foo',
'data:file:foo',
'data:file://foo',
]
for url in subtests:
with self.subTest(url=url):
self.assertRaises(
urllib.error.URLError,
urllib.request.url2pathname,
url, require_scheme=True)
@unittest.skipIf(support.is_emscripten, "Fixed by https://github.com/emscripten-core/emscripten/pull/24593")
def test_url2pathname_resolve_host(self):
fn = urllib.request.url2pathname
sep = os.path.sep
self.assertEqual(fn('//127.0.0.1/foo/bar', resolve_host=True), f'{sep}foo{sep}bar')
self.assertEqual(fn(f'//{socket.gethostname()}/foo/bar'), f'{sep}foo{sep}bar')
self.assertEqual(fn(f'//{socket.gethostname()}/foo/bar', resolve_host=True), f'{sep}foo{sep}bar')
@unittest.skipUnless(sys.platform == 'win32',
'test specific to Windows pathnames.')
def test_url2pathname_win(self):
fn = urllib.request.url2pathname
self.assertEqual(fn('/C:/'), 'C:\\')
self.assertEqual(fn('//C:'), 'C:')
self.assertEqual(fn('//C:/'), 'C:\\')
self.assertEqual(fn('//C:\\'), 'C:\\')
self.assertEqual(fn('//C:80/'), 'C:80\\')
self.assertEqual(fn("///C|"), 'C:')
self.assertEqual(fn("///C:"), 'C:')
self.assertEqual(fn('///C:/'), 'C:\\')
@@ -1586,6 +1603,7 @@ class Pathname_Tests(unittest.TestCase):
self.assertEqual(fn("///C/test/"), '\\C\\test\\')
self.assertEqual(fn("////C/test/"), '\\\\C\\test\\')
# DOS drive paths
self.assertEqual(fn('c:/path/to/file'), 'c:\\path\\to\\file')
self.assertEqual(fn('C:/path/to/file'), 'C:\\path\\to\\file')
self.assertEqual(fn('C:/path/to/file/'), 'C:\\path\\to\\file\\')
self.assertEqual(fn('C:/path/to//file'), 'C:\\path\\to\\\\file')
@@ -1593,12 +1611,15 @@ class Pathname_Tests(unittest.TestCase):
self.assertEqual(fn('/C|/path/to/file'), 'C:\\path\\to\\file')
self.assertEqual(fn('///C|/path/to/file'), 'C:\\path\\to\\file')
self.assertEqual(fn("///C|/foo/bar/spam.foo"), 'C:\\foo\\bar\\spam.foo')
# Non-ASCII drive letter
self.assertRaises(IOError, fn, "///\u00e8|/")
# Colons in URI
self.assertEqual(fn('///\u00e8|/'), '\u00e8:\\')
self.assertEqual(fn('//host/share/spam.txt:eggs'), '\\\\host\\share\\spam.txt:eggs')
self.assertEqual(fn('///c:/spam.txt:eggs'), 'c:\\spam.txt:eggs')
# UNC paths
self.assertEqual(fn('//server/path/to/file'), '\\\\server\\path\\to\\file')
self.assertEqual(fn('////server/path/to/file'), '\\\\server\\path\\to\\file')
self.assertEqual(fn('/////server/path/to/file'), '\\\\server\\path\\to\\file')
self.assertEqual(fn('//127.0.0.1/path/to/file'), '\\\\127.0.0.1\\path\\to\\file')
# Localhost paths
self.assertEqual(fn('//localhost/C:/path/to/file'), 'C:\\path\\to\\file')
self.assertEqual(fn('//localhost/C|/path/to/file'), 'C:\\path\\to\\file')
@@ -1618,11 +1639,12 @@ class Pathname_Tests(unittest.TestCase):
'test specific to POSIX pathnames')
def test_url2pathname_posix(self):
fn = urllib.request.url2pathname
self.assertEqual(fn('/foo/bar'), '/foo/bar')
self.assertEqual(fn('//foo/bar'), '//foo/bar')
self.assertEqual(fn('///foo/bar'), '/foo/bar')
self.assertEqual(fn('////foo/bar'), '//foo/bar')
self.assertEqual(fn('//localhost/foo/bar'), '/foo/bar')
self.assertRaises(urllib.error.URLError, fn, '//foo/bar')
self.assertRaises(urllib.error.URLError, fn, '//localhost:/foo/bar')
self.assertRaises(urllib.error.URLError, fn, '//:80/foo/bar')
self.assertRaises(urllib.error.URLError, fn, '//:/foo/bar')
self.assertRaises(urllib.error.URLError, fn, '//c:80/foo/bar')
self.assertRaises(urllib.error.URLError, fn, '//127.0.0.1/foo/bar')
@unittest.skipUnless(os_helper.FS_NONASCII, 'need os_helper.FS_NONASCII')
def test_url2pathname_nonascii(self):
@@ -1641,56 +1663,6 @@ class Utility_Tests(unittest.TestCase):
self.assertIsInstance(urllib.request.thishost(), tuple)
class URLopener_Tests(FakeHTTPMixin, unittest.TestCase):
"""Testcase to test the open method of URLopener class."""
def test_quoted_open(self):
class DummyURLopener(urllib.request.URLopener):
def open_spam(self, url):
return url
with warnings_helper.check_warnings(
('DummyURLopener style of invoking requests is deprecated.',
DeprecationWarning)):
self.assertEqual(DummyURLopener().open(
'spam://example/ /'),'//example/%20/')
# test the safe characters are not quoted by urlopen
self.assertEqual(DummyURLopener().open(
"spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"),
"//c:|windows%/:=&?~#+!$,;'@()*[]|/path/")
@warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_urlopener_retrieve_file(self):
with os_helper.temp_dir() as tmpdir:
fd, tmpfile = tempfile.mkstemp(dir=tmpdir)
os.close(fd)
fileurl = "file:" + urllib.request.pathname2url(tmpfile)
filename, _ = urllib.request.URLopener().retrieve(fileurl)
# Some buildbots have TEMP folder that uses a lowercase drive letter.
self.assertEqual(os.path.normcase(filename), os.path.normcase(tmpfile))
@warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_urlopener_retrieve_remote(self):
url = "http://www.python.org/file.txt"
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello!")
self.addCleanup(self.unfakehttp)
filename, _ = urllib.request.URLopener().retrieve(url)
self.assertEqual(os.path.splitext(filename)[1], ".txt")
@warnings_helper.ignore_warnings(category=DeprecationWarning)
def test_local_file_open(self):
# bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme
class DummyURLopener(urllib.request.URLopener):
def open_local_file(self, url):
return url
for url in ('local_file://example', 'local-file://example'):
self.assertRaises(OSError, urllib.request.urlopen, url)
self.assertRaises(OSError, urllib.request.URLopener().open, url)
self.assertRaises(OSError, urllib.request.URLopener().retrieve, url)
self.assertRaises(OSError, DummyURLopener().open, url)
self.assertRaises(OSError, DummyURLopener().retrieve, url)
class RequestTests(unittest.TestCase):
"""Unit tests for urllib.request.Request."""

View File

@@ -3,12 +3,12 @@ from test import support
from test.support import os_helper
from test.support import requires_subprocess
from test.support import warnings_helper
from test.support.testcase import ExtraAssertions
from test import test_urllib
from unittest import mock
import os
import io
import ftplib
import socket
import array
import sys
@@ -23,10 +23,11 @@ from urllib.request import (Request, OpenerDirector, HTTPBasicAuthHandler,
_proxy_bypass_winreg_override,
_proxy_bypass_macosx_sysconf,
AbstractDigestAuthHandler)
from urllib.parse import urlparse
from urllib.parse import urlsplit
import urllib.error
import http.client
support.requires_working_socket(module=True)
# XXX
@@ -43,10 +44,6 @@ class TrivialTests(unittest.TestCase):
context = {}
exec('from urllib.%s import *' % module, context)
del context['__builtins__']
if module == 'request' and os.name == 'nt':
u, p = context.pop('url2pathname'), context.pop('pathname2url')
self.assertEqual(u.__module__, 'nturl2path')
self.assertEqual(p.__module__, 'nturl2path')
for k, v in context.items():
self.assertEqual(v.__module__, 'urllib.%s' % module,
"%r is exposed in 'urllib.%s' but defined in %r" %
@@ -717,15 +714,7 @@ class OpenerDirectorTests(unittest.TestCase):
self.assertIsInstance(args[1], MockResponse)
def sanepathname2url(path):
urlpath = urllib.request.pathname2url(path)
if os.name == "nt" and urlpath.startswith("///"):
urlpath = urlpath[2:]
# XXX don't ask me about the mac...
return urlpath
class HandlerTests(unittest.TestCase, ExtraAssertions):
class HandlerTests(unittest.TestCase):
def test_ftp(self):
class MockFTPWrapper:
@@ -751,7 +740,6 @@ class HandlerTests(unittest.TestCase, ExtraAssertions):
self.ftpwrapper = MockFTPWrapper(self.data)
return self.ftpwrapper
import ftplib
data = "rheum rhaponicum"
h = NullFTPHandler(data)
h.parent = MockOpener()
@@ -792,25 +780,50 @@ class HandlerTests(unittest.TestCase, ExtraAssertions):
self.assertEqual(int(headers["Content-length"]), len(data))
r.close()
@support.requires_resource("network")
def test_ftp_error(self):
class ErrorFTPHandler(urllib.request.FTPHandler):
def __init__(self, exception):
self._exception = exception
def connect_ftp(self, user, passwd, host, port, dirs,
timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
raise self._exception
exception = ftplib.error_perm(
"500 OOPS: cannot change directory:/nonexistent")
h = ErrorFTPHandler(exception)
urlopen = urllib.request.build_opener(h).open
try:
urlopen("ftp://www.pythontest.net/")
except urllib.error.URLError as raised:
self.assertEqual(raised.reason,
f"ftp error: {exception.args[0]}")
else:
self.fail("Did not raise ftplib exception")
def test_file(self):
import email.utils
h = urllib.request.FileHandler()
o = h.parent = MockOpener()
TESTFN = os_helper.TESTFN
urlpath = sanepathname2url(os.path.abspath(TESTFN))
towrite = b"hello, world\n"
canonurl = urllib.request.pathname2url(os.path.abspath(TESTFN), add_scheme=True)
parsed = urlsplit(canonurl)
if parsed.netloc:
raise unittest.SkipTest("non-local working directory")
urls = [
"file://localhost%s" % urlpath,
"file://%s" % urlpath,
"file://%s%s" % (socket.gethostbyname('localhost'), urlpath),
canonurl,
parsed._replace(netloc='localhost').geturl(),
parsed._replace(netloc=socket.gethostbyname('localhost')).geturl(),
]
try:
localaddr = socket.gethostbyname(socket.gethostname())
except socket.gaierror:
localaddr = ''
if localaddr:
urls.append("file://%s%s" % (localaddr, urlpath))
urls.append(parsed._replace(netloc=localaddr).geturl())
for url in urls:
f = open(TESTFN, "wb")
@@ -835,10 +848,10 @@ class HandlerTests(unittest.TestCase, ExtraAssertions):
self.assertEqual(headers["Content-type"], "text/plain")
self.assertEqual(headers["Content-length"], "13")
self.assertEqual(headers["Last-modified"], modified)
self.assertEqual(respurl, url)
self.assertEqual(respurl, canonurl)
for url in [
"file://localhost:80%s" % urlpath,
parsed._replace(netloc='localhost:80').geturl(),
"file:///file_does_not_exist.txt",
"file://not-a-local-host.com//dir/file.txt",
"file://%s:80%s/%s" % (socket.gethostbyname('localhost'),
@@ -1136,13 +1149,13 @@ class HandlerTests(unittest.TestCase, ExtraAssertions):
r = Request('http://example.com')
for url in urls:
r.full_url = url
parsed = urlparse(url)
parsed = urlsplit(url)
self.assertEqual(r.get_full_url(), url)
# full_url setter uses splittag to split into components.
# splittag sets the fragment as None while urlparse sets it to ''
self.assertEqual(r.fragment or '', parsed.fragment)
self.assertEqual(urlparse(r.get_full_url()).query, parsed.query)
self.assertEqual(urlsplit(r.get_full_url()).query, parsed.query)
def test_full_url_deleter(self):
r = Request('http://www.example.com')
@@ -1834,7 +1847,7 @@ class HandlerTests(unittest.TestCase, ExtraAssertions):
self.assertTrue(conn.fakesock.closed, "Connection not closed")
class MiscTests(unittest.TestCase, ExtraAssertions):
class MiscTests(unittest.TestCase):
def opener_has_handler(self, opener, handler_class):
self.assertTrue(any(h.__class__ == handler_class
@@ -1954,10 +1967,38 @@ class MiscTests(unittest.TestCase, ExtraAssertions):
self.assertRaises(ValueError, _parse_proxy, 'file:/ftp.example.com'),
def test_unsupported_algorithm(self):
handler = AbstractDigestAuthHandler()
skip_libssl_fips_mode = unittest.skipIf(
support.is_libssl_fips_mode(),
"conservative skip due to OpenSSL FIPS mode possible algorithm nerfing",
)
class TestDigestAuthAlgorithms(unittest.TestCase):
def setUp(self):
self.handler = AbstractDigestAuthHandler()
@skip_libssl_fips_mode
def test_md5_algorithm(self):
H, KD = self.handler.get_algorithm_impls('MD5')
self.assertEqual(H("foo"), "acbd18db4cc2f85cedef654fccc4a4d8")
self.assertEqual(KD("foo", "bar"), "4e99e8c12de7e01535248d2bac85e732")
@skip_libssl_fips_mode
def test_sha_algorithm(self):
H, KD = self.handler.get_algorithm_impls('SHA')
self.assertEqual(H("foo"), "0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
self.assertEqual(KD("foo", "bar"), "54dcbe67d21d5eb39493d46d89ae1f412d3bd6de")
@skip_libssl_fips_mode
def test_sha256_algorithm(self):
H, KD = self.handler.get_algorithm_impls('SHA-256')
self.assertEqual(H("foo"), "2c26b46b68ffc68ff99b453c1d30413413422d706483bfa0f98a5e886266e7ae")
self.assertEqual(KD("foo", "bar"), "a765a8beaa9d561d4c5cbed29d8f4e30870297fdfa9cb7d6e9848a95fec9f937")
def test_invalid_algorithm(self):
with self.assertRaises(ValueError) as exc:
handler.get_algorithm_impls('invalid')
self.handler.get_algorithm_impls('invalid')
self.assertEqual(
str(exc.exception),
"Unsupported digest authentication algorithm 'invalid'"

View File

@@ -11,7 +11,6 @@ import hashlib
from test import support
from test.support import hashlib_helper
from test.support import threading_helper
from test.support.testcase import ExtraAssertions
try:
import ssl
@@ -361,7 +360,7 @@ class ProxyAuthTests(unittest.TestCase):
self.server.stop()
self.server = None
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_proxy_with_bad_password_raises_httperror(self):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD+"bad")
@@ -370,14 +369,14 @@ class ProxyAuthTests(unittest.TestCase):
self.opener.open(self.URL)
cm.exception.close()
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_proxy_with_no_password_raises_httperror(self):
self.digest_auth_handler.set_qop("auth")
with self.assertRaises(urllib.error.HTTPError) as cm:
self.opener.open(self.URL)
cm.exception.close()
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_proxy_qop_auth_works(self):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD)
@@ -386,7 +385,7 @@ class ProxyAuthTests(unittest.TestCase):
while result.read():
pass
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_proxy_qop_auth_int_works_or_throws_urlerror(self):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
self.USER, self.PASSWD)
@@ -447,7 +446,7 @@ def GetRequestHandler(responses):
return FakeHTTPRequestHandler
class TestUrlopen(unittest.TestCase, ExtraAssertions):
class TestUrlopen(unittest.TestCase):
"""Tests urllib.request.urlopen using the network.
These tests are not exhaustive. Assuming that testing using files does a
@@ -511,7 +510,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
handler.port = server.port
return handler
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_redirection(self):
expected_response = b"We got here..."
responses = [
@@ -525,7 +524,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.assertEqual(data, expected_response)
self.assertEqual(handler.requests, ["/", "/somewhere_else"])
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_chunked(self):
expected_response = b"hello world"
chunked_start = (
@@ -540,7 +539,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
data = self.urlopen("http://localhost:%s/" % handler.port)
self.assertEqual(data, expected_response)
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_404(self):
expected_response = b"Bad bad bad..."
handler = self.start_server([(404, [], expected_response)])
@@ -556,7 +555,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.assertEqual(data, expected_response)
self.assertEqual(handler.requests, ["/weeble"])
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_200(self):
expected_response = b"pycon 2008..."
handler = self.start_server([(200, [], expected_response)])
@@ -564,7 +563,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.assertEqual(data, expected_response)
self.assertEqual(handler.requests, ["/bizarre"])
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_200_with_parameters(self):
expected_response = b"pycon 2008..."
handler = self.start_server([(200, [], expected_response)])
@@ -573,14 +572,14 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.assertEqual(data, expected_response)
self.assertEqual(handler.requests, ["/bizarre", b"get=with_feeling"])
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_https(self):
handler = self.start_https_server()
context = ssl.create_default_context(cafile=CERT_localhost)
data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
self.assertEqual(data, b"we care a bit")
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_https_sni(self):
if ssl is None:
self.skipTest("ssl module required")
@@ -597,7 +596,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.urlopen("https://localhost:%s" % handler.port, context=context)
self.assertEqual(sni_name, "localhost")
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_sending_headers(self):
handler = self.start_server()
req = urllib.request.Request("http://localhost:%s/" % handler.port,
@@ -606,7 +605,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
pass
self.assertEqual(handler.headers_received["Range"], "bytes=20-39")
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_sending_headers_camel(self):
handler = self.start_server()
req = urllib.request.Request("http://localhost:%s/" % handler.port,
@@ -616,7 +615,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.assertIn("X-Some-Header", handler.headers_received.keys())
self.assertNotIn("X-SoMe-hEader", handler.headers_received.keys())
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_basic(self):
handler = self.start_server()
with urllib.request.urlopen("http://localhost:%s" % handler.port) as open_url:
@@ -624,7 +623,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.assertHasAttr(open_url, attr)
self.assertTrue(open_url.read(), "calling 'read' failed")
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_info(self):
handler = self.start_server()
open_url = urllib.request.urlopen(
@@ -636,7 +635,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
"instance of email.message.Message")
self.assertEqual(info_obj.get_content_subtype(), "plain")
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_geturl(self):
# Make sure same URL as opened is returned by geturl.
handler = self.start_server()
@@ -645,7 +644,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
url = open_url.geturl()
self.assertEqual(url, "http://localhost:%s" % handler.port)
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_iteration(self):
expected_response = b"pycon 2008..."
handler = self.start_server([(200, [], expected_response)])
@@ -653,7 +652,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
for line in data:
self.assertEqual(line, expected_response)
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_line_iteration(self):
lines = [b"We\n", b"got\n", b"here\n", b"verylong " * 8192 + b"\n"]
expected_response = b"".join(lines)
@@ -666,7 +665,7 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
(index, len(lines[index]), len(line)))
self.assertEqual(index + 1, len(lines))
@unittest.skipIf(os.name == 'nt', 'TODO: RUSTPYTHON; ValueError: illegal environment variable name')
@unittest.skipIf(os.name == "nt", "TODO: RUSTPYTHON; ValueError: illegal environment variable name")
def test_issue16464(self):
# See https://bugs.python.org/issue16464
# and https://bugs.python.org/issue46648
@@ -686,7 +685,6 @@ class TestUrlopen(unittest.TestCase, ExtraAssertions):
self.assertEqual(b"1234567890", request.data)
self.assertEqual("10", request.get_header("Content-length"))
def setUpModule():
thread_info = threading_helper.threading_setup()
unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info)

View File

@@ -7,7 +7,6 @@ from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support import ResourceDenied
from test.test_urllib2 import sanepathname2url
from test.support.warnings_helper import check_no_resource_warning
import os
@@ -192,7 +191,7 @@ class OtherNetworkTests(unittest.TestCase):
f.write('hi there\n')
f.close()
urls = [
'file:' + sanepathname2url(os.path.abspath(TESTFN)),
urllib.request.pathname2url(os.path.abspath(TESTFN), add_scheme=True),
('file:///nonsensename/etc/passwd', None,
urllib.error.URLError),
]

View File

@@ -2,10 +2,10 @@ import unittest
from test import support
from test.support import os_helper
from test.support import socket_helper
from test.support.testcase import ExtraAssertions
import contextlib
import socket
import urllib.error
import urllib.parse
import urllib.request
import os
@@ -35,7 +35,7 @@ class URLTimeoutTest(unittest.TestCase):
f.read()
class urlopenNetworkTests(unittest.TestCase, ExtraAssertions):
class urlopenNetworkTests(unittest.TestCase):
"""Tests urllib.request.urlopen using the network.
These tests are not exhaustive. Assuming that testing using files does a
@@ -101,13 +101,11 @@ class urlopenNetworkTests(unittest.TestCase, ExtraAssertions):
# test getcode() with the fancy opener to get 404 error codes
URL = self.url + "XXXinvalidXXX"
with socket_helper.transient_internet(URL):
with self.assertWarns(DeprecationWarning):
open_url = urllib.request.FancyURLopener().open(URL)
try:
code = open_url.getcode()
finally:
open_url.close()
self.assertEqual(code, 404)
with self.assertRaises(urllib.error.URLError) as e:
with urllib.request.urlopen(URL):
pass
self.assertEqual(e.exception.code, 404)
e.exception.close()
@support.requires_resource('walltime')
def test_bad_address(self):

File diff suppressed because it is too large Load Diff

145
Lib/urllib/parse.py vendored
View File

@@ -247,11 +247,11 @@ class _NetlocResultMixinBytes(_NetlocResultMixinBase, _ResultMixinBytes):
return hostname, port
_DefragResultBase = namedtuple('DefragResult', 'url fragment')
_DefragResultBase = namedtuple('_DefragResultBase', 'url fragment')
_SplitResultBase = namedtuple(
'SplitResult', 'scheme netloc path query fragment')
'_SplitResultBase', 'scheme netloc path query fragment')
_ParseResultBase = namedtuple(
'ParseResult', 'scheme netloc path params query fragment')
'_ParseResultBase', 'scheme netloc path params query fragment')
_DefragResultBase.__doc__ = """
DefragResult(url, fragment)
@@ -392,20 +392,23 @@ def urlparse(url, scheme='', allow_fragments=True):
Note that % escapes are not expanded.
"""
url, scheme, _coerce_result = _coerce_args(url, scheme)
splitresult = urlsplit(url, scheme, allow_fragments)
scheme, netloc, url, query, fragment = splitresult
if scheme in uses_params and ';' in url:
url, params = _splitparams(url)
else:
params = ''
result = ParseResult(scheme, netloc, url, params, query, fragment)
scheme, netloc, url, params, query, fragment = _urlparse(url, scheme, allow_fragments)
result = ParseResult(scheme or '', netloc or '', url, params or '', query or '', fragment or '')
return _coerce_result(result)
def _splitparams(url):
def _urlparse(url, scheme=None, allow_fragments=True):
scheme, netloc, url, query, fragment = _urlsplit(url, scheme, allow_fragments)
if (scheme or '') in uses_params and ';' in url:
url, params = _splitparams(url, allow_none=True)
else:
params = None
return (scheme, netloc, url, params, query, fragment)
def _splitparams(url, allow_none=False):
if '/' in url:
i = url.find(';', url.rfind('/'))
if i < 0:
return url, ''
return url, None if allow_none else ''
else:
i = url.find(';')
return url[:i], url[i+1:]
@@ -457,7 +460,7 @@ def _check_bracketed_netloc(netloc):
# https://www.rfc-editor.org/rfc/rfc3986#page-49 and https://url.spec.whatwg.org/
def _check_bracketed_host(hostname):
if hostname.startswith('v'):
if not re.match(r"\Av[a-fA-F0-9]+\..+\Z", hostname):
if not re.match(r"\Av[a-fA-F0-9]+\..+\z", hostname):
raise ValueError(f"IPvFuture address is invalid")
else:
ip = ipaddress.ip_address(hostname) # Throws Value Error if not IPv6 or IPv4
@@ -489,17 +492,23 @@ def urlsplit(url, scheme='', allow_fragments=True):
"""
url, scheme, _coerce_result = _coerce_args(url, scheme)
scheme, netloc, url, query, fragment = _urlsplit(url, scheme, allow_fragments)
v = SplitResult(scheme or '', netloc or '', url, query or '', fragment or '')
return _coerce_result(v)
def _urlsplit(url, scheme=None, allow_fragments=True):
# Only lstrip url as some applications rely on preserving trailing space.
# (https://url.spec.whatwg.org/#concept-basic-url-parser would strip both)
url = url.lstrip(_WHATWG_C0_CONTROL_OR_SPACE)
scheme = scheme.strip(_WHATWG_C0_CONTROL_OR_SPACE)
for b in _UNSAFE_URL_BYTES_TO_REMOVE:
url = url.replace(b, "")
scheme = scheme.replace(b, "")
if scheme is not None:
scheme = scheme.strip(_WHATWG_C0_CONTROL_OR_SPACE)
for b in _UNSAFE_URL_BYTES_TO_REMOVE:
scheme = scheme.replace(b, "")
allow_fragments = bool(allow_fragments)
netloc = query = fragment = ''
netloc = query = fragment = None
i = url.find(':')
if i > 0 and url[0].isascii() and url[0].isalpha():
for c in url[:i]:
@@ -519,8 +528,7 @@ def urlsplit(url, scheme='', allow_fragments=True):
if '?' in url:
url, query = url.split('?', 1)
_checknetloc(netloc)
v = SplitResult(scheme, netloc, url, query, fragment)
return _coerce_result(v)
return (scheme, netloc, url, query, fragment)
def urlunparse(components):
"""Put a parsed URL back together again. This may result in a
@@ -529,9 +537,15 @@ def urlunparse(components):
(the draft states that these are equivalent)."""
scheme, netloc, url, params, query, fragment, _coerce_result = (
_coerce_args(*components))
if not netloc:
if scheme and scheme in uses_netloc and (not url or url[:1] == '/'):
netloc = ''
else:
netloc = None
if params:
url = "%s;%s" % (url, params)
return _coerce_result(urlunsplit((scheme, netloc, url, query, fragment)))
return _coerce_result(_urlunsplit(scheme or None, netloc, url,
query or None, fragment or None))
def urlunsplit(components):
"""Combine the elements of a tuple as returned by urlsplit() into a
@@ -541,20 +555,27 @@ def urlunsplit(components):
empty query; the RFC states that these are equivalent)."""
scheme, netloc, url, query, fragment, _coerce_result = (
_coerce_args(*components))
if netloc:
if not netloc:
if scheme and scheme in uses_netloc and (not url or url[:1] == '/'):
netloc = ''
else:
netloc = None
return _coerce_result(_urlunsplit(scheme or None, netloc, url,
query or None, fragment or None))
def _urlunsplit(scheme, netloc, url, query, fragment):
if netloc is not None:
if url and url[:1] != '/': url = '/' + url
url = '//' + netloc + url
elif url[:2] == '//':
url = '//' + url
elif scheme and scheme in uses_netloc and (not url or url[:1] == '/'):
url = '//' + url
if scheme:
url = scheme + ':' + url
if query:
if query is not None:
url = url + '?' + query
if fragment:
if fragment is not None:
url = url + '#' + fragment
return _coerce_result(url)
return url
def urljoin(base, url, allow_fragments=True):
"""Join a base URL and a possibly relative URL to form an absolute
@@ -565,26 +586,29 @@ def urljoin(base, url, allow_fragments=True):
return base
base, url, _coerce_result = _coerce_args(base, url)
bscheme, bnetloc, bpath, bparams, bquery, bfragment = \
urlparse(base, '', allow_fragments)
scheme, netloc, path, params, query, fragment = \
urlparse(url, bscheme, allow_fragments)
bscheme, bnetloc, bpath, bquery, bfragment = \
_urlsplit(base, None, allow_fragments)
scheme, netloc, path, query, fragment = \
_urlsplit(url, None, allow_fragments)
if scheme != bscheme or scheme not in uses_relative:
if scheme is None:
scheme = bscheme
if scheme != bscheme or (scheme and scheme not in uses_relative):
return _coerce_result(url)
if scheme in uses_netloc:
if not scheme or scheme in uses_netloc:
if netloc:
return _coerce_result(urlunparse((scheme, netloc, path,
params, query, fragment)))
return _coerce_result(_urlunsplit(scheme, netloc, path,
query, fragment))
netloc = bnetloc
if not path and not params:
if not path:
path = bpath
params = bparams
if not query:
if query is None:
query = bquery
return _coerce_result(urlunparse((scheme, netloc, path,
params, query, fragment)))
if fragment is None:
fragment = bfragment
return _coerce_result(_urlunsplit(scheme, netloc, path,
query, fragment))
base_parts = bpath.split('/')
if base_parts[-1] != '':
@@ -621,8 +645,8 @@ def urljoin(base, url, allow_fragments=True):
# then we need to append the trailing '/'
resolved_path.append('')
return _coerce_result(urlunparse((scheme, netloc, '/'.join(
resolved_path) or '/', params, query, fragment)))
return _coerce_result(_urlunsplit(scheme, netloc, '/'.join(
resolved_path) or '/', query, fragment))
def urldefrag(url):
@@ -634,12 +658,12 @@ def urldefrag(url):
"""
url, _coerce_result = _coerce_args(url)
if '#' in url:
s, n, p, a, q, frag = urlparse(url)
defrag = urlunparse((s, n, p, a, q, ''))
s, n, p, q, frag = _urlsplit(url)
defrag = _urlunsplit(s, n, p, q, None)
else:
frag = ''
defrag = url
return _coerce_result(DefragResult(defrag, frag))
return _coerce_result(DefragResult(defrag, frag or ''))
_hexdig = '0123456789ABCDEFabcdef'
_hextobyte = None
@@ -745,7 +769,8 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
parsed_result = {}
pairs = parse_qsl(qs, keep_blank_values, strict_parsing,
encoding=encoding, errors=errors,
max_num_fields=max_num_fields, separator=separator)
max_num_fields=max_num_fields, separator=separator,
_stacklevel=2)
for name, value in pairs:
if name in parsed_result:
parsed_result[name].append(value)
@@ -755,7 +780,7 @@ def parse_qs(qs, keep_blank_values=False, strict_parsing=False,
def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
encoding='utf-8', errors='replace', max_num_fields=None, separator='&'):
encoding='utf-8', errors='replace', max_num_fields=None, separator='&', *, _stacklevel=1):
"""Parse a query given as a string argument.
Arguments:
@@ -783,7 +808,6 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
Returns a list, as G-d intended.
"""
if not separator or not isinstance(separator, (str, bytes)):
raise ValueError("Separator must be of type string or bytes.")
if isinstance(qs, str):
@@ -792,12 +816,21 @@ def parse_qsl(qs, keep_blank_values=False, strict_parsing=False,
eq = '='
def _unquote(s):
return unquote_plus(s, encoding=encoding, errors=errors)
elif qs is None:
return []
else:
if not qs:
return []
# Use memoryview() to reject integers and iterables,
# acceptable by the bytes constructor.
qs = bytes(memoryview(qs))
try:
# Use memoryview() to reject integers and iterables,
# acceptable by the bytes constructor.
qs = bytes(memoryview(qs))
except TypeError:
if not qs:
warnings.warn(f"Accepting {type(qs).__name__} objects with "
f"false value in urllib.parse.parse_qsl() is "
f"deprecated as of 3.14",
DeprecationWarning, stacklevel=_stacklevel + 1)
return []
raise
if isinstance(separator, str):
separator = bytes(separator, 'ascii')
eq = b'='
@@ -842,14 +875,6 @@ _ALWAYS_SAFE = frozenset(b'ABCDEFGHIJKLMNOPQRSTUVWXYZ'
b'_.-~')
_ALWAYS_SAFE_BYTES = bytes(_ALWAYS_SAFE)
def __getattr__(name):
if name == 'Quoter':
warnings.warn('Deprecated in 3.11. '
'urllib.parse.Quoter will be removed in Python 3.14. '
'It was not intended to be a public API.',
DeprecationWarning, stacklevel=2)
return _Quoter
raise AttributeError(f'module {__name__!r} has no attribute {name!r}')
class _Quoter(dict):
"""A mapping from bytes numbers (in range(0,256)) to strings.

844
Lib/urllib/request.py vendored
View File

@@ -83,6 +83,7 @@ f = urllib.request.urlopen('https://www.python.org/')
import base64
import bisect
import contextlib
import email
import hashlib
import http.client
@@ -94,21 +95,19 @@ import string
import sys
import time
import tempfile
import contextlib
import warnings
from urllib.error import URLError, HTTPError, ContentTooShortError
from urllib.parse import (
urlparse, urlsplit, urljoin, unwrap, quote, unquote,
_splittype, _splithost, _splitport, _splituser, _splitpasswd,
_splitattr, _splitquery, _splitvalue, _splittag, _to_bytes,
_splitattr, _splitvalue, _splittag,
unquote_to_bytes, urlunparse)
from urllib.response import addinfourl, addclosehook
# check for SSL
try:
import ssl
import ssl # noqa: F401
except ImportError:
_have_ssl = False
else:
@@ -128,7 +127,7 @@ __all__ = [
'urlopen', 'install_opener', 'build_opener',
'pathname2url', 'url2pathname', 'getproxies',
# Legacy interface
'urlretrieve', 'urlcleanup', 'URLopener', 'FancyURLopener',
'urlretrieve', 'urlcleanup',
]
# used in User-Agent header sent
@@ -165,8 +164,7 @@ def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
the reason phrase returned by the server --- instead of the response
headers as it is specified in the documentation for HTTPResponse.
For FTP, file, and data URLs and requests explicitly handled by legacy
URLopener and FancyURLopener classes, this function returns a
For FTP, file, and data URLs, this function returns a
urllib.response.addinfourl object.
Note that None may be returned if no handler handles the request (though
@@ -940,6 +938,7 @@ class AbstractBasicAuthHandler:
for mo in AbstractBasicAuthHandler.rx.finditer(header):
scheme, quote, realm = mo.groups()
if quote not in ['"', "'"]:
import warnings
warnings.warn("Basic Auth Realm was unquoted",
UserWarning, 3)
@@ -1049,7 +1048,7 @@ _randombytes = os.urandom
class AbstractDigestAuthHandler:
# Digest authentication is specified in RFC 2617.
# Digest authentication is specified in RFC 2617/7616.
# XXX The client does not inspect the Authentication-Info header
# in a successful response.
@@ -1177,11 +1176,14 @@ class AbstractDigestAuthHandler:
return base
def get_algorithm_impls(self, algorithm):
# algorithm names taken from RFC 7616 Section 6.1
# lambdas assume digest modules are imported at the top level
if algorithm == 'MD5':
H = lambda x: hashlib.md5(x.encode("ascii")).hexdigest()
elif algorithm == 'SHA':
elif algorithm == 'SHA': # non-standard, retained for compatibility.
H = lambda x: hashlib.sha1(x.encode("ascii")).hexdigest()
elif algorithm == 'SHA-256':
H = lambda x: hashlib.sha256(x.encode("ascii")).hexdigest()
# XXX MD5-sess
else:
raise ValueError("Unsupported digest authentication "
@@ -1448,16 +1450,6 @@ def parse_http_list(s):
return [part.strip() for part in res]
class FileHandler(BaseHandler):
# Use local file or FTP depending on form of URL
def file_open(self, req):
url = req.selector
if url[:2] == '//' and url[2:3] != '/' and (req.host and
req.host != 'localhost'):
if not req.host in self.get_names():
raise URLError("file:// scheme is supported only on localhost")
else:
return self.open_local_file(req)
# names for the localhost
names = None
def get_names(self):
@@ -1474,35 +1466,41 @@ class FileHandler(BaseHandler):
def open_local_file(self, req):
import email.utils
import mimetypes
host = req.host
filename = req.selector
localfile = url2pathname(filename)
localfile = url2pathname(req.full_url, require_scheme=True, resolve_host=True)
try:
stats = os.stat(localfile)
size = stats.st_size
modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
mtype = mimetypes.guess_type(filename)[0]
mtype = mimetypes.guess_file_type(localfile)[0]
headers = email.message_from_string(
'Content-type: %s\nContent-length: %d\nLast-modified: %s\n' %
(mtype or 'text/plain', size, modified))
if host:
host, port = _splitport(host)
if not host or \
(not port and _safe_gethostbyname(host) in self.get_names()):
if host:
origurl = 'file://' + host + filename
else:
origurl = 'file://' + filename
return addinfourl(open(localfile, 'rb'), headers, origurl)
origurl = pathname2url(localfile, add_scheme=True)
return addinfourl(open(localfile, 'rb'), headers, origurl)
except OSError as exp:
raise URLError(exp)
raise URLError('file not on local host')
raise URLError(exp, exp.filename)
def _safe_gethostbyname(host):
file_open = open_local_file
def _is_local_authority(authority, resolve):
# Compare hostnames
if not authority or authority == 'localhost':
return True
try:
return socket.gethostbyname(host)
except socket.gaierror:
return None
hostname = socket.gethostname()
except (socket.gaierror, AttributeError):
pass
else:
if authority == hostname:
return True
# Compare IP addresses
if not resolve:
return False
try:
address = socket.gethostbyname(authority)
except (socket.gaierror, AttributeError, UnicodeEncodeError):
return False
return address in FileHandler().get_names()
class FTPHandler(BaseHandler):
def ftp_open(self, req):
@@ -1559,7 +1557,7 @@ class FTPHandler(BaseHandler):
if fw is not None and not fw.keepalive:
fw.close()
if isinstance(exp, ftplib.all_errors):
raise URLError(exp) from exp
raise URLError(f"ftp error: {exp}") from exp
raise
def connect_ftp(self, user, passwd, host, port, dirs, timeout):
@@ -1651,710 +1649,80 @@ class DataHandler(BaseHandler):
return addinfourl(io.BytesIO(data), headers, url)
# Code move from the old urllib module
# Code moved from the old urllib module
MAXFTPCACHE = 10 # Trim the ftp cache beyond this size
def url2pathname(url, *, require_scheme=False, resolve_host=False):
"""Convert the given file URL to a local file system path.
# Helper for non-unix systems
if os.name == 'nt':
from nturl2path import url2pathname, pathname2url
else:
def url2pathname(pathname):
"""OS-specific conversion from a relative URL of the 'file' scheme
to a file system path; not recommended for general use."""
if pathname[:3] == '///':
# URL has an empty authority section, so the path begins on the
# third character.
pathname = pathname[2:]
elif pathname[:12] == '//localhost/':
# Skip past 'localhost' authority.
pathname = pathname[11:]
encoding = sys.getfilesystemencoding()
errors = sys.getfilesystemencodeerrors()
return unquote(pathname, encoding=encoding, errors=errors)
The 'file:' scheme prefix must be omitted unless *require_scheme*
is set to true.
def pathname2url(pathname):
"""OS-specific conversion from a file system path to a relative URL
of the 'file' scheme; not recommended for general use."""
if pathname[:2] == '//':
# Add explicitly empty authority to avoid interpreting the path
# as authority.
pathname = '//' + pathname
encoding = sys.getfilesystemencoding()
errors = sys.getfilesystemencodeerrors()
return quote(pathname, encoding=encoding, errors=errors)
ftpcache = {}
class URLopener:
"""Class to open URLs.
This is a class rather than just a subroutine because we may need
more than one set of global protocol-specific options.
Note -- this is a base class for those who don't want the
automatic handling of errors type 302 (relocated) and 401
(authorization needed)."""
__tempfiles = None
version = "Python-urllib/%s" % __version__
# Constructor
def __init__(self, proxies=None, **x509):
msg = "%(class)s style of invoking requests is deprecated. " \
"Use newer urlopen functions/methods" % {'class': self.__class__.__name__}
warnings.warn(msg, DeprecationWarning, stacklevel=3)
if proxies is None:
proxies = getproxies()
assert hasattr(proxies, 'keys'), "proxies must be a mapping"
self.proxies = proxies
self.key_file = x509.get('key_file')
self.cert_file = x509.get('cert_file')
self.addheaders = [('User-Agent', self.version), ('Accept', '*/*')]
self.__tempfiles = []
self.__unlink = os.unlink # See cleanup()
self.tempcache = None
# Undocumented feature: if you assign {} to tempcache,
# it is used to cache files retrieved with
# self.retrieve(). This is not enabled by default
# since it does not work for changing documents (and I
# haven't got the logic to check expiration headers
# yet).
self.ftpcache = ftpcache
# Undocumented feature: you can use a different
# ftp cache by assigning to the .ftpcache member;
# in case you want logically independent URL openers
# XXX This is not threadsafe. Bah.
def __del__(self):
self.close()
def close(self):
    """Close the opener; currently this only runs cleanup()."""
    self.cleanup()
def cleanup(self):
    """Delete the recorded temporary files and empty the tempcache."""
    # This code sometimes runs when the rest of this module has already
    # been deleted, so it must not use any globals or import anything.
    pending = self.__tempfiles
    if pending:
        for path in pending:
            try:
                self.__unlink(path)
            except OSError:
                # Best effort: a file that cannot be removed is skipped.
                continue
        del pending[:]
    cache = self.tempcache
    if cache:
        cache.clear()
def addheader(self, *args):
    """Register an extra header for the HTTP interface.

    The positional arguments are stored together as one tuple,
    e.g. u.addheader('Accept', 'sound/basic').
    """
    header = args
    self.addheaders.append(header)
# External interface
def open(self, fullurl, data=None):
"""Use URLopener().open(file) instead of open(file, 'r')."""
# Normalise the URL text, then percent-quote everything outside the
# listed safe characters.
fullurl = unwrap(_to_bytes(fullurl))
fullurl = quote(fullurl, safe="%/:=&?~#+!$,;'@()*[]|")
# Serve the request from the tempcache when this URL is already in it;
# entries are (filename, headers) pairs.
if self.tempcache and fullurl in self.tempcache:
filename, headers = self.tempcache[fullurl]
fp = open(filename, 'rb')
return addinfourl(fp, headers, fullurl)
urltype, url = _splittype(fullurl)
# A URL without a scheme is treated as a 'file' URL.
if not urltype:
urltype = 'file'
# A proxy is configured for this scheme: dispatch on the proxy's own
# scheme and hand the handler a (proxy host, full URL) pair.
if urltype in self.proxies:
proxy = self.proxies[urltype]
urltype, proxyhost = _splittype(proxy)
host, selector = _splithost(proxyhost)
url = (host, fullurl) # Signal special case to open_*()
The URL authority may be resolved with gethostbyname() if
*resolve_host* is set to true.
"""
if not require_scheme:
url = 'file:' + url
scheme, authority, url = urlsplit(url)[:3] # Discard query and fragment.
if scheme != 'file':
raise URLError("URL is missing a 'file:' scheme")
if os.name == 'nt':
if authority[1:2] == ':':
# e.g. file://c:/file.txt
url = authority + url
elif not _is_local_authority(authority, resolve_host):
# e.g. file://server/share/file.txt
url = '//' + authority + url
elif url[:3] == '///':
# e.g. file://///server/share/file.txt
url = url[1:]
else:
proxy = None
# Dispatch to the method named open_<scheme>, with '-' in the scheme
# mapped to '_'.
name = 'open_' + urltype
self.type = urltype
name = name.replace('-', '_')
# open_local_file is never dispatched to directly: a scheme that would
# map to it is handled like an unknown scheme.
if not hasattr(self, name) or name == 'open_local_file':
if proxy:
return self.open_unknown_proxy(proxy, fullurl, data)
else:
return self.open_unknown(fullurl, data)
try:
if data is None:
return getattr(self, name)(url)
else:
return getattr(self, name)(url, data)
# HTTPError and URLError propagate unchanged; any other OSError is
# re-raised as OSError('socket error', original), chained to the original.
except (HTTPError, URLError):
raise
except OSError as msg:
raise OSError('socket error', msg) from msg
def open_unknown(self, fullurl, data=None):
    """Overridable hook for URLs whose scheme has no open_<scheme> method.

    The default implementation always raises OSError.
    """
    scheme = _splittype(fullurl)[0]
    raise OSError('url error', 'unknown url type', scheme)
def open_unknown_proxy(self, proxy, fullurl, data=None):
    """Overridable hook for a proxy whose scheme has no open_<scheme> method.

    The default implementation always raises OSError naming the URL's
    scheme and the offending proxy.
    """
    scheme = _splittype(fullurl)[0]
    raise OSError('url error', 'invalid proxy for %s' % scheme, proxy)
# External interface
def retrieve(self, url, filename=None, reporthook=None, data=None):
"""retrieve(url) returns (filename, headers) for a local object
or (tempfilename, headers) for a remote object.
If *reporthook* is given it is called as reporthook(blocknum, blocksize,
totalsize) once before the first read and once after each block written;
totalsize is -1 when there is no Content-Length header.
Raises ContentTooShortError when fewer bytes than Content-Length were read."""
url = unwrap(_to_bytes(url))
# Cache hit: return the stored (filename, headers) pair.
if self.tempcache and url in self.tempcache:
return self.tempcache[url]
type, url1 = _splittype(url)
# No destination given and the URL has no scheme or the 'file' scheme:
# try to return the path of the existing local file without copying it.
if filename is None and (not type or type == 'file'):
try:
fp = self.open_local_file(url1)
hdrs = fp.info()
fp.close()
return url2pathname(_splithost(url1)[1]), hdrs
except OSError:
# Could not be opened as a local file; fall through to the
# generic download below.
pass
fp = self.open(url, data)
try:
headers = fp.info()
if filename:
tfp = open(filename, 'wb')
else:
# No destination: download into a temporary file whose suffix is the
# extension of the URL path, and record it so cleanup() can delete it.
garbage, path = _splittype(url)
garbage, path = _splithost(path or "")
path, garbage = _splitquery(path or "")
path, garbage = _splitattr(path or "")
suffix = os.path.splitext(path)[1]
(fd, filename) = tempfile.mkstemp(suffix)
self.__tempfiles.append(filename)
tfp = os.fdopen(fd, 'wb')
try:
result = filename, headers
if self.tempcache is not None:
self.tempcache[url] = result
bs = 1024*8
# size stays -1 when the response carries no Content-Length header.
size = -1
read = 0
blocknum = 0
if "content-length" in headers:
size = int(headers["Content-Length"])
if reporthook:
reporthook(blocknum, bs, size)
while block := fp.read(bs):
read += len(block)
tfp.write(block)
blocknum += 1
if reporthook:
reporthook(blocknum, bs, size)
finally:
tfp.close()
finally:
fp.close()
# raise exception if actual size does not match content-length header
if size >= 0 and read < size:
raise ContentTooShortError(
"retrieval incomplete: got only %i out of %i bytes"
% (read, size), result)
return result
# Each method named open_<type> knows how to open that type of URL
def _open_generic_http(self, connection_factory, url, data):
"""Make an HTTP connection using connection_factory.
This is an internal method that should be called from
open_http() or open_https().
Arguments:
- connection_factory should take a host name and return an
HTTPConnection instance.
- url is the url to retrieve or a host, relative-path pair.
- data is payload for a POST request or None.
Returns an addinfourl object for a 2xx response; any other status is
passed to self.http_error().
"""
user_passwd = None
proxy_passwd= None
# A string url means a direct request; anything else is unpacked as the
# (host, selector) pair described in the docstring.
if isinstance(url, str):
host, selector = _splithost(url)
if host:
user_passwd, host = _splituser(host)
host = unquote(host)
realhost = host
else:
host, selector = url
# check whether the proxy contains authorization information
proxy_passwd, host = _splituser(host)
# now we proceed with the url we want to obtain
urltype, rest = _splittype(selector)
url = rest
user_passwd = None
if urltype.lower() != 'http':
realhost = None
else:
realhost, rest = _splithost(rest)
if realhost:
user_passwd, realhost = _splituser(realhost)
if user_passwd:
selector = "%s://%s%s" % (urltype, realhost, rest)
# The target host is exempt from proxying: connect to it directly.
if proxy_bypass(realhost):
host = realhost
if not host: raise OSError('http error', 'no host given')
# Credentials are percent-decoded and base64-encoded for the Basic scheme.
if proxy_passwd:
proxy_passwd = unquote(proxy_passwd)
proxy_auth = base64.b64encode(proxy_passwd.encode()).decode('ascii')
else:
proxy_auth = None
if user_passwd:
user_passwd = unquote(user_passwd)
auth = base64.b64encode(user_passwd.encode()).decode('ascii')
else:
auth = None
http_conn = connection_factory(host)
headers = {}
if proxy_auth:
headers["Proxy-Authorization"] = "Basic %s" % proxy_auth
if auth:
headers["Authorization"] = "Basic %s" % auth
if realhost:
headers["Host"] = realhost
# Add Connection:close as we don't support persistent connections yet.
# This helps in closing the socket and avoiding ResourceWarning
headers["Connection"] = "close"
# Entries in self.addheaders are applied last, so they override any
# header of the same name set above.
for header, value in self.addheaders:
headers[header] = value
# Any non-None data turns the request into a form-encoded POST.
if data is not None:
headers["Content-Type"] = "application/x-www-form-urlencoded"
http_conn.request("POST", selector, data, headers)
else:
http_conn.request("GET", selector, headers=headers)
try:
response = http_conn.getresponse()
except http.client.BadStatusLine:
# something went wrong with the HTTP status line
raise URLError("http protocol error: bad status line")
# According to RFC 2616, "2xx" code indicates that the client's
# request was successfully received, understood, and accepted.
if 200 <= response.status < 300:
return addinfourl(response, response.msg, "http:" + url,
response.status)
else:
return self.http_error(
url, response.fp,
response.status, response.reason, response.msg, data)
def open_http(self, url, data=None):
    """Open *url* over plain HTTP via _open_generic_http()."""
    connection_factory = http.client.HTTPConnection
    return self._open_generic_http(connection_factory, url, data)
def http_error(self, url, fp, errcode, errmsg, headers, data=None):
    """Dispatch an HTTP error response to the matching handler.

    A derived class can override this, or provide specific handlers
    named http_error_DDD where DDD is the 3-digit error code.  When a
    specific handler is missing or returns a false value, the call
    falls back to http_error_default().
    """
    handler_name = 'http_error_%d' % errcode
    if hasattr(self, handler_name):
        handler = getattr(self, handler_name)
        # *data* is only forwarded when the caller actually supplied it.
        extra = () if data is None else (data,)
        outcome = handler(url, fp, errcode, errmsg, headers, *extra)
        if outcome:
            return outcome
    return self.http_error_default(url, fp, errcode, errmsg, headers)
def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Default error handler: close the connection and raise HTTPError."""
    fp.close()
    error = HTTPError(url, errcode, errmsg, headers, None)
    raise error
if _have_ssl:
    def _https_connection(self, host):
        """Return an HTTPSConnection to *host*.

        When a key file or certificate file was configured, a context
        carrying that certificate chain is built; otherwise the
        connection is created with context=None.
        """
        context = None
        if self.key_file or self.cert_file:
            vsn = http.client.HTTPSConnection._http_vsn
            context = http.client._create_https_context(vsn)
            context.load_cert_chain(self.cert_file, self.key_file)
            # cert and key file means the user wants to authenticate.
            # enable TLS 1.3 PHA implicitly even for custom contexts.
            if context.post_handshake_auth is not None:
                context.post_handshake_auth = True
        return http.client.HTTPSConnection(host, context=context)

    def open_https(self, url, data=None):
        """Open *url* over HTTPS via _open_generic_http()."""
        connection_factory = self._https_connection
        return self._open_generic_http(connection_factory, url, data)
def open_file(self, url):
    """Open a 'file' URL; only local files are supported.

    Raises URLError when *url* is not a string (the proxy case) and
    ValueError when the URL names a host other than localhost.
    """
    if not isinstance(url, str):
        raise URLError('file error: proxy support for file protocol currently not implemented')
    # '//host/...' with a non-empty host other than 'localhost'.
    names_other_host = (
        url[:2] == '//'
        and url[2:3] != '/'
        and url[2:12].lower() != 'localhost/'
    )
    if names_other_host:
        raise ValueError("file:// scheme is supported only on localhost")
    return self.open_local_file(url)
def open_local_file(self, url):
    """Open a file on the local machine and return it as an addinfourl.

    The response headers carry a guessed Content-Type (default
    'text/plain'), the file size and its modification time.  Raises
    URLError when the file cannot be stat'ed or the URL names a host
    that is not this machine.
    """
    import email.utils
    import mimetypes

    host, path = _splithost(url)
    localname = url2pathname(path)
    try:
        stats = os.stat(localname)
    except OSError as e:
        raise URLError(e.strerror, e.filename)

    modified = email.utils.formatdate(stats.st_mtime, usegmt=True)
    content_type = mimetypes.guess_type(url)[0] or 'text/plain'
    headers = email.message_from_string(
        'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
        (content_type, stats.st_size, modified))

    # Absolute paths are reported back as file:// URLs.
    urlfile = 'file://' + path if path[:1] == '/' else path

    if not host:
        return addinfourl(open(localname, 'rb'), headers, urlfile)

    host, port = _splitport(host)
    # A port, or a host that does not resolve to this machine, means the
    # file is not local.
    if port or socket.gethostbyname(host) not in ((localhost(),) + thishost()):
        raise URLError('local file error: not on local host')
    if path[:1] != '/' and path[:2] == './':
        raise ValueError("local file url may start with / or file:. Unknown url of type: %s" % url)
    return addinfourl(open(localname, 'rb'), headers, urlfile)
def open_ftp(self, url):
"""Use FTP protocol.
Returns an addinfourl object for the retrieved file or directory.
Raises URLError for the proxy case, a missing host, or any FTP error."""
if not isinstance(url, str):
raise URLError('ftp error: proxy support for ftp protocol currently not implemented')
import mimetypes
host, path = _splithost(url)
if not host: raise URLError('ftp error: no host given')
host, port = _splitport(host)
user, host = _splituser(host)
if user: user, passwd = _splitpasswd(user)
else: passwd = None
# Percent-decode the components; a missing user or password becomes ''.
host = unquote(host)
user = unquote(user or '')
passwd = unquote(passwd or '')
host = socket.gethostbyname(host)
if not port:
import ftplib
port = ftplib.FTP_PORT
else:
port = int(port)
path, attrs = _splitattr(path)
path = unquote(path)
# Split the path into directory components and a final file name. One
# leading empty component (from the leading '/') is dropped; if another
# empty component follows it is replaced by '/'.
dirs = path.split('/')
dirs, file = dirs[:-1], dirs[-1]
if dirs and not dirs[0]: dirs = dirs[1:]
if dirs and not dirs[0]: dirs[0] = '/'
# Cache entries are keyed by user, resolved host, port and directory.
key = user, host, port, '/'.join(dirs)
# XXX thread unsafe!
if len(self.ftpcache) > MAXFTPCACHE:
# Prune the cache, rather arbitrarily
for k in list(self.ftpcache):
if k != key:
v = self.ftpcache[k]
del self.ftpcache[k]
v.close()
try:
if key not in self.ftpcache:
self.ftpcache[key] = \
ftpwrapper(user, passwd, host, port, dirs)
# Transfer type: 'D' when the path names no file, otherwise 'I';
# a type=a|i|d attribute (either case) overrides it.
if not file: type = 'D'
else: type = 'I'
for attr in attrs:
attr, value = _splitvalue(attr)
if attr.lower() == 'type' and \
value in ('a', 'A', 'i', 'I', 'd', 'D'):
type = value.upper()
(fp, retrlen) = self.ftpcache[key].retrfile(file, type)
mtype = mimetypes.guess_type("ftp:" + url)[0]
# Only emit the headers whose values are actually known.
headers = ""
if mtype:
headers += "Content-Type: %s\n" % mtype
if retrlen is not None and retrlen >= 0:
headers += "Content-Length: %d\n" % retrlen
headers = email.message_from_string(headers)
return addinfourl(fp, headers, "ftp:" + url)
except ftperrors() as exp:
raise URLError(f'ftp error: {exp}') from exp
def open_data(self, url, data=None):
    """Open a "data" URL and return its content as an addinfourl.

    POSTed *data* is ignored.  Raises URLError when *url* is not a
    string (the proxy case) and OSError when the URL has no comma.
    """
    if not isinstance(url, str):
        raise URLError('data error: proxy support for data protocol currently not implemented')
    # syntax of data URLs:
    # dataurl   := "data:" [ mediatype ] [ ";base64" ] "," data
    # mediatype := [ type "/" subtype ] *( ";" parameter )
    # data      := *urlchar
    # parameter := attribute "=" value
    try:
        mediatype, payload = url.split(',', 1)
    except ValueError:
        raise OSError('data error', 'bad data URL')
    mediatype = mediatype or 'text/plain;charset=US-ASCII'

    # A trailing ";token" without '=' is the transfer encoding, not a
    # media type parameter.
    encoding = ''
    last_semi = mediatype.rfind(';')
    if last_semi >= 0 and '=' not in mediatype[last_semi:]:
        encoding = mediatype[last_semi+1:]
        mediatype = mediatype[:last_semi]

    date = time.strftime('%a, %d %b %Y %H:%M:%S GMT',
                         time.gmtime(time.time()))
    if encoding == 'base64':
        # XXX is this encoding/decoding ok?
        body = base64.decodebytes(payload.encode('ascii')).decode('latin-1')
    else:
        body = unquote(payload)

    message = '\n'.join([
        'Date: %s' % date,
        'Content-type: %s' % mediatype,
        'Content-Length: %d' % len(body),
        '',
        body,
    ])
    headers = email.message_from_string(message)
    return addinfourl(io.StringIO(message), headers, url)
if url[:1] == '/' and url[2:3] in (':', '|'):
# Skip past extra slash before DOS drive in URL path.
url = url[1:]
if url[1:2] == '|':
# Older URLs use a pipe after a drive letter
url = url[:1] + ':' + url[2:]
url = url.replace('/', '\\')
elif not _is_local_authority(authority, resolve_host):
raise URLError("file:// scheme is supported only on localhost")
encoding = sys.getfilesystemencoding()
errors = sys.getfilesystemencodeerrors()
return unquote(url, encoding=encoding, errors=errors)
class FancyURLopener(URLopener):
"""Derived class with handlers for errors we can handle (perhaps).
It adds handlers for the HTTP redirect codes 301, 302, 303, 307 and 308
and for 401/407 Basic authentication."""
def pathname2url(pathname, *, add_scheme=False):
"""Convert the given local file system path to a file URL.
def __init__(self, *args, **kwargs):
    """Initialise the base opener, then the auth cache and redirect counters."""
    URLopener.__init__(self, *args, **kwargs)
    # Credentials cache used by get_user_passwd().
    self.auth_cache = {}
    # Redirect bookkeeping used by http_error_302().
    self.tries, self.maxtries = 0, 10
def http_error_default(self, url, fp, errcode, errmsg, headers):
    """Default error handling: return the response instead of raising."""
    response_url = "http:" + url
    return addinfourl(fp, headers, response_url, errcode)
def http_error_302(self, url, fp, errcode, errmsg, headers, data=None):
    """Error 302 -- relocated (temporarily).

    Follows the redirect through redirect_internal().  Once the number
    of tries reaches maxtries, a 500 "Redirect Recursion" error is
    reported instead.  The try counter is reset whenever this returns.
    """
    self.tries += 1
    try:
        if self.maxtries and self.tries >= self.maxtries:
            handler = (self.http_error_500
                       if hasattr(self, "http_error_500")
                       else self.http_error_default)
            return handler(url, fp, 500,
                           "Internal Server Error: Redirect Recursion",
                           headers)
        return self.redirect_internal(url, fp, errcode, errmsg,
                                      headers, data)
    finally:
        self.tries = 0
def redirect_internal(self, url, fp, errcode, errmsg, headers, data):
    """Follow the redirect named by the 'location' or 'uri' header.

    Returns None when neither header is present.  Raises HTTPError
    when the target uses a scheme other than http, https or ftp.
    """
    for field in ('location', 'uri'):
        if field in headers:
            target = headers[field]
            break
    else:
        return
    fp.close()

    # In case the server sent a relative URL, join with original:
    target = urljoin(self.type + ":" + url, target)

    # For security reasons, we don't allow redirection to anything other
    # than http, https and ftp.
    # We are using newer HTTPError with older redirect_internal method
    # This older method will get deprecated in 3.3
    if urlparse(target).scheme not in ('http', 'https', 'ftp', ''):
        raise HTTPError(target, errcode,
                        errmsg +
                        " Redirection to url '%s' is not allowed." % target,
                        headers, fp)

    return self.open(target)
def http_error_301(self, url, fp, errcode, errmsg, headers, data=None):
    """Error 301 -- relocated (permanently); handled exactly like 302."""
    handle_redirect = self.http_error_302
    return handle_redirect(url, fp, errcode, errmsg, headers, data)
def http_error_303(self, url, fp, errcode, errmsg, headers, data=None):
    """Error 303 -- relocated; handled exactly like 302."""
    handle_redirect = self.http_error_302
    return handle_redirect(url, fp, errcode, errmsg, headers, data)
def http_error_307(self, url, fp, errcode, errmsg, headers, data=None):
    """Error 307 -- relocated, but turn POST into error.

    Requests without data are redirected like a 302; requests with
    data go to http_error_default() instead.
    """
    if data is not None:
        return self.http_error_default(url, fp, errcode, errmsg, headers)
    return self.http_error_302(url, fp, errcode, errmsg, headers, data)
def http_error_308(self, url, fp, errcode, errmsg, headers, data=None):
    """Error 308 -- relocated, but turn POST into error.

    Requests without data are redirected like a 301; requests with
    data go to http_error_default() instead.
    """
    if data is not None:
        return self.http_error_default(url, fp, errcode, errmsg, headers)
    return self.http_error_301(url, fp, errcode, errmsg, headers, data)
def http_error_401(self, url, fp, errcode, errmsg, headers, data=None,
        retry=False):
    """Error 401 -- authentication required.

    This function supports Basic authentication only.  Unless the
    response carries a parsable Basic challenge and *retry* is true,
    the error is handed to URLopener.http_error_default().  Otherwise
    the request is retried via retry_<type>_basic_auth().
    """
    def give_up():
        URLopener.http_error_default(self, url, fp,
                                     errcode, errmsg, headers)

    if 'www-authenticate' not in headers:
        give_up()
    challenge = headers['www-authenticate']
    parsed = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', challenge)
    if not parsed:
        give_up()
    scheme, realm = parsed.groups()
    if scheme.lower() != 'basic':
        give_up()
    if not retry:
        give_up()
    retry_method = getattr(self, 'retry_' + self.type + '_basic_auth')
    # *data* is only forwarded when the caller actually supplied it.
    extra = () if data is None else (data,)
    return retry_method(url, realm, *extra)
def http_error_407(self, url, fp, errcode, errmsg, headers, data=None,
        retry=False):
    """Error 407 -- proxy authentication required.

    This function supports Basic authentication only.  Unless the
    response carries a parsable Basic challenge and *retry* is true,
    the error is handed to URLopener.http_error_default().  Otherwise
    the request is retried via retry_proxy_<type>_basic_auth().
    """
    def give_up():
        URLopener.http_error_default(self, url, fp,
                                     errcode, errmsg, headers)

    if 'proxy-authenticate' not in headers:
        give_up()
    challenge = headers['proxy-authenticate']
    parsed = re.match('[ \t]*([^ \t]+)[ \t]+realm="([^"]*)"', challenge)
    if not parsed:
        give_up()
    scheme, realm = parsed.groups()
    if scheme.lower() != 'basic':
        give_up()
    if not retry:
        give_up()
    retry_method = getattr(self, 'retry_proxy_' + self.type + '_basic_auth')
    # *data* is only forwarded when the caller actually supplied it.
    extra = () if data is None else (data,)
    return retry_method(url, realm, *extra)
def retry_proxy_http_basic_auth(self, url, realm, data=None):
    """Retry an HTTP request with Basic credentials added to the proxy.

    The credentials come from get_user_passwd() and are written into
    self.proxies['http'].  Returns None when no credentials are
    available.
    """
    host, selector = _splithost(url)
    target = 'http://' + host + selector
    _, proxy_rest = _splittype(self.proxies['http'])
    proxy_host, proxy_selector = _splithost(proxy_rest)
    # Drop any credentials already present; a non-zero offset also asks
    # get_user_passwd() to discard its cached entry.
    offset = proxy_host.find('@') + 1
    proxy_host = proxy_host[offset:]
    user, passwd = self.get_user_passwd(proxy_host, realm, offset)
    if not user and not passwd:
        return None
    proxy_host = "%s:%s@%s" % (quote(user, safe=''),
                               quote(passwd, safe=''), proxy_host)
    self.proxies['http'] = 'http://' + proxy_host + proxy_selector
    return self.open(target) if data is None else self.open(target, data)
def retry_proxy_https_basic_auth(self, url, realm, data=None):
    """Retry an HTTPS request with Basic credentials added to the proxy.

    The credentials come from get_user_passwd() and are written into
    self.proxies['https'].  Returns None when no credentials are
    available.
    """
    host, selector = _splithost(url)
    target = 'https://' + host + selector
    _, proxy_rest = _splittype(self.proxies['https'])
    proxy_host, proxy_selector = _splithost(proxy_rest)
    # Drop any credentials already present; a non-zero offset also asks
    # get_user_passwd() to discard its cached entry.
    offset = proxy_host.find('@') + 1
    proxy_host = proxy_host[offset:]
    user, passwd = self.get_user_passwd(proxy_host, realm, offset)
    if not user and not passwd:
        return None
    proxy_host = "%s:%s@%s" % (quote(user, safe=''),
                               quote(passwd, safe=''), proxy_host)
    self.proxies['https'] = 'https://' + proxy_host + proxy_selector
    return self.open(target) if data is None else self.open(target, data)
def retry_http_basic_auth(self, url, realm, data=None):
    """Retry an HTTP request with Basic credentials embedded in the URL.

    Returns None when get_user_passwd() yields no credentials.
    """
    host, selector = _splithost(url)
    # Drop any credentials already present; a non-zero offset also asks
    # get_user_passwd() to discard its cached entry.
    offset = host.find('@') + 1
    host = host[offset:]
    user, passwd = self.get_user_passwd(host, realm, offset)
    if not user and not passwd:
        return None
    host = "%s:%s@%s" % (quote(user, safe=''),
                         quote(passwd, safe=''), host)
    target = 'http://' + host + selector
    return self.open(target) if data is None else self.open(target, data)
def retry_https_basic_auth(self, url, realm, data=None):
    """Retry an HTTPS request with Basic credentials embedded in the URL.

    Returns None when get_user_passwd() yields no credentials.
    """
    host, selector = _splithost(url)
    # Drop any credentials already present; a non-zero offset also asks
    # get_user_passwd() to discard its cached entry.
    offset = host.find('@') + 1
    host = host[offset:]
    user, passwd = self.get_user_passwd(host, realm, offset)
    if not user and not passwd:
        return None
    host = "%s:%s@%s" % (quote(user, safe=''),
                         quote(passwd, safe=''), host)
    target = 'https://' + host + selector
    return self.open(target) if data is None else self.open(target, data)
def get_user_passwd(self, host, realm, clear_cache=0):
    """Return (user, passwd) for *realm* at *host*, using the auth cache.

    A cached entry is returned unless *clear_cache* is true, in which
    case it is dropped and the user is prompted again.  Prompted
    credentials are cached when at least one of them is non-empty.
    """
    cache = self.auth_cache
    cache_key = realm + '@' + host.lower()
    if cache_key in cache:
        if not clear_cache:
            return cache[cache_key]
        del cache[cache_key]
    user, passwd = self.prompt_user_passwd(host, realm)
    if user or passwd:
        cache[cache_key] = (user, passwd)
    return user, passwd
def prompt_user_passwd(self, host, realm):
    """Ask for a user name and password on the terminal.

    Override this in a GUI environment!  Returns (None, None) when
    the prompt is interrupted with Ctrl-C.
    """
    import getpass
    try:
        user = input("Enter username for %s at %s: " % (realm, host))
        passwd = getpass.getpass("Enter password for %s in %s at %s: " %
            (user, realm, host))
    except KeyboardInterrupt:
        print()
        return None, None
    else:
        return user, passwd
The 'file:' scheme prefix is omitted unless *add_scheme*
is set to true.
"""
if os.name == 'nt':
pathname = pathname.replace('\\', '/')
encoding = sys.getfilesystemencoding()
errors = sys.getfilesystemencodeerrors()
scheme = 'file:' if add_scheme else ''
drive, root, tail = os.path.splitroot(pathname)
if drive:
# First, clean up some special forms. We are going to sacrifice the
# additional information anyway
if drive[:4] == '//?/':
drive = drive[4:]
if drive[:4].upper() == 'UNC/':
drive = '//' + drive[4:]
if drive[1:] == ':':
# DOS drive specified. Add three slashes to the start, producing
# an authority section with a zero-length authority, and a path
# section starting with a single slash.
drive = '///' + drive
drive = quote(drive, encoding=encoding, errors=errors, safe='/:')
elif root:
# Add explicitly empty authority to absolute path. If the path
# starts with exactly one slash then this change is mostly
# cosmetic, but if it begins with two or more slashes then this
# avoids interpreting the path as a URL authority.
root = '//' + root
tail = quote(tail, encoding=encoding, errors=errors)
return scheme + drive + root + tail
# Utility functions
@@ -2502,9 +1870,7 @@ def getproxies_environment():
"""Return a dictionary of scheme -> proxy server URL mappings.
Scan the environment for variables named <scheme>_proxy;
this seems to be the standard convention. If you need a
different way, you can pass a proxies dictionary to the
[Fancy]URLopener constructor.
this seems to be the standard convention.
"""
# in order to prefer lowercase variables, process environment in
# two passes: first matches any, second pass matches lowercase only

View File

@@ -181,8 +181,10 @@ class RobotFileParser:
return False
# search for given user agent matches
# the first match counts
parsed_url = urllib.parse.urlsplit(url)
url = urllib.parse.urlunsplit(('', '', *parsed_url[2:]))
# TODO: The private API is used in order to preserve an empty query.
# This is temporary until the public API starts supporting this feature.
parsed_url = urllib.parse._urlsplit(url, '')
url = urllib.parse._urlunsplit(None, None, *parsed_url[2:])
url = normalize_path(url)
if not url:
url = "/"