mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-03-27 03:00:36 +00:00
Linkchecker's telnet://username:password@host:port URL verification logic is: (1) connect to host:port; (2) wait for 'login: ' to appear (with a 10 second timeout), then send the username; (3) wait for 'Password: ' to appear (with a 10 second timeout), then send the password. The test spawned a fake telnet server on localhost that never presented the login/password prompts, forcing the 10 second timeout three times. This commit makes the fake telnet server emit the expected prompts, making the test pass in 0.2 seconds.
104 lines
3.3 KiB
Python
104 lines
3.3 KiB
Python
# -*- coding: iso-8859-1 -*-
|
|
# Copyright (C) 2012 Bastian Kleineidam
|
|
#
|
|
# This program is free software; you can redistribute it and/or modify
|
|
# it under the terms of the GNU General Public License as published by
|
|
# the Free Software Foundation; either version 2 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# This program is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
# GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License along
|
|
# with this program; if not, write to the Free Software Foundation, Inc.,
|
|
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
|
"""
|
|
Define telnet test support classes for LinkChecker tests.
|
|
"""
|
|
import time
|
|
import threading
|
|
import telnetlib
|
|
import miniboa
|
|
from . import LinkCheckTest
|
|
|
|
|
|
TIMEOUT = 5
|
|
|
|
class TelnetServerTest(LinkCheckTest):
    """Start/stop a Telnet server that can be used for testing.

    setUp() starts the server thread and tearDown() stops it again.
    """

    def __init__(self, methodName='runTest'):
        """Init test class; the telnet server port is not known yet."""
        super(TelnetServerTest, self).__init__(methodName=methodName)
        self.host = 'localhost'
        # Filled in by setUp() with the port returned by start_server().
        self.port = None
        # Set by tearDown() to make the server polling thread return.
        self.stop_event = threading.Event()
        self.server_thread = None

    def get_url(self, user=None, password=None):
        """Return the telnet:// URL of the test server.

        The user name and password are put into the network location when
        given; a password without a user name is ignored. self.port must
        already be an integer (it is formatted with %d), so this can only
        be called after setUp() has run.
        """
        if user is None:
            netloc = self.host
        elif password is None:
            netloc = u"%s@%s" % (user, self.host)
        else:
            netloc = u"%s:%s@%s" % (user, password, self.host)
        return u"telnet://%s:%d" % (netloc, self.port)

    def setUp(self):
        """Start a new Telnet server in a new thread."""
        # Port 0 is requested; start_server() returns the port that the
        # server socket actually reports.
        self.port, self.server_thread = start_server(
            self.host, 0, self.stop_event)
        self.assertIsNotNone(self.port)

    def tearDown(self):
        """Signal the server thread to stop and wait for it to finish."""
        self.stop_event.set()
        if self.server_thread is not None:
            self.server_thread.join(10)
            # A TestCase assertion is used instead of a bare assert so the
            # check is not stripped when Python runs with -O.
            self.assertFalse(self.server_thread.is_alive(),
                             "telnet server thread did not stop")
|
|
|
|
|
|
def start_server(host, port, stop_event):
    """Start a miniboa telnet server and poll it in a background thread.

    host is the address to listen on and port the port to request; the
    port actually reported by the server socket is returned. stop_event
    is a threading.Event that ends the polling thread once it is set.

    Returns a (port, thread) tuple.
    """
    # Instantiate Telnet server class and listen to host:port
    clients = []

    def on_connect(client):
        """Remember the new client and present the login prompt."""
        clients.append(client)
        client.send("Telnet test server\nlogin: ")

    server = miniboa.TelnetServer(port=port, address=host, on_connect=on_connect)
    port = server.server_socket.getsockname()[1]
    t = threading.Thread(target=serve_forever, args=(server, clients, stop_event))
    t.start()
    # wait for server to start up: probe it with up to five connections
    for _ in range(5):
        client = telnetlib.Telnet(timeout=TIMEOUT)
        try:
            client.open(host, port)
            # Bytes literal: a plain str on Python 2, and the bytes that
            # telnetlib requires on Python 3.
            client.write(b"exit\n")
            break
        except Exception:
            # Best-effort probe: retry after a short pause, but let
            # KeyboardInterrupt and SystemExit propagate.
            time.sleep(0.5)
        finally:
            # Always release the probe connection.
            client.close()
    return port, t
|
|
|
|
|
|
def serve_forever(server, clients, stop_event):
    """Poll the server until stop_event is set.

    After every poll, each client that is active and has a complete
    command waiting is passed to handle_cmd().
    """
    while not stop_event.is_set():
        server.poll()
        for conn in clients:
            if not conn.active:
                continue
            if conn.cmd_ready:
                handle_cmd(conn)
|
|
|
|
|
|
def handle_cmd(client):
    """Process one pending command of a telnet client.

    The command "exit" (compared case-insensitively) deactivates the
    client; any other input is answered with the password prompt.
    """
    command = client.get_command()
    if command.lower() != 'exit':
        client.send("Password: ")
        return
    client.active = False
|