diff --git a/Lib/telnetlib.py b/Lib/telnetlib.py new file mode 100644 index 0000000000..8ce053e881 --- /dev/null +++ b/Lib/telnetlib.py @@ -0,0 +1,677 @@ +r"""TELNET client class. + +Based on RFC 854: TELNET Protocol Specification, by J. Postel and +J. Reynolds + +Example: + +>>> from telnetlib import Telnet +>>> tn = Telnet('www.python.org', 79) # connect to finger port +>>> tn.write(b'guido\r\n') +>>> print(tn.read_all()) +Login Name TTY Idle When Where +guido Guido van Rossum pts/2 snag.cnri.reston.. + +>>> + +Note that read_all() won't read until eof -- it just reads some data +-- but it guarantees to read at least one byte unless EOF is hit. + +It is possible to pass a Telnet object to a selector in order to wait until +more data is available. Note that in this case, read_eager() may return b'' +even if there was data on the socket, because the protocol negotiation may have +eaten the data. This is why EOFError is needed in some cases to distinguish +between "no data" and "connection closed" (since the socket also appears ready +for reading when it is closed). + +To do: +- option negotiation +- timeout should be intrinsic to the connection object instead of an + option on one of the read calls only + +""" + + +# Imported modules +import sys +import socket +import selectors +from time import monotonic as _time + +__all__ = ["Telnet"] + +# Tunable parameters +DEBUGLEVEL = 0 + +# Telnet protocol defaults +TELNET_PORT = 23 + +# Telnet protocol characters (don't change) +IAC = bytes([255]) # "Interpret As Command" +DONT = bytes([254]) +DO = bytes([253]) +WONT = bytes([252]) +WILL = bytes([251]) +theNULL = bytes([0]) + +SE = bytes([240]) # Subnegotiation End +NOP = bytes([241]) # No Operation +DM = bytes([242]) # Data Mark +BRK = bytes([243]) # Break +IP = bytes([244]) # Interrupt process +AO = bytes([245]) # Abort output +AYT = bytes([246]) # Are You There +EC = bytes([247]) # Erase Character +EL = bytes([248]) # Erase Line +GA = bytes([249]) # Go Ahead +SB = bytes([250]) # Subnegotiation Begin + + +# Telnet protocol options code (don't change) +# These ones all come from arpa/telnet.h +BINARY = bytes([0]) # 8-bit data path +ECHO = bytes([1]) # echo +RCP = bytes([2]) # prepare to reconnect +SGA = bytes([3]) # suppress go ahead +NAMS = bytes([4]) # approximate message size +STATUS = bytes([5]) # give status +TM = bytes([6]) # timing mark +RCTE = bytes([7]) # remote controlled transmission and echo +NAOL = bytes([8]) # negotiate about output line width +NAOP = bytes([9]) # negotiate about output page size +NAOCRD = bytes([10]) # negotiate about CR disposition +NAOHTS = bytes([11]) # negotiate about horizontal tabstops +NAOHTD = bytes([12]) # negotiate about horizontal tab disposition +NAOFFD = bytes([13]) # negotiate about formfeed disposition +NAOVTS = bytes([14]) # negotiate about vertical tab stops +NAOVTD = bytes([15]) # negotiate about vertical tab disposition +NAOLFD = bytes([16]) # negotiate about output LF disposition +XASCII = bytes([17]) # extended ascii character set +LOGOUT = bytes([18]) # force logout +BM = bytes([19]) # byte macro +DET = bytes([20]) # data entry terminal +SUPDUP = bytes([21]) # supdup protocol +SUPDUPOUTPUT = bytes([22]) # supdup output +SNDLOC = bytes([23]) # send location +TTYPE = bytes([24]) # terminal type +EOR = bytes([25]) # end or record +TUID = bytes([26]) # TACACS user identification +OUTMRK = bytes([27]) # output marking +TTYLOC = bytes([28]) # terminal location number +VT3270REGIME = bytes([29]) # 3270 regime +X3PAD = bytes([30]) # X.3 PAD +NAWS = bytes([31]) # window size +TSPEED = bytes([32]) # terminal speed +LFLOW = bytes([33]) # remote flow control +LINEMODE = bytes([34]) # Linemode option +XDISPLOC = bytes([35]) # X Display Location +OLD_ENVIRON = bytes([36]) # Old - Environment variables +AUTHENTICATION = bytes([37]) # Authenticate +ENCRYPT = bytes([38]) # Encryption option +NEW_ENVIRON = bytes([39]) # New - Environment variables +# the following ones come from +# http://www.iana.org/assignments/telnet-options +# Unfortunately, that document does not assign identifiers +# to all of them, so we are making them up +TN3270E = bytes([40]) # TN3270E +XAUTH = bytes([41]) # XAUTH +CHARSET = bytes([42]) # CHARSET +RSP = bytes([43]) # Telnet Remote Serial Port +COM_PORT_OPTION = bytes([44]) # Com Port Control Option +SUPPRESS_LOCAL_ECHO = bytes([45]) # Telnet Suppress Local Echo +TLS = bytes([46]) # Telnet Start TLS +KERMIT = bytes([47]) # KERMIT +SEND_URL = bytes([48]) # SEND-URL +FORWARD_X = bytes([49]) # FORWARD_X +PRAGMA_LOGON = bytes([138]) # TELOPT PRAGMA LOGON +SSPI_LOGON = bytes([139]) # TELOPT SSPI LOGON +PRAGMA_HEARTBEAT = bytes([140]) # TELOPT PRAGMA HEARTBEAT +EXOPL = bytes([255]) # Extended-Options-List +NOOPT = bytes([0]) + + +# poll/select have the advantage of not requiring any extra file descriptor, +# contrarily to epoll/kqueue (also, they require a single syscall). +if hasattr(selectors, 'PollSelector'): + _TelnetSelector = selectors.PollSelector +else: + _TelnetSelector = selectors.SelectSelector + + +class Telnet: + + """Telnet interface class. + + An instance of this class represents a connection to a telnet + server. The instance is initially not connected; the open() + method must be used to establish a connection. Alternatively, the + host name and optional port number can be passed to the + constructor, too. + + Don't try to reopen an already connected instance. + + This class has many read_*() methods. Note that some of them + raise EOFError when the end of the connection is read, because + they can return an empty string for other reasons. See the + individual doc strings. + + read_until(expected, [timeout]) + Read until the expected string has been seen, or a timeout is + hit (default is no timeout); may block. + + read_all() + Read all data until EOF; may block. + + read_some() + Read at least one byte or EOF; may block. + + read_very_eager() + Read all data available already queued or on the socket, + without blocking. + + read_eager() + Read either data already queued or some data available on the + socket, without blocking. + + read_lazy() + Read all data in the raw queue (processing it first), without + doing any socket I/O. + + read_very_lazy() + Reads all data in the cooked queue, without doing any socket + I/O. + + read_sb_data() + Reads available data between SB ... SE sequence. Don't block. + + set_option_negotiation_callback(callback) + Each time a telnet option is read on the input flow, this callback + (if set) is called with the following parameters : + callback(telnet socket, command, option) + option will be chr(0) when there is no option. + No other action is done afterwards by telnetlib. + + """ + + def __init__(self, host=None, port=0, + timeout=socket._GLOBAL_DEFAULT_TIMEOUT): + """Constructor. + + When called without arguments, create an unconnected instance. + With a hostname argument, it connects the instance; port number + and timeout are optional. + """ + self.debuglevel = DEBUGLEVEL + self.host = host + self.port = port + self.timeout = timeout + self.sock = None + self.rawq = b'' + self.irawq = 0 + self.cookedq = b'' + self.eof = 0 + self.iacseq = b'' # Buffer for IAC sequence. + self.sb = 0 # flag for SB and SE sequence. + self.sbdataq = b'' + self.option_callback = None + if host is not None: + self.open(host, port, timeout) + + def open(self, host, port=0, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): + """Connect to a host. + + The optional second argument is the port number, which + defaults to the standard telnet port (23). + + Don't try to reopen an already connected instance. + """ + self.eof = 0 + if not port: + port = TELNET_PORT + self.host = host + self.port = port + self.timeout = timeout + sys.audit("telnetlib.Telnet.open", self, host, port) + self.sock = socket.create_connection((host, port), timeout) + + def __del__(self): + """Destructor -- close the connection.""" + self.close() + + def msg(self, msg, *args): + """Print a debug message, when the debug level is > 0. + + If extra arguments are present, they are substituted in the + message using the standard string formatting operator. + + """ + if self.debuglevel > 0: + print('Telnet(%s,%s):' % (self.host, self.port), end=' ') + if args: + print(msg % args) + else: + print(msg) + + def set_debuglevel(self, debuglevel): + """Set the debug level. + + The higher it is, the more debug output you get (on sys.stdout). + + """ + self.debuglevel = debuglevel + + def close(self): + """Close the connection.""" + sock = self.sock + self.sock = None + self.eof = True + self.iacseq = b'' + self.sb = 0 + if sock: + sock.close() + + def get_socket(self): + """Return the socket object used internally.""" + return self.sock + + def fileno(self): + """Return the fileno() of the socket object used internally.""" + return self.sock.fileno() + + def write(self, buffer): + """Write a string to the socket, doubling any IAC characters. + + Can block if the connection is blocked. May raise + OSError if the connection is closed. + + """ + if IAC in buffer: + buffer = buffer.replace(IAC, IAC+IAC) + sys.audit("telnetlib.Telnet.write", self, buffer) + self.msg("send %r", buffer) + self.sock.sendall(buffer) + + def read_until(self, match, timeout=None): + """Read until a given string is encountered or until timeout. + + When no match is found, return whatever is available instead, + possibly the empty string. Raise EOFError if the connection + is closed and no cooked data is available. + + """ + n = len(match) + self.process_rawq() + i = self.cookedq.find(match) + if i >= 0: + i = i+n + buf = self.cookedq[:i] + self.cookedq = self.cookedq[i:] + return buf + if timeout is not None: + deadline = _time() + timeout + with _TelnetSelector() as selector: + selector.register(self, selectors.EVENT_READ) + while not self.eof: + if selector.select(timeout): + i = max(0, len(self.cookedq)-n) + self.fill_rawq() + self.process_rawq() + i = self.cookedq.find(match, i) + if i >= 0: + i = i+n + buf = self.cookedq[:i] + self.cookedq = self.cookedq[i:] + return buf + if timeout is not None: + timeout = deadline - _time() + if timeout < 0: + break + return self.read_very_lazy() + + def read_all(self): + """Read all data until EOF; block until connection closed.""" + self.process_rawq() + while not self.eof: + self.fill_rawq() + self.process_rawq() + buf = self.cookedq + self.cookedq = b'' + return buf + + def read_some(self): + """Read at least one byte of cooked data unless EOF is hit. + + Return b'' if EOF is hit. Block if no data is immediately + available. + + """ + self.process_rawq() + while not self.cookedq and not self.eof: + self.fill_rawq() + self.process_rawq() + buf = self.cookedq + self.cookedq = b'' + return buf + + def read_very_eager(self): + """Read everything that's possible without blocking in I/O (eager). + + Raise EOFError if connection closed and no cooked data + available. Return b'' if no cooked data available otherwise. + Don't block unless in the midst of an IAC sequence. + + """ + self.process_rawq() + while not self.eof and self.sock_avail(): + self.fill_rawq() + self.process_rawq() + return self.read_very_lazy() + + def read_eager(self): + """Read readily available data. + + Raise EOFError if connection closed and no cooked data + available. Return b'' if no cooked data available otherwise. + Don't block unless in the midst of an IAC sequence. + + """ + self.process_rawq() + while not self.cookedq and not self.eof and self.sock_avail(): + self.fill_rawq() + self.process_rawq() + return self.read_very_lazy() + + def read_lazy(self): + """Process and return data that's already in the queues (lazy). + + Raise EOFError if connection closed and no data available. + Return b'' if no cooked data available otherwise. Don't block + unless in the midst of an IAC sequence. + + """ + self.process_rawq() + return self.read_very_lazy() + + def read_very_lazy(self): + """Return any data available in the cooked queue (very lazy). + + Raise EOFError if connection closed and no data available. + Return b'' if no cooked data available otherwise. Don't block. + + """ + buf = self.cookedq + self.cookedq = b'' + if not buf and self.eof and not self.rawq: + raise EOFError('telnet connection closed') + return buf + + def read_sb_data(self): + """Return any data available in the SB ... SE queue. + + Return b'' if no SB ... SE available. Should only be called + after seeing a SB or SE command. When a new SB command is + found, old unread SB data will be discarded. Don't block. + + """ + buf = self.sbdataq + self.sbdataq = b'' + return buf + + def set_option_negotiation_callback(self, callback): + """Provide a callback function called after each receipt of a telnet option.""" + self.option_callback = callback + + def process_rawq(self): + """Transfer from raw queue to cooked queue. + + Set self.eof when connection is closed. Don't block unless in + the midst of an IAC sequence. + + """ + buf = [b'', b''] + try: + while self.rawq: + c = self.rawq_getchar() + if not self.iacseq: + if c == theNULL: + continue + if c == b"\021": + continue + if c != IAC: + buf[self.sb] = buf[self.sb] + c + continue + else: + self.iacseq += c + elif len(self.iacseq) == 1: + # 'IAC: IAC CMD [OPTION only for WILL/WONT/DO/DONT]' + if c in (DO, DONT, WILL, WONT): + self.iacseq += c + continue + + self.iacseq = b'' + if c == IAC: + buf[self.sb] = buf[self.sb] + c + else: + if c == SB: # SB ... SE start. + self.sb = 1 + self.sbdataq = b'' + elif c == SE: + self.sb = 0 + self.sbdataq = self.sbdataq + buf[1] + buf[1] = b'' + if self.option_callback: + # Callback is supposed to look into + # the sbdataq + self.option_callback(self.sock, c, NOOPT) + else: + # We can't offer automatic processing of + # suboptions. Alas, we should not get any + # unless we did a WILL/DO before. + self.msg('IAC %d not recognized' % ord(c)) + elif len(self.iacseq) == 2: + cmd = self.iacseq[1:2] + self.iacseq = b'' + opt = c + if cmd in (DO, DONT): + self.msg('IAC %s %d', + cmd == DO and 'DO' or 'DONT', ord(opt)) + if self.option_callback: + self.option_callback(self.sock, cmd, opt) + else: + self.sock.sendall(IAC + WONT + opt) + elif cmd in (WILL, WONT): + self.msg('IAC %s %d', + cmd == WILL and 'WILL' or 'WONT', ord(opt)) + if self.option_callback: + self.option_callback(self.sock, cmd, opt) + else: + self.sock.sendall(IAC + DONT + opt) + except EOFError: # raised by self.rawq_getchar() + self.iacseq = b'' # Reset on EOF + self.sb = 0 + pass + self.cookedq = self.cookedq + buf[0] + self.sbdataq = self.sbdataq + buf[1] + + def rawq_getchar(self): + """Get next char from raw queue. + + Block if no data is immediately available. Raise EOFError + when connection is closed. + + """ + if not self.rawq: + self.fill_rawq() + if self.eof: + raise EOFError + c = self.rawq[self.irawq:self.irawq+1] + self.irawq = self.irawq + 1 + if self.irawq >= len(self.rawq): + self.rawq = b'' + self.irawq = 0 + return c + + def fill_rawq(self): + """Fill raw queue from exactly one recv() system call. + + Block if no data is immediately available. Set self.eof when + connection is closed. + + """ + if self.irawq >= len(self.rawq): + self.rawq = b'' + self.irawq = 0 + # The buffer size should be fairly small so as to avoid quadratic + # behavior in process_rawq() above + buf = self.sock.recv(50) + self.msg("recv %r", buf) + self.eof = (not buf) + self.rawq = self.rawq + buf + + def sock_avail(self): + """Test whether data is available on the socket.""" + with _TelnetSelector() as selector: + selector.register(self, selectors.EVENT_READ) + return bool(selector.select(0)) + + def interact(self): + """Interaction function, emulates a very dumb telnet client.""" + if sys.platform == "win32": + self.mt_interact() + return + with _TelnetSelector() as selector: + selector.register(self, selectors.EVENT_READ) + selector.register(sys.stdin, selectors.EVENT_READ) + + while True: + for key, events in selector.select(): + if key.fileobj is self: + try: + text = self.read_eager() + except EOFError: + print('*** Connection closed by remote host ***') + return + if text: + sys.stdout.write(text.decode('ascii')) + sys.stdout.flush() + elif key.fileobj is sys.stdin: + line = sys.stdin.readline().encode('ascii') + if not line: + return + self.write(line) + + def mt_interact(self): + """Multithreaded version of interact().""" + import _thread + _thread.start_new_thread(self.listener, ()) + while 1: + line = sys.stdin.readline() + if not line: + break + self.write(line.encode('ascii')) + + def listener(self): + """Helper for mt_interact() -- this executes in the other thread.""" + while 1: + try: + data = self.read_eager() + except EOFError: + print('*** Connection closed by remote host ***') + return + if data: + sys.stdout.write(data.decode('ascii')) + else: + sys.stdout.flush() + + def expect(self, list, timeout=None): + """Read until one from a list of a regular expressions matches. + + The first argument is a list of regular expressions, either + compiled (re.Pattern instances) or uncompiled (strings). + The optional second argument is a timeout, in seconds; default + is no timeout. + + Return a tuple of three items: the index in the list of the + first regular expression that matches; the re.Match object + returned; and the text read up till and including the match. + + If EOF is read and no text was read, raise EOFError. + Otherwise, when nothing matches, return (-1, None, text) where + text is the text received so far (may be the empty string if a + timeout happened). + + If a regular expression ends with a greedy match (e.g. '.*') + or if more than one expression can match the same input, the + results are undeterministic, and may depend on the I/O timing. + + """ + re = None + list = list[:] + indices = range(len(list)) + for i in indices: + if not hasattr(list[i], "search"): + if not re: import re + list[i] = re.compile(list[i]) + if timeout is not None: + deadline = _time() + timeout + with _TelnetSelector() as selector: + selector.register(self, selectors.EVENT_READ) + while not self.eof: + self.process_rawq() + for i in indices: + m = list[i].search(self.cookedq) + if m: + e = m.end() + text = self.cookedq[:e] + self.cookedq = self.cookedq[e:] + return (i, m, text) + if timeout is not None: + ready = selector.select(timeout) + timeout = deadline - _time() + if not ready: + if timeout < 0: + break + else: + continue + self.fill_rawq() + text = self.read_very_lazy() + if not text and self.eof: + raise EOFError + return (-1, None, text) + + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + self.close() + + +def test(): + """Test program for telnetlib. + + Usage: python telnetlib.py [-d] ... [host [port]] + + Default host is localhost; default port is 23. + + """ + debuglevel = 0 + while sys.argv[1:] and sys.argv[1] == '-d': + debuglevel = debuglevel+1 + del sys.argv[1] + host = 'localhost' + if sys.argv[1:]: + host = sys.argv[1] + port = 0 + if sys.argv[2:]: + portstr = sys.argv[2] + try: + port = int(portstr) + except ValueError: + port = socket.getservbyname(portstr, 'tcp') + with Telnet() as tn: + tn.set_debuglevel(debuglevel) + tn.open(host, port, timeout=0.5) + tn.interact() + +if __name__ == '__main__': + test() diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py new file mode 100644 index 0000000000..414c328b79 --- /dev/null +++ b/Lib/test/test_telnetlib.py @@ -0,0 +1,401 @@ +import socket +import selectors +import telnetlib +import threading +import contextlib + +from test import support +import unittest + +HOST = support.HOST + +def server(evt, serv): + serv.listen() + evt.set() + try: + conn, addr = serv.accept() + conn.close() + except socket.timeout: + pass + finally: + serv.close() + +class GeneralTests(unittest.TestCase): + + def setUp(self): + self.evt = threading.Event() + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(60) # Safety net. Look issue 11812 + self.port = support.bind_port(self.sock) + self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) + self.thread.setDaemon(True) + self.thread.start() + self.evt.wait() + + def tearDown(self): + self.thread.join() + del self.thread # Clear out any dangling Thread objects. + + def testBasic(self): + # connects + telnet = telnetlib.Telnet(HOST, self.port) + telnet.sock.close() + + def testContextManager(self): + with telnetlib.Telnet(HOST, self.port) as tn: + self.assertIsNotNone(tn.get_socket()) + self.assertIsNone(tn.get_socket()) + + def testTimeoutDefault(self): + self.assertTrue(socket.getdefaulttimeout() is None) + socket.setdefaulttimeout(30) + try: + telnet = telnetlib.Telnet(HOST, self.port) + finally: + socket.setdefaulttimeout(None) + self.assertEqual(telnet.sock.gettimeout(), 30) + telnet.sock.close() + + def testTimeoutNone(self): + # None, having other default + self.assertTrue(socket.getdefaulttimeout() is None) + socket.setdefaulttimeout(30) + try: + telnet = telnetlib.Telnet(HOST, self.port, timeout=None) + finally: + socket.setdefaulttimeout(None) + self.assertTrue(telnet.sock.gettimeout() is None) + telnet.sock.close() + + def testTimeoutValue(self): + telnet = telnetlib.Telnet(HOST, self.port, timeout=30) + self.assertEqual(telnet.sock.gettimeout(), 30) + telnet.sock.close() + + def testTimeoutOpen(self): + telnet = telnetlib.Telnet() + telnet.open(HOST, self.port, timeout=30) + self.assertEqual(telnet.sock.gettimeout(), 30) + telnet.sock.close() + + def testGetters(self): + # Test telnet getter methods + telnet = telnetlib.Telnet(HOST, self.port, timeout=30) + t_sock = telnet.sock + self.assertEqual(telnet.get_socket(), t_sock) + self.assertEqual(telnet.fileno(), t_sock.fileno()) + telnet.sock.close() + +class SocketStub(object): + ''' a socket proxy that re-defines sendall() ''' + def __init__(self, reads=()): + self.reads = list(reads) # Intentionally make a copy. + self.writes = [] + self.block = False + def sendall(self, data): + self.writes.append(data) + def recv(self, size): + out = b'' + while self.reads and len(out) < size: + out += self.reads.pop(0) + if len(out) > size: + self.reads.insert(0, out[size:]) + out = out[:size] + return out + +class TelnetAlike(telnetlib.Telnet): + def fileno(self): + raise NotImplementedError() + def close(self): pass + def sock_avail(self): + return (not self.sock.block) + def msg(self, msg, *args): + with support.captured_stdout() as out: + telnetlib.Telnet.msg(self, msg, *args) + self._messages += out.getvalue() + return + +class MockSelector(selectors.BaseSelector): + + def __init__(self): + self.keys = {} + + @property + def resolution(self): + return 1e-3 + + def register(self, fileobj, events, data=None): + key = selectors.SelectorKey(fileobj, 0, events, data) + self.keys[fileobj] = key + return key + + def unregister(self, fileobj): + return self.keys.pop(fileobj) + + def select(self, timeout=None): + block = False + for fileobj in self.keys: + if isinstance(fileobj, TelnetAlike): + block = fileobj.sock.block + break + if block: + return [] + else: + return [(key, key.events) for key in self.keys.values()] + + def get_map(self): + return self.keys + + +@contextlib.contextmanager +def test_socket(reads): + def new_conn(*ignored): + return SocketStub(reads) + try: + old_conn = socket.create_connection + socket.create_connection = new_conn + yield None + finally: + socket.create_connection = old_conn + return + +def test_telnet(reads=(), cls=TelnetAlike): + ''' return a telnetlib.Telnet object that uses a SocketStub with + reads queued up to be read ''' + for x in reads: + assert type(x) is bytes, x + with test_socket(reads): + telnet = cls('dummy', 0) + telnet._messages = '' # debuglevel output + return telnet + +class ExpectAndReadTestCase(unittest.TestCase): + def setUp(self): + self.old_selector = telnetlib._TelnetSelector + telnetlib._TelnetSelector = MockSelector + def tearDown(self): + telnetlib._TelnetSelector = self.old_selector + +class ReadTests(ExpectAndReadTestCase): + def test_read_until(self): + """ + read_until(expected, timeout=None) + test the blocking version of read_util + """ + want = [b'xxxmatchyyy'] + telnet = test_telnet(want) + data = telnet.read_until(b'match') + self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads)) + + reads = [b'x' * 50, b'match', b'y' * 50] + expect = b''.join(reads[:-1]) + telnet = test_telnet(reads) + data = telnet.read_until(b'match') + self.assertEqual(data, expect) + + + def test_read_all(self): + """ + read_all() + Read all data until EOF; may block. + """ + reads = [b'x' * 500, b'y' * 500, b'z' * 500] + expect = b''.join(reads) + telnet = test_telnet(reads) + data = telnet.read_all() + self.assertEqual(data, expect) + return + + def test_read_some(self): + """ + read_some() + Read at least one byte or EOF; may block. + """ + # test 'at least one byte' + telnet = test_telnet([b'x' * 500]) + data = telnet.read_some() + self.assertTrue(len(data) >= 1) + # test EOF + telnet = test_telnet() + data = telnet.read_some() + self.assertEqual(b'', data) + + def _read_eager(self, func_name): + """ + read_*_eager() + Read all data available already queued or on the socket, + without blocking. + """ + want = b'x' * 100 + telnet = test_telnet([want]) + func = getattr(telnet, func_name) + telnet.sock.block = True + self.assertEqual(b'', func()) + telnet.sock.block = False + data = b'' + while True: + try: + data += func() + except EOFError: + break + self.assertEqual(data, want) + + def test_read_eager(self): + # read_eager and read_very_eager make the same guarantees + # (they behave differently but we only test the guarantees) + self._read_eager('read_eager') + self._read_eager('read_very_eager') + # NB -- we need to test the IAC block which is mentioned in the + # docstring but not in the module docs + + def read_very_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_very_lazy()) + while telnet.sock.reads: + telnet.fill_rawq() + data = telnet.read_very_lazy() + self.assertEqual(want, data) + self.assertRaises(EOFError, telnet.read_very_lazy) + + def test_read_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_lazy()) + data = b'' + while True: + try: + read_data = telnet.read_lazy() + data += read_data + if not read_data: + telnet.fill_rawq() + except EOFError: + break + self.assertTrue(want.startswith(data)) + self.assertEqual(data, want) + +class nego_collector(object): + def __init__(self, sb_getter=None): + self.seen = b'' + self.sb_getter = sb_getter + self.sb_seen = b'' + + def do_nego(self, sock, cmd, opt): + self.seen += cmd + opt + if cmd == tl.SE and self.sb_getter: + sb_data = self.sb_getter() + self.sb_seen += sb_data + +tl = telnetlib + +class WriteTests(unittest.TestCase): + '''The only thing that write does is replace each tl.IAC for + tl.IAC+tl.IAC''' + + def test_write(self): + data_sample = [b'data sample without IAC', + b'data sample with' + tl.IAC + b' one IAC', + b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, + tl.IAC, + b''] + for data in data_sample: + telnet = test_telnet() + telnet.write(data) + written = b''.join(telnet.sock.writes) + self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written) + +class OptionTests(unittest.TestCase): + # RFC 854 commands + cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] + + def _test_command(self, data): + """ helper for testing IAC + cmd """ + telnet = test_telnet(data) + data_len = len(b''.join(data)) + nego = nego_collector() + telnet.set_option_negotiation_callback(nego.do_nego) + txt = telnet.read_all() + cmd = nego.seen + self.assertTrue(len(cmd) > 0) # we expect at least one command + self.assertIn(cmd[:1], self.cmds) + self.assertEqual(cmd[1:2], tl.NOOPT) + self.assertEqual(data_len, len(txt + cmd)) + nego.sb_getter = None # break the nego => telnet cycle + + def test_IAC_commands(self): + for cmd in self.cmds: + self._test_command([tl.IAC, cmd]) + self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100]) + self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10]) + # all at once + self._test_command([tl.IAC + cmd for (cmd) in self.cmds]) + + def test_SB_commands(self): + # RFC 855, subnegotiations portion + send = [tl.IAC + tl.SB + tl.IAC + tl.SE, + tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE, + tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, + tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, + tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, + ] + telnet = test_telnet(send) + nego = nego_collector(telnet.read_sb_data) + telnet.set_option_negotiation_callback(nego.do_nego) + txt = telnet.read_all() + self.assertEqual(txt, b'') + want_sb_data = tl.IAC + tl.IAC + b'aabb' + tl.IAC + b'cc' + tl.IAC + b'dd' + self.assertEqual(nego.sb_seen, want_sb_data) + self.assertEqual(b'', telnet.read_sb_data()) + nego.sb_getter = None # break the nego => telnet cycle + + def test_debuglevel_reads(self): + # test all the various places that self.msg(...) is called + given_a_expect_b = [ + # Telnet.fill_rawq + (b'a', ": recv b''\n"), + # Telnet.process_rawq + (tl.IAC + bytes([88]), ": IAC 88 not recognized\n"), + (tl.IAC + tl.DO + bytes([1]), ": IAC DO 1\n"), + (tl.IAC + tl.DONT + bytes([1]), ": IAC DONT 1\n"), + (tl.IAC + tl.WILL + bytes([1]), ": IAC WILL 1\n"), + (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), + ] + for a, b in given_a_expect_b: + telnet = test_telnet([a]) + telnet.set_debuglevel(1) + txt = telnet.read_all() + self.assertIn(b, telnet._messages) + return + + def test_debuglevel_write(self): + telnet = test_telnet() + telnet.set_debuglevel(1) + telnet.write(b'xxx') + expected = "send b'xxx'\n" + self.assertIn(expected, telnet._messages) + + def test_debug_accepts_str_port(self): + # Issue 10695 + with test_socket([]): + telnet = TelnetAlike('dummy', '0') + telnet._messages = '' + telnet.set_debuglevel(1) + telnet.msg('test') + self.assertRegex(telnet._messages, r'0.*test') + + +class ExpectTests(ExpectAndReadTestCase): + def test_expect(self): + """ + expect(expected, [timeout]) + Read until the expected string has been seen, or a timeout is + hit (default is no timeout); may block. + """ + want = [b'x' * 10, b'match', b'y' * 10] + telnet = test_telnet(want) + (_,_,data) = telnet.expect([b'match']) + self.assertEqual(data, b''.join(want[:-1])) + + +if __name__ == '__main__': + unittest.main()