Add maximum number of allowed puts on URL queue.

This commit is contained in:
Bastian Kleineidam 2012-09-02 22:44:29 +02:00
parent d8fce1ceeb
commit 3baaca47a0

View file

@ -37,7 +37,7 @@ class UrlQueue (object):
"""A queue supporting several consumer tasks. The task_done() idea is
from the Python 2.5 implementation of Queue.Queue()."""
def __init__ (self):
def __init__ (self, allowed_puts=None):
"""Initialize the queue state and task counters."""
# Note: don't put a maximum size on the queue since it would
# lead to deadlocks when all worker threads called put().
@ -56,6 +56,11 @@ class UrlQueue (object):
self.in_progress = {}
self.checked = LFUCache(size=100000)
self.shutdown = False
# Each put() decreases the number of allowed puts.
# This way we can restrict the number of URLs that are checked.
if allowed_puts is not None and allowed_puts <= 0:
raise ValueError("Non-positive number of allowed puts: %d" % allowed_puts)
self.allowed_puts = allowed_puts
def qsize (self):
"""Return the approximate size of the queue (not reliable!)."""
@ -114,16 +119,9 @@ class UrlQueue (object):
self.in_progress[key] = url_data
return url_data
def put (self, item, block=True, timeout=None):
def put (self, item):
"""Put an item into the queue.
If optional args 'block' is true and 'timeout' is None (the default),
block if necessary until a free slot is available. If 'timeout' is
a positive number, it blocks at most 'timeout' seconds and raises
the Full exception if no free slot was available within that time.
Otherwise ('block' is false), put an item on the queue if a free slot
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
Block if necessary until a free slot is available.
"""
with self.mutex:
self._put(item)
@ -134,6 +132,11 @@ class UrlQueue (object):
if self.shutdown:
# don't accept more URLs
return
if self.allowed_puts is not None:
if self.allowed_puts == 0:
# no more puts allowed
return
self.allowed_puts -= 1
log.debug(LOG_CACHE, "queueing %s", url_data)
key = url_data.cache_url_key
if key in self.checked: