diff --git a/linkcheck/director/__init__.py b/linkcheck/director/__init__.py index f6aa9627..cbda665f 100644 --- a/linkcheck/director/__init__.py +++ b/linkcheck/director/__init__.py @@ -53,9 +53,7 @@ def check_urls (aggregate): except StandardError: status.internal_error() aggregate.abort() - if aggregate.config["status"]: - status.disable_status() - status.join() + aggregate.finish() aggregate.logger.end_log_output() diff --git a/linkcheck/director/aggregator.py b/linkcheck/director/aggregator.py index 9a9f8712..1bbf598c 100644 --- a/linkcheck/director/aggregator.py +++ b/linkcheck/director/aggregator.py @@ -15,6 +15,7 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. """Aggregate needed object instances for checker threads.""" +import Queue import thread import threading import time @@ -52,14 +53,17 @@ class Aggregate (object): self.cookies = cookies self.robots_txt = robots_txt self.logger = logger.Logger(config) + self.threads = [] def start_threads (self): if self.config["status"]: - start_thread(status.do_status, self.urlqueue) + t = start_thread(status.do_status, self.urlqueue) + self.threads.append(t) num = self.config["threads"] if num >= 1: for i in xrange(num): - start_thread(self.worker) + t = start_thread(self.worker) + self.threads.append(t) else: self.worker() @@ -74,21 +78,21 @@ class Aggregate (object): def check_url (self): try: url_data = self.urlqueue.get(timeout=1) - except linkcheck.cache.urlqueue.Timeout: + except Queue.Empty: time.sleep(1) return if url_data is not None: - try: - if url_data.url is None: - url = "" - else: - url = url_data.url.encode("ascii", "replace") - threading.currentThread().setName("Thread-%s" % url) - if not url_data.has_result: - url_data.check() - self.logger.log_url(url_data) - finally: - self.urlqueue.task_done(url_data) + self.check_url_data(url_data) + + def check_url_data (self, url_data): + try: + url = url_data.url.encode("ascii", "replace") + threading.currentThread().setName("Thread-%s" % url) + if not url_data.has_result: + url_data.check() + self.logger.log_url(url_data) + finally: + self.urlqueue.task_done(url_data) def abort (self): self.urlqueue.do_shutdown() @@ -96,3 +100,12 @@ class Aggregate (object): self.urlqueue.join(timeout=self.config["timeout"]) except linkcheck.cache.urlqueue.Timeout: linkcheck.log.warn(linkcheck.LOG_CHECK, "Abort timed out") + + def finish (self): + assert self.urlqueue.empty() + if self.config["status"]: + status.disable_status() + for t in self.threads: + t.join(2) + if t.isAlive(): + linkcheck.log.warn(linkcheck.LOG_CHECK, "Thread %s still active", t)