make the consumer a lock object itself

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@2386 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2005-03-09 20:28:16 +00:00
parent 8e289cc522
commit 3373644560

View file

@ -52,16 +52,16 @@ def print_duration (duration):
print >> stderr, msg,
class Consumer (object):
class Consumer (threading.Lock):
"""
Consume urls from the url queue in a threaded manner. Also supply
thread-safe data acquiring methods.
Consume urls from the url queue in a thread-safe manner.
"""
def __init__ (self, config, cache):
"""
Initialize consumer data and threads.
"""
super(Consumer, self).__init__()
self.config = config
self.cache = cache
self.threader = linkcheck.threader.Threader()
@ -69,14 +69,26 @@ class Consumer (object):
self.logger = config['logger']
self.fileoutput = config['fileoutput']
self.linknumber = 0
# one lock for the data
self.lock = threading.Lock()
# if checking had errors
self.errors = False
# if checking had warnings
self.warnings = False
self.logger_start_output()
def acquire (self):
"""
Acquire lock.
"""
linkcheck.log.debug(linkcheck.LOG_THREAD, "acquire data lock")
super(Consumer, self).acquire()
def release (self):
"""
Release lock.
"""
linkcheck.log.debug(linkcheck.LOG_THREAD, "release data lock")
super(Consumer, self).release()
def _set_threads (self, num):
"""
Set number of checker threads to start.
@ -136,22 +148,22 @@ class Consumer (object):
"""
Return True if checking is finished.
"""
self.lock.acquire()
self.acquire()
try:
return self.threader.finished() and \
self.cache.incoming_len() <= 0
finally:
self.lock.release()
self.release()
def no_more_threads (self):
"""
Return True if no more active threads are running.
"""
self.lock.acquire()
self.acquire()
try:
return self.threader.finished()
finally:
self.lock.release()
self.release()
def abort (self):
"""
@ -164,11 +176,11 @@ class Consumer (object):
"keyboard interrupt; waiting for %d active threads to finish",
num)
linkcheck.log.warn(linkcheck.LOG_CHECK, msg, num)
self.lock.acquire()
self.acquire()
try:
self.threader.finish()
finally:
self.lock.release()
self.release()
time.sleep(2)
self.logger_end_output()
@ -176,7 +188,7 @@ class Consumer (object):
"""
Print check status looking at url queues.
"""
self.lock.acquire()
self.acquire()
try:
print >> stderr, _("Status:"),
active = self.threader.active_threads()
@ -186,25 +198,25 @@ class Consumer (object):
print_duration(curtime - start_time)
print >> stderr
finally:
self.lock.release()
self.release()
def logger_start_output (self):
"""
Start output of all configured loggers.
"""
self.lock.acquire()
self.acquire()
try:
self.logger.start_output()
for logger in self.fileoutput:
logger.start_output()
finally:
self.lock.release()
self.release()
def logger_new_url (self, url_data):
"""
Send new url to all configured loggers.
"""
self.lock.acquire()
self.acquire()
try:
self.linknumber += 1
do_filter = (self.linknumber % 1000) == 0
@ -218,7 +230,7 @@ class Consumer (object):
for log in self.fileoutput:
log.new_url(url_data)
finally:
self.lock.release()
self.release()
# XXX deadlock!
#if do_filter:
# self.filter_queue(self)
@ -227,33 +239,33 @@ class Consumer (object):
"""
End output of all configured loggers.
"""
self.lock.acquire()
self.acquire()
try:
self.logger.end_output(linknumber=self.linknumber)
for logger in self.fileoutput:
logger.end_output(linknumber=self.linknumber)
finally:
self.lock.release()
self.release()
def active_threads (self):
"""
Return number of active threads.
"""
self.lock.acquire()
self.acquire()
try:
return self.threader.active_threads()
finally:
self.lock.release()
self.release()
def get_country_name (self, host):
"""
Return country code for host if found, else None.
"""
self.lock.acquire()
self.acquire()
try:
gi = self.config["geoip"]
if gi:
return linkcheck.checker.geoip.get_country(gi, host)
return None
finally:
self.lock.release()
self.release()