mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-05-04 21:04:41 +00:00
Use Python threads in stead of Qt threads. The WSGI script works now.
This commit is contained in:
parent
dab01b00d6
commit
0fdaf00154
2 changed files with 55 additions and 106 deletions
|
|
@ -25,7 +25,8 @@ from ..cache import urlqueue
|
|||
from . import logger, status, checker, cleanup
|
||||
|
||||
|
||||
_lock = threading.Lock()
|
||||
_w3_time_lock = threading.Lock()
|
||||
_threads_lock = threading.Lock()
|
||||
|
||||
class Aggregate (object):
|
||||
"""Store thread-safe data collections for checker threads."""
|
||||
|
|
@ -41,6 +42,7 @@ class Aggregate (object):
|
|||
self.threads = []
|
||||
self.last_w3_call = 0
|
||||
|
||||
@synchronized(_threads_lock)
|
||||
def start_threads (self):
|
||||
"""Spawn threads for URL checking and status printing."""
|
||||
if self.config["status"]:
|
||||
|
|
@ -60,6 +62,7 @@ class Aggregate (object):
|
|||
else:
|
||||
checker.check_url(self.urlqueue, self.logger)
|
||||
|
||||
@synchronized(_threads_lock)
|
||||
def print_active_threads (self):
|
||||
"""Log all currently active threads."""
|
||||
first = True
|
||||
|
|
@ -84,10 +87,16 @@ class Aggregate (object):
|
|||
except urlqueue.Timeout:
|
||||
log.warn(LOG_CHECK, "Abort timed out")
|
||||
|
||||
@synchronized(_threads_lock)
|
||||
def remove_stopped_threads (self):
|
||||
"Remove the stopped threads from the internal thread list."""
|
||||
"""Remove the stopped threads from the internal thread list."""
|
||||
self._remove_stopped_threads()
|
||||
|
||||
def _remove_stopped_threads (self):
|
||||
"""Not threads-safe function to really remove stopped threads."""
|
||||
self.threads = [t for t in self.threads if t.is_alive()]
|
||||
|
||||
@synchronized(_threads_lock)
|
||||
def finish (self):
|
||||
"""Wait for checker threads to finish."""
|
||||
assert self.urlqueue.empty()
|
||||
|
|
@ -95,7 +104,13 @@ class Aggregate (object):
|
|||
t.stop()
|
||||
self.connections.clear()
|
||||
|
||||
@synchronized(_lock)
|
||||
@synchronized(_threads_lock)
|
||||
def is_finished (self):
|
||||
"""Determine if checking is finished."""
|
||||
self._remove_stopped_threads()
|
||||
return self.urlqueue.empty() and not self.threads
|
||||
|
||||
@synchronized(_w3_time_lock)
|
||||
def check_w3_time (self):
|
||||
"""Make sure the W3C validators are at most called once a second."""
|
||||
if time.time() - self.last_w3_call < 1:
|
||||
|
|
|
|||
|
|
@ -21,15 +21,13 @@ Functions used by the WSGI script.
|
|||
import cgi
|
||||
import os
|
||||
import threading
|
||||
from StringIO import StringIO
|
||||
import locale
|
||||
import re
|
||||
import time
|
||||
import urlparse
|
||||
from PyQt4 import QtCore
|
||||
from . import configuration, strformat, checker, director, \
|
||||
add_intern_pattern, get_link_pat, init_i18n, url as urlutil
|
||||
from .logger import Logger
|
||||
from .decorators import synchronized
|
||||
|
||||
|
||||
def application(environ, start_response):
|
||||
|
|
@ -85,93 +83,31 @@ def formvalue (form, key):
|
|||
return field
|
||||
|
||||
|
||||
class ThreadsafeStringIO (StringIO):
|
||||
"""Thread-safe String I/O class."""
|
||||
_lock = threading.Lock()
|
||||
class ThreadsafeIO (object):
|
||||
"""Thread-safe I/O class."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize buffer."""
|
||||
self.buf = []
|
||||
self.lock = threading.Lock()
|
||||
self.closed = False
|
||||
|
||||
@synchronized(_lock)
|
||||
def write (self, data):
|
||||
self.lock.acquire()
|
||||
try:
|
||||
if self.closed:
|
||||
raise IOError("Write on closed I/O object")
|
||||
self.buf.append(data)
|
||||
finally:
|
||||
self.lock.release()
|
||||
if self.closed:
|
||||
raise IOError("Write on closed I/O object")
|
||||
self.buf.append(data)
|
||||
|
||||
@synchronized(_lock)
|
||||
def get_data (self):
|
||||
self.lock.acquire()
|
||||
try:
|
||||
data = "".join(self.buf)
|
||||
self.buf = []
|
||||
return data
|
||||
finally:
|
||||
self.lock.release()
|
||||
data = u"".join(self.buf)
|
||||
self.buf = []
|
||||
return data
|
||||
|
||||
@synchronized(_lock)
|
||||
def close (self):
|
||||
self.lock.acquire()
|
||||
try:
|
||||
self.buf = []
|
||||
self.closed = True
|
||||
finally:
|
||||
self.lock.release()
|
||||
|
||||
|
||||
class SignalLogger (Logger):
|
||||
"""Use Qt signals for logged URLs and statistics."""
|
||||
|
||||
def __init__ (self, **args):
|
||||
"""Store signals for URL and statistic data."""
|
||||
super(SignalLogger, self).__init__(**args)
|
||||
self.log_url_signal = args["signal"]
|
||||
self.log_stats_signal = args["stats"]
|
||||
|
||||
def start_fileoutput (self):
|
||||
"""Override fileoutput handling of base class."""
|
||||
pass
|
||||
|
||||
def close_fileoutput (self):
|
||||
"""Override fileoutput handling of base class."""
|
||||
pass
|
||||
|
||||
def log_url (self, url_data):
|
||||
"""Emit URL data which gets logged in the main window."""
|
||||
self.log_url_signal.emit(url_data)
|
||||
|
||||
def end_output (self):
|
||||
"""Emit statistic data which gets logged in the main window."""
|
||||
self.log_stats_signal.emit(self.stats)
|
||||
|
||||
|
||||
class DelegateLogger (QtCore.QObject):
|
||||
"""Logger using connected signals, delegating output to
|
||||
another logger class."""
|
||||
log_url_signal = QtCore.pyqtSignal(object)
|
||||
log_stats_signal = QtCore.pyqtSignal(object)
|
||||
|
||||
def __init__ (self):
|
||||
"""Connect signals to this instance and init state."""
|
||||
super(DelegateLogger, self).__init__()
|
||||
self.log_url_signal.connect(self.log_url)
|
||||
self.log_stats_signal.connect(self.log_stats)
|
||||
self.logger = None
|
||||
self.finished = False
|
||||
|
||||
def add_logger (self, logger):
|
||||
"""Delegate to given logger."""
|
||||
self.logger = logger
|
||||
|
||||
def log_url (self, url_data):
|
||||
"""Delegate URL logging to internal logger."""
|
||||
self.logger.log_url(url_data)
|
||||
|
||||
def log_stats (self, statistics):
|
||||
"""Delegate statistic logging to internal logger."""
|
||||
self.logger.stats = statistics
|
||||
self.logger.end_output()
|
||||
self.finished = True
|
||||
self.buf = []
|
||||
self.closed = True
|
||||
|
||||
|
||||
def encode(s):
|
||||
|
|
@ -188,11 +124,8 @@ def checklink (form=None, env=os.environ):
|
|||
log(env, errmsg)
|
||||
yield encode(format_error(errmsg))
|
||||
return
|
||||
delegate_logger = DelegateLogger()
|
||||
config = get_configuration(form, delegate_logger)
|
||||
out = ThreadsafeStringIO()
|
||||
html_logger = config.logger_new('html', fd=out)
|
||||
delegate_logger.add_logger(html_logger)
|
||||
out = ThreadsafeIO()
|
||||
config = get_configuration(form, out)
|
||||
url = strformat.stripurl(formvalue(form, "url"))
|
||||
aggregate = director.get_aggregate(config)
|
||||
url_data = checker.get_url_from(url, 0, aggregate)
|
||||
|
|
@ -204,7 +137,12 @@ def checklink (form=None, env=os.environ):
|
|||
yield encode(format_error(msg))
|
||||
return
|
||||
aggregate.urlqueue.put(url_data)
|
||||
html_logger.start_output()
|
||||
for html_str in start_check(aggregate, out):
|
||||
yield encode(html_str)
|
||||
out.close()
|
||||
|
||||
|
||||
def start_check (aggregate, out):
|
||||
# check in background
|
||||
t = threading.Thread(target=director.check_urls, args=(aggregate,))
|
||||
t.start()
|
||||
|
|
@ -214,27 +152,22 @@ def checklink (form=None, env=os.environ):
|
|||
max_seconds = 300
|
||||
# current running time
|
||||
run_seconds = 0
|
||||
while not delegate_logger.finished:
|
||||
data = out.get_data()
|
||||
yield encode(data)
|
||||
while not aggregate.is_finished():
|
||||
yield out.get_data()
|
||||
time.sleep(sleep_seconds)
|
||||
run_seconds += sleep_seconds
|
||||
if run_seconds > max_seconds:
|
||||
# XXX signal checker to stop
|
||||
html_logger.end_output()
|
||||
director.abort(aggregate)
|
||||
break
|
||||
yield encode(out.get_data())
|
||||
out.close()
|
||||
yield out.get_data()
|
||||
|
||||
|
||||
def get_configuration(form, logger):
|
||||
def get_configuration(form, out):
|
||||
"""Initialize a CGI configuration."""
|
||||
config = configuration.Configuration()
|
||||
config["recursionlevel"] = int(formvalue(form, "level"))
|
||||
config.logger_add("signal", SignalLogger)
|
||||
config["logger"] = config.logger_new('signal',
|
||||
signal=logger.log_url_signal, stats=logger.log_stats_signal)
|
||||
config["threads"] = 0
|
||||
config["logger"] = config.logger_new('html', fd=out)
|
||||
config["threads"] = 2
|
||||
if "anchors" in form:
|
||||
config["anchors"] = True
|
||||
if "errors" not in form:
|
||||
|
|
@ -268,19 +201,20 @@ def checkform (form):
|
|||
if not url or url == "http://":
|
||||
raise LCFormError(_("empty url was given"))
|
||||
if not urlutil.is_safe_url(url):
|
||||
raise LCFormError(_("disallowed url was given"))
|
||||
raise LCFormError(_("disallowed url %r was given") % url)
|
||||
else:
|
||||
raise LCFormError(_("no url was given"))
|
||||
# check recursion level
|
||||
if "level" in form:
|
||||
level = formvalue(form, "level")
|
||||
if not _is_level(level):
|
||||
raise LCFormError(_("invalid recursion level"))
|
||||
raise LCFormError(_("invalid recursion level %r") % level)
|
||||
# check options
|
||||
for option in ("anchors", "errors", "intern"):
|
||||
if option in form:
|
||||
if not formvalue(form, option) == "on":
|
||||
raise LCFormError(_("invalid %s option syntax") % option)
|
||||
value = formvalue(form, option)
|
||||
if value != "on":
|
||||
raise LCFormError(_("invalid %s option %r") % (option, value))
|
||||
|
||||
|
||||
def log (env, msg):
|
||||
|
|
|
|||
Loading…
Reference in a new issue