diff --git a/linkcheck/checker/consumer.py b/linkcheck/checker/consumer.py index c5763816..7efb4f95 100644 --- a/linkcheck/checker/consumer.py +++ b/linkcheck/checker/consumer.py @@ -52,16 +52,16 @@ def print_duration (duration): print >> stderr, msg, -class Consumer (object): +class Consumer (threading.Lock): """ - Consume urls from the url queue in a threaded manner. Also supply - thread-safe data acquiring methods. + Consume urls from the url queue in a thread-safe manner. """ def __init__ (self, config, cache): """ Initialize consumer data and threads. """ + super(Consumer, self).__init__() self.config = config self.cache = cache self.threader = linkcheck.threader.Threader() @@ -69,14 +69,26 @@ class Consumer (object): self.logger = config['logger'] self.fileoutput = config['fileoutput'] self.linknumber = 0 - # one lock for the data - self.lock = threading.Lock() # if checking had errors self.errors = False # if checking had warnings self.warnings = False self.logger_start_output() + def acquire (self): + """ + Acquire lock. + """ + linkcheck.log.debug(linkcheck.LOG_THREAD, "acquire data lock") + super(Consumer, self).acquire() + + def release (self): + """ + Release lock. + """ + linkcheck.log.debug(linkcheck.LOG_THREAD, "release data lock") + super(Consumer, self).release() + def _set_threads (self, num): """ Set number of checker threads to start. @@ -136,22 +148,22 @@ class Consumer (object): """ Return True if checking is finished. """ - self.lock.acquire() + self.acquire() try: return self.threader.finished() and \ self.cache.incoming_len() <= 0 finally: - self.lock.release() + self.release() def no_more_threads (self): """ Return True if no more active threads are running. """ - self.lock.acquire() + self.acquire() try: return self.threader.finished() finally: - self.lock.release() + self.release() def abort (self): """ @@ -164,11 +176,11 @@ class Consumer (object): "keyboard interrupt; waiting for %d active threads to finish", num) linkcheck.log.warn(linkcheck.LOG_CHECK, msg, num) - self.lock.acquire() + self.acquire() try: self.threader.finish() finally: - self.lock.release() + self.release() time.sleep(2) self.logger_end_output() @@ -176,7 +188,7 @@ class Consumer (object): """ Print check status looking at url queues. """ - self.lock.acquire() + self.acquire() try: print >> stderr, _("Status:"), active = self.threader.active_threads() @@ -186,25 +198,25 @@ class Consumer (object): print_duration(curtime - start_time) print >> stderr finally: - self.lock.release() + self.release() def logger_start_output (self): """ Start output of all configured loggers. """ - self.lock.acquire() + self.acquire() try: self.logger.start_output() for logger in self.fileoutput: logger.start_output() finally: - self.lock.release() + self.release() def logger_new_url (self, url_data): """ Send new url to all configured loggers. """ - self.lock.acquire() + self.acquire() try: self.linknumber += 1 do_filter = (self.linknumber % 1000) == 0 @@ -218,7 +230,7 @@ class Consumer (object): for log in self.fileoutput: log.new_url(url_data) finally: - self.lock.release() + self.release() # XXX deadlock! #if do_filter: # self.filter_queue(self) @@ -227,33 +239,33 @@ class Consumer (object): """ End output of all configured loggers. """ - self.lock.acquire() + self.acquire() try: self.logger.end_output(linknumber=self.linknumber) for logger in self.fileoutput: logger.end_output(linknumber=self.linknumber) finally: - self.lock.release() + self.release() def active_threads (self): """ Return number of active threads. """ - self.lock.acquire() + self.acquire() try: return self.threader.active_threads() finally: - self.lock.release() + self.release() def get_country_name (self, host): """ Return country code for host if found, else None. """ - self.lock.acquire() + self.acquire() try: gi = self.config["geoip"] if gi: return linkcheck.checker.geoip.get_country(gi, host) return None finally: - self.lock.release() + self.release()