mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-05-05 13:14:46 +00:00
Stop the telnet server threads more reliably
Instead of speaking text-based protocols over TCP we can use threading.Event() objects to indicate the desire for the server thread to quit.
This commit is contained in:
parent
8489730eac
commit
068e9bae8d
1 changed files with 15 additions and 28 deletions
|
|
@ -17,8 +17,6 @@
|
|||
"""
|
||||
Define http test support classes for LinkChecker tests.
|
||||
"""
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import threading
|
||||
import telnetlib
|
||||
|
|
@ -36,6 +34,8 @@ class TelnetServerTest (LinkCheckTest):
|
|||
super(TelnetServerTest, self).__init__(methodName=methodName)
|
||||
self.host = 'localhost'
|
||||
self.port = None
|
||||
self.stop_event = threading.Event()
|
||||
self.server_thread = None
|
||||
|
||||
def get_url(self, user=None, password=None):
|
||||
if user is not None:
|
||||
|
|
@ -49,20 +49,18 @@ class TelnetServerTest (LinkCheckTest):
|
|||
|
||||
def setUp (self):
|
||||
"""Start a new Telnet server in a new thread."""
|
||||
deadline = time.time() + 10 * TIMEOUT
|
||||
self.port = start_server(self.host, 0, deadline=deadline,
|
||||
testcase=self.id())
|
||||
self.port, self.server_thread = start_server(self.host, 0, self.stop_event)
|
||||
self.assertFalse(self.port is None)
|
||||
|
||||
def tearDown(self):
|
||||
"""Send QUIT request to telnet server."""
|
||||
try:
|
||||
stop_server(self.host, self.port)
|
||||
except Exception:
|
||||
pass
|
||||
self.stop_event.set()
|
||||
if self.server_thread is not None:
|
||||
self.server_thread.join(10)
|
||||
assert not self.server_thread.is_alive()
|
||||
|
||||
|
||||
def start_server (host, port, deadline, testcase):
|
||||
def start_server (host, port, stop_event):
|
||||
# Instantiate Telnet server class and listen to host:port
|
||||
clients = []
|
||||
def on_connect(client):
|
||||
|
|
@ -70,7 +68,7 @@ def start_server (host, port, deadline, testcase):
|
|||
client.send("Telnet test server\n")
|
||||
server = miniboa.TelnetServer(port=port, address=host, on_connect=on_connect)
|
||||
port = server.server_socket.getsockname()[1]
|
||||
t = threading.Thread(None, serve_forever, args=(server, clients, deadline, testcase))
|
||||
t = threading.Thread(None, serve_forever, args=(server, clients, stop_event))
|
||||
t.start()
|
||||
# wait for server to start up
|
||||
tries = 0
|
||||
|
|
@ -83,33 +81,22 @@ def start_server (host, port, deadline, testcase):
|
|||
break
|
||||
except:
|
||||
time.sleep(0.5)
|
||||
return port
|
||||
return port, t
|
||||
|
||||
|
||||
def stop_server (host, port):
|
||||
"""Stop a running FTP server."""
|
||||
client = telnetlib.Telnet(timeout=TIMEOUT)
|
||||
client.open(host, port)
|
||||
client.write("stop\n")
|
||||
|
||||
|
||||
def serve_forever(server, clients, deadline, testcase):
|
||||
def serve_forever(server, clients, stop_event):
|
||||
"""Run poll loop for server."""
|
||||
while True:
|
||||
if time.time() > deadline:
|
||||
# It could be that the test is very slow, but that's unlikely.
|
||||
raise AssertionError('deadlock detected in %s, aborting' % testcase)
|
||||
if stop_event.is_set():
|
||||
return
|
||||
server.poll()
|
||||
for client in clients:
|
||||
if client.active and client.cmd_ready:
|
||||
if not handle_cmd(client):
|
||||
return
|
||||
handle_cmd(client)
|
||||
|
||||
|
||||
def handle_cmd(client):
|
||||
"""Handle telnet clients."""
|
||||
msg = client.get_command().lower()
|
||||
if msg == 'exit':
|
||||
client.active = False
|
||||
elif msg == 'stop':
|
||||
return False
|
||||
return True
|
||||
|
|
|
|||
Loading…
Reference in a new issue