diff --git a/Lib/test/test_xmlrpc.py b/Lib/test/test_xmlrpc.py index 110f52bcb1..6b60c25e31 100644 --- a/Lib/test/test_xmlrpc.py +++ b/Lib/test/test_xmlrpc.py @@ -18,6 +18,7 @@ from test import support from test.support import os_helper from test.support import socket_helper from test.support import threading_helper +from test.support import ALWAYS_EQ, LARGEST, SMALLEST try: import gzip @@ -559,14 +560,10 @@ class DateTimeTestCase(unittest.TestCase): # some other types dbytes = dstr.encode('ascii') dtuple = now.timetuple() - with self.assertRaises(TypeError): - dtime == 1970 - with self.assertRaises(TypeError): - dtime != dbytes - with self.assertRaises(TypeError): - dtime == bytearray(dbytes) - with self.assertRaises(TypeError): - dtime != dtuple + self.assertFalse(dtime == 1970) + self.assertTrue(dtime != dbytes) + self.assertFalse(dtime == bytearray(dbytes)) + self.assertTrue(dtime != dtuple) with self.assertRaises(TypeError): dtime < float(1970) with self.assertRaises(TypeError): @@ -576,9 +573,21 @@ class DateTimeTestCase(unittest.TestCase): with self.assertRaises(TypeError): dtime >= dtuple + self.assertTrue(dtime == ALWAYS_EQ) + self.assertFalse(dtime != ALWAYS_EQ) + self.assertTrue(dtime < LARGEST) + self.assertFalse(dtime > LARGEST) + self.assertTrue(dtime <= LARGEST) + self.assertFalse(dtime >= LARGEST) + self.assertFalse(dtime < SMALLEST) + self.assertTrue(dtime > SMALLEST) + self.assertFalse(dtime <= SMALLEST) + self.assertTrue(dtime >= SMALLEST) + + class BinaryTestCase(unittest.TestCase): - # XXX What should str(Binary(b"\xff")) return? I'm chosing "\xff" + # XXX What should str(Binary(b"\xff")) return? I'm choosing "\xff" # for now (i.e. interpreting the binary data as Latin-1-encoded # text). But this feels very unsatisfactory. Perhaps we should # only define repr(), and return r"Binary(b'\xff')" instead? @@ -665,7 +674,7 @@ def http_server(evt, numrequests, requestHandler=None, encoding=None): serv.handle_request() numrequests -= 1 - except socket.timeout: + except TimeoutError: pass finally: serv.socket.close() @@ -715,11 +724,16 @@ def http_multi_server(evt, numrequests, requestHandler=None): #on AF_INET only. URL = "http://%s:%d"%(ADDR, PORT) serv.server_activate() - paths = ["/foo", "/foo/bar"] + paths = [ + "/foo", "/foo/bar", + "/foo?k=v", "/foo#frag", "/foo?k=v#frag", + "", "/", "/RPC2", "?k=v", "#frag", + ] for path in paths: d = serv.add_dispatcher(path, xmlrpc.server.SimpleXMLRPCDispatcher()) d.register_introspection_functions() d.register_multicall_functions() + d.register_function(lambda p=path: p, 'test') serv.get_dispatcher(paths[0]).register_function(pow) serv.get_dispatcher(paths[1]).register_function(lambda x,y: x+y, 'add') serv.add_dispatcher("/is/broken", BrokenDispatcher()) @@ -730,7 +744,7 @@ def http_multi_server(evt, numrequests, requestHandler=None): serv.handle_request() numrequests -= 1 - except socket.timeout: + except TimeoutError: pass finally: serv.socket.close() @@ -1073,6 +1087,39 @@ class MultiPathServerTestCase(BaseServerTestCase): p = xmlrpclib.ServerProxy(URL+"/is/broken") self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) + def test_invalid_path(self): + p = xmlrpclib.ServerProxy(URL+"/invalid") + self.assertRaises(xmlrpclib.Fault, p.add, 6, 8) + + def test_path_query_fragment(self): + p = xmlrpclib.ServerProxy(URL+"/foo?k=v#frag") + self.assertEqual(p.test(), "/foo?k=v#frag") + + def test_path_fragment(self): + p = xmlrpclib.ServerProxy(URL+"/foo#frag") + self.assertEqual(p.test(), "/foo#frag") + + def test_path_query(self): + p = xmlrpclib.ServerProxy(URL+"/foo?k=v") + self.assertEqual(p.test(), "/foo?k=v") + + def test_empty_path(self): + p = xmlrpclib.ServerProxy(URL) + self.assertEqual(p.test(), "/RPC2") + + def test_root_path(self): + p = xmlrpclib.ServerProxy(URL + "/") + self.assertEqual(p.test(), "/") + + def test_empty_path_query(self): + p = xmlrpclib.ServerProxy(URL + "?k=v") + self.assertEqual(p.test(), "?k=v") + + def test_empty_path_fragment(self): + p = xmlrpclib.ServerProxy(URL + "#frag") + self.assertEqual(p.test(), "#frag") + + #A test case that verifies that a server using the HTTP/1.1 keep-alive mechanism #does indeed serve subsequent requests on the same connection class BaseKeepaliveServerTestCase(BaseServerTestCase): @@ -1544,16 +1591,10 @@ class UseBuiltinTypesTestCase(unittest.TestCase): self.assertTrue(server.use_builtin_types) -@threading_helper.reap_threads -def test_main(): - support.run_unittest(XMLRPCTestCase, HelperTestCase, DateTimeTestCase, - BinaryTestCase, FaultTestCase, UseBuiltinTypesTestCase, - SimpleServerTestCase, SimpleServerEncodingTestCase, - KeepaliveServerTestCase1, KeepaliveServerTestCase2, - GzipServerTestCase, GzipUtilTestCase, HeadersServerTestCase, - MultiPathServerTestCase, ServerProxyTestCase, FailingServerTestCase, - CGIHandlerTestCase, SimpleXMLRPCDispatcherTestCase) +def setUpModule(): + thread_info = threading_helper.threading_setup() + unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) if __name__ == "__main__": - test_main() + unittest.main() diff --git a/Lib/xmlrpc/client.py b/Lib/xmlrpc/client.py index f01b5ae787..121e44023c 100644 --- a/Lib/xmlrpc/client.py +++ b/Lib/xmlrpc/client.py @@ -265,16 +265,22 @@ boolean = Boolean = bool # Issue #13305: different format codes across platforms _day0 = datetime(1, 1, 1) -if _day0.strftime('%Y') == '0001': # Mac OS X +def _try(fmt): + try: + return _day0.strftime(fmt) == '0001' + except ValueError: + return False +if _try('%Y'): # Mac OS X def _iso8601_format(value): return value.strftime("%Y%m%dT%H:%M:%S") -elif _day0.strftime('%4Y') == '0001': # Linux +elif _try('%4Y'): # Linux def _iso8601_format(value): return value.strftime("%4Y%m%dT%H:%M:%S") else: def _iso8601_format(value): return value.strftime("%Y%m%dT%H:%M:%S").zfill(17) del _day0 +del _try def _strftime(value): @@ -314,31 +320,38 @@ class DateTime: s = self.timetuple() o = other.timetuple() else: - otype = (hasattr(other, "__class__") - and other.__class__.__name__ - or type(other)) - raise TypeError("Can't compare %s and %s" % - (self.__class__.__name__, otype)) + s = self + o = NotImplemented return s, o def __lt__(self, other): s, o = self.make_comparable(other) + if o is NotImplemented: + return NotImplemented return s < o def __le__(self, other): s, o = self.make_comparable(other) + if o is NotImplemented: + return NotImplemented return s <= o def __gt__(self, other): s, o = self.make_comparable(other) + if o is NotImplemented: + return NotImplemented return s > o def __ge__(self, other): s, o = self.make_comparable(other) + if o is NotImplemented: + return NotImplemented return s >= o def __eq__(self, other): s, o = self.make_comparable(other) + if o is NotImplemented: + return NotImplemented return s == o def timetuple(self): @@ -436,7 +449,7 @@ class ExpatParser: target.xml(encoding, None) def feed(self, data): - self._parser.Parse(data, 0) + self._parser.Parse(data, False) def close(self): try: @@ -1415,15 +1428,16 @@ class ServerProxy: # establish a "logical" server connection # get the url - type, uri = urllib.parse._splittype(uri) - if type not in ("http", "https"): + p = urllib.parse.urlsplit(uri) + if p.scheme not in ("http", "https"): raise OSError("unsupported XML-RPC protocol") - self.__host, self.__handler = urllib.parse._splithost(uri) + self.__host = p.netloc + self.__handler = urllib.parse.urlunsplit(["", "", *p[2:]]) if not self.__handler: self.__handler = "/RPC2" if transport is None: - if type == "https": + if p.scheme == "https": handler = SafeTransport extra_kwargs = {"context": context} else: diff --git a/Lib/xmlrpc/server.py b/Lib/xmlrpc/server.py index 32aba4df4c..69a260f5b1 100644 --- a/Lib/xmlrpc/server.py +++ b/Lib/xmlrpc/server.py @@ -732,7 +732,7 @@ class ServerHTMLDoc(pydoc.HTMLDoc): # hyperlinking of arbitrary strings being used as method # names. Only methods with names consisting of word characters # and '.'s are hyperlinked. - pattern = re.compile(r'\b((http|ftp)://\S+[\w/]|' + pattern = re.compile(r'\b((http|https|ftp)://\S+[\w/]|' r'RFC[- ]?(\d+)|' r'PEP[- ]?(\d+)|' r'(self\.)?((?:\w|\.)+))\b') @@ -750,7 +750,7 @@ class ServerHTMLDoc(pydoc.HTMLDoc): url = 'http://www.rfc-editor.org/rfc/rfc%d.txt' % int(rfc) results.append('%s' % (url, escape(all))) elif pep: - url = 'http://www.python.org/dev/peps/pep-%04d/' % int(pep) + url = 'https://www.python.org/dev/peps/pep-%04d/' % int(pep) results.append('%s' % (url, escape(all))) elif text[end:end+1] == '(': results.append(self.namelink(name, methods, funcs, classes))