add a finish() method to wait for spawned threads to finish

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@3290 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2006-05-23 19:55:35 +00:00
parent 7a94995345
commit 27997b0251
2 changed files with 28 additions and 17 deletions

View file

@ -53,9 +53,7 @@ def check_urls (aggregate):
except StandardError:
status.internal_error()
aggregate.abort()
if aggregate.config["status"]:
status.disable_status()
status.join()
aggregate.finish()
aggregate.logger.end_log_output()

View file

@ -15,6 +15,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""Aggregate needed object instances for checker threads."""
import Queue
import thread
import threading
import time
@ -52,14 +53,17 @@ class Aggregate (object):
self.cookies = cookies
self.robots_txt = robots_txt
self.logger = logger.Logger(config)
self.threads = []
def start_threads (self):
if self.config["status"]:
start_thread(status.do_status, self.urlqueue)
t = start_thread(status.do_status, self.urlqueue)
self.threads.append(t)
num = self.config["threads"]
if num >= 1:
for i in xrange(num):
start_thread(self.worker)
t = start_thread(self.worker)
self.threads.append(t)
else:
self.worker()
@ -74,21 +78,21 @@ class Aggregate (object):
def check_url (self):
try:
url_data = self.urlqueue.get(timeout=1)
except linkcheck.cache.urlqueue.Timeout:
except Queue.Empty:
time.sleep(1)
return
if url_data is not None:
try:
if url_data.url is None:
url = ""
else:
url = url_data.url.encode("ascii", "replace")
threading.currentThread().setName("Thread-%s" % url)
if not url_data.has_result:
url_data.check()
self.logger.log_url(url_data)
finally:
self.urlqueue.task_done(url_data)
self.check_url_data(url_data)
def check_url_data (self, url_data):
try:
url = url_data.url.encode("ascii", "replace")
threading.currentThread().setName("Thread-%s" % url)
if not url_data.has_result:
url_data.check()
self.logger.log_url(url_data)
finally:
self.urlqueue.task_done(url_data)
def abort (self):
self.urlqueue.do_shutdown()
@ -96,3 +100,12 @@ class Aggregate (object):
self.urlqueue.join(timeout=self.config["timeout"])
except linkcheck.cache.urlqueue.Timeout:
linkcheck.log.warn(linkcheck.LOG_CHECK, "Abort timed out")
def finish (self):
assert self.urlqueue.empty()
if self.config["status"]:
status.disable_status()
for t in self.threads:
t.join(2)
if t.isAlive():
linkcheck.log.warn(linkcheck.LOG_CHECK, "Thread %s still active", t)