Move cached queue items to top once in a while.

This commit is contained in:
Bastian Kleineidam 2014-03-14 22:08:51 +01:00
parent 6437f08277
commit 19b8baf08c

View file

@ -32,6 +32,8 @@ class Empty (StandardError):
pass
NUM_PUTS_CLEANUP = 10000
class UrlQueue (object):
"""A queue supporting several consumer tasks. The task_done() idea is
from the Python 2.5 implementation of Queue.Queue()."""
@ -59,6 +61,7 @@ class UrlQueue (object):
if max_allowed_urls is not None and max_allowed_urls <= 0:
raise ValueError("Non-positive number of allowed URLs: %d" % max_allowed_urls)
self.max_allowed_urls = max_allowed_urls
self.num_puts = 0
def qsize (self):
"""Return the approximate size of the queue (not reliable!)."""
@ -129,9 +132,32 @@ class UrlQueue (object):
assert key is not None, "no result for None key: %s" % url_data
if self.max_allowed_urls is not None:
self.max_allowed_urls -= 1
self.num_puts += 1
if self.num_puts >= NUM_PUTS_CLEANUP:
self.cleanup()
self.queue.append(url_data)
self.unfinished_tasks += 1
def cleanup(self):
"""Move cached elements to top."""
self.num_puts = 0
cached = []
for i, url_data in enumerate(self.queue):
key = url_data.cache_url
cache = url_data.aggregate.result_cache
if cache.has_result(key):
cached.append(i)
for pos in cached:
self._move_to_top(pos)
def _move_to_top(self, pos):
"""Move element at given position to top of queue."""
if pos > 0:
self.queue.rotate(-pos)
item = self.queue.popleft()
self.queue.rotate(pos)
self.queue.appendleft(item)
def task_done (self, url_data):
"""
Indicate that a formerly enqueued task is complete.