git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@3149 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2006-05-13 18:08:35 +00:00
parent 98a6d1eb00
commit 7afe60c841
3 changed files with 0 additions and 623 deletions

View file

@@ -1,229 +0,0 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2006 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
Store cached data during checking.
"""
import collections
import linkcheck
import linkcheck.log
import linkcheck.lock
import linkcheck.containers
import linkcheck.configuration
import linkcheck.cookies
import linkcheck.threader
import linkcheck.checker.pool
class Cache (object):
    """
    Store and provide routines for cached data. Currently there are
    caches for cookies, checked URLs, FTP connections and robots.txt
    contents.

    This class is not thread-safe and needs extern locking if multiple
    threads access it.
    """

    def __init__ (self):
        """
        Initialize all caches as empty.
        """
        super(Cache, self).__init__()
        # already checked URLs
        # format: {cache key (string) -> cache data (dict)}
        self.checked = {}
        # URLs that are being checked
        # format: {cache key (string) -> urldata (UrlData)}
        self.in_progress = {}
        # to-be-checked URLs
        # format: [urldata (UrlData)]
        self.incoming = collections.deque()
        # downloaded robots.txt files
        # format: {cache key (string) -> robots.txt content (RobotFileParser)}
        self.robots_txt = {}
        # stored cookies
        # format: {cache key (string) -> cookie jar (linkcheck.cookielib.CookieJar)}
        self.cookies = {}
        # pooled open connections, shared between checker threads
        self.pool = linkcheck.checker.pool.ConnectionPool()

    def incoming_is_empty (self):
        """
        Check if incoming queue is empty.
        """
        return len(self.incoming) <= 0

    def incoming_get_url (self):
        """
        Get first not-in-progress url from the incoming queue and
        return it. If no such url is available return None. The
        url might be already cached; in that case its result data
        has been copied from the cache before returning.
        """
        res = None
        to_delete = None
        for i, url_data in enumerate(self.incoming):
            key = url_data.cache_url_key
            if key in self.checked:
                to_delete = i
                # url is cached and can be logged
                url_data.copy_from_cache(self.checked[key])
                res = url_data
                break
            elif key not in self.in_progress:
                to_delete = i
                # mark as in progress so other callers skip it
                self.in_progress[key] = url_data
                res = url_data
                break
        if to_delete is not None:
            # i == to_delete here, since the loop always breaks right
            # after setting to_delete; deque deletion by index is O(n)
            del self.incoming[i]
        return res

    def incoming_len (self):
        """
        Return number of entries in incoming queue.
        """
        return len(self.incoming)

    def incoming_add (self, url_data):
        """
        Add a new URL to list of URLs to check, unless it already has
        a result or is found in the checked cache.

        @return: True if the url was queued, False if it could be
          logged right away
        """
        # debug call wrapped in assert so "python -O" strips it;
        # relies on debug() returning a true value -- TODO confirm
        assert linkcheck.log.debug(linkcheck.LOG_CACHE,
                                   "Add url %r ...", url_data)
        if url_data.has_result:
            # do not check any further
            assert linkcheck.log.debug(linkcheck.LOG_CACHE,
                                       "... no, has result")
            return False
        # check the cache
        key = url_data.cache_url_key
        if key in self.checked:
            # url is cached and can be logged
            url_data.copy_from_cache(self.checked[key])
            assert linkcheck.log.debug(linkcheck.LOG_CACHE, "... no, cached")
            return False
        # url is not cached, so add to incoming queue
        self.incoming.append(url_data)
        assert linkcheck.log.debug(linkcheck.LOG_CACHE, "... yes, added.")
        return True

    def has_in_progress (self, key):
        """
        Check if in-progress queue has an entry with the given key.

        @param key: Usually obtained from url_data.cache_url_key
        @type key: String
        """
        return key in self.in_progress

    def in_progress_remove (self, url_data, ignore_missing=False):
        """
        Remove url from in-progress cache. If the url is not in the
        cache and ignore_missing is False, raise AssertionError.
        """
        key = url_data.cache_url_key
        if key in self.in_progress:
            del self.in_progress[key]
        else:
            assert ignore_missing, repr(key)

    def checked_add (self, url_data):
        """
        Cache checked url data, moving the entry from the in-progress
        cache to the checked cache. The key must currently be in
        progress and must not already be checked.
        """
        data = url_data.get_cache_data()
        key = url_data.cache_url_key
        assert linkcheck.log.debug(linkcheck.LOG_CACHE, "Caching %r", key)
        assert key not in self.checked, \
            key + u", " + unicode(self.checked[key])
        assert key in self.in_progress, key
        # move entry from self.in_progress to self.checked
        del self.in_progress[key]
        self.checked[key] = data
        # check for aliases (eg. through HTTP redirections)
        if hasattr(url_data, "aliases"):
            data = url_data.get_alias_cache_data()
            for key in url_data.aliases:
                if key not in self.checked and key not in self.in_progress:
                    assert linkcheck.log.debug(linkcheck.LOG_CACHE,
                                               "Caching alias %r", key)
                    self.checked[key] = data

    def checked_redirect (self, redirect, url_data):
        """
        Check if redirect is already in cache. Used for URL redirections
        to avoid double checking of already cached URLs.
        If the redirect URL is found in the cache, the result data is
        already copied.

        @return: True if the redirect was found in the checked cache
        """
        if redirect in self.checked:
            url_data.copy_from_cache(self.checked[redirect])
            return True
        return False

    def robots_txt_allows_url (self, roboturl, url, user, password):
        """
        Ask robots.txt allowance, downloading and caching the
        robots.txt file for roboturl on first use.
        """
        if roboturl not in self.robots_txt:
            # NOTE(review): linkcheck.robotparser2 is not imported at the
            # top of this module -- presumably exposed via the linkcheck
            # package import; verify
            rp = linkcheck.robotparser2.RobotFileParser(
                user=user, password=password)
            rp.set_url(roboturl)
            rp.read()
            self.robots_txt[roboturl] = rp
        else:
            rp = self.robots_txt[roboturl]
        return rp.can_fetch(linkcheck.configuration.UserAgent, url)

    def get_connection (self, key):
        """
        Get open connection to given host. Return None if no such
        connection is available (or the old one timed out).
        """
        return self.pool.get_connection(key)

    def add_connection (self, key, connection, timeout):
        """
        Store open connection into pool for reuse.
        """
        self.pool.add_connection(key, connection, timeout)

    def release_connection (self, key):
        """
        Remove connection from pool.
        """
        self.pool.release_connection(key)

    def store_cookies (self, headers, scheme, host, path):
        """
        Cookie cache setter function. Can raise the
        exception Cookie.CookieError.
        """
        # one jar per host, created lazily
        jar = self.cookies.setdefault(host, linkcheck.cookies.CookieJar())
        return jar.add_cookies(headers, scheme, host, path)

    def get_cookies (self, scheme, host, port, path):
        """
        Cookie cache getter function. Expired cookies are purged
        before matching.
        """
        assert linkcheck.log.debug(linkcheck.LOG_CACHE,
                                   "Get cookies for host %r path %r", host, path)
        jar = self.cookies.setdefault(host, linkcheck.cookies.CookieJar())
        jar.remove_expired()
        return [x for x in jar if x.is_valid_for(scheme, host, port, path)]

View file

@@ -1,287 +0,0 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2006 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
Url consumer class.
"""
import time
try:
import thread
except ImportError:
import dummy_thread as thread
import linkcheck.threader
import linkcheck.log
import linkcheck.lock
import linkcheck.strformat
import linkcheck.checker.geoip
import linkcheck.decorators
from linkcheck.checker import stderr
# Global lock shared by the @synchronized decorators below to serialize
# all public Consumer methods across checker threads.
_lock = thread.allocate_lock()
def print_tocheck (tocheck):
    """
    Write the number of queued URLs to stderr, without a newline.
    """
    template = _n("%5d URL queued,", "%5d URLs queued,", tocheck)
    print >> stderr, template % tocheck,
def print_links (links):
    """
    Write the number of checked URLs to stderr, without a newline.
    """
    template = _n("%4d URL checked,", "%4d URLs checked,", links)
    print >> stderr, template % links,
def print_active (active):
    """
    Write the number of active threads to stderr, without a newline.
    """
    template = _n("%2d active thread,", "%2d active threads,", active)
    print >> stderr, template % active,
def print_duration (duration):
    """
    Write the formatted run time to stderr, without a newline.
    """
    formatted = linkcheck.strformat.strduration(duration)
    print >> stderr, _("runtime %s") % formatted,
class Consumer (object):
    """
    Consume URLs from the URL queue in a thread-safe manner.

    All public methods are synchronized, with the exception of
    abort() which calls locking methods itself, and check_url()
    which just spawns another checker thread.

    Additionally all public methods of the Cache() object
    are included as synchronized proxy functions via __getattr__().

    The synchronization uses a global variable _lock defined above.
    """

    def __init__ (self, config, cache):
        """
        Initialize consumer data and threads, and start logger output.

        @param config: configuration mapping; keys read here and in other
          methods include 'threads', 'logger', 'fileoutput', 'verbose',
          'warnings', 'ignorewarnings' and 'geoip'
        @param cache: the Cache object holding URL/cookie/robots data
        """
        super(Consumer, self).__init__()
        # number of consumed URLs
        self._number = 0
        self._config = config
        self._cache = cache
        # thread pool running the individual URL checks
        self._threader = linkcheck.threader.Threader(num=config['threads'])
        self.start_log_output()

    @linkcheck.decorators.synchronized(_lock)
    def config (self, key):
        """
        Get config value.
        """
        return self._config[key]

    @linkcheck.decorators.synchronized(_lock)
    def config_append (self, key, val):
        """
        Append config value.
        """
        self._config[key].append(val)

    def __getattr__ (self, name):
        """
        Delegate attribute access to the internal cache if possible;
        raise AttributeError for names the cache does not have.
        """
        if hasattr(self._cache, name):
            func = getattr(self._cache, name)
            # NOTE(review): calls decorators.synchronize (not synchronized);
            # presumably returns func wrapped with _lock acquire/release --
            # verify against linkcheck.decorators
            return linkcheck.decorators.synchronize(_lock, func)
        raise AttributeError(name)

    @linkcheck.decorators.synchronized(_lock)
    def append_url (self, url_data):
        """
        Append url to incoming check list; if the cache rejects it
        (already checked or already has a result), log it immediately.
        """
        if not self._cache.incoming_add(url_data):
            # can be logged
            self._log_url(url_data)

    def check_url (self, url_data, name):
        """
        Check given URL data, spawning a new thread with given name.
        This eventually calls either Consumer.checked() or
        Consumer.interrupted().
        This method is not thread safe (hence it should only be called
        from a single thread).
        """
        name = linkcheck.strformat.ascii_safe(name)
        self._threader.start_thread(url_data.check, (), name=name)

    @linkcheck.decorators.synchronized(_lock)
    def checked (self, url_data):
        """
        Put checked url in cache and log it.
        """
        # log before putting it in the cache (otherwise we would see
        # a "(cached)" after every url
        self._log_url(url_data)
        if url_data.caching and not url_data.cached:
            self._cache.checked_add(url_data)
        else:
            # non-cacheable or already cached: just drop the in-progress mark
            self._cache.in_progress_remove(url_data)

    @linkcheck.decorators.synchronized(_lock)
    def interrupted (self, url_data):
        """
        Remove url from active list.
        """
        self._cache.in_progress_remove(url_data, ignore_missing=True)

    @linkcheck.decorators.synchronized(_lock)
    def finished (self):
        """
        Return True if checking is finished (no active threads and an
        empty incoming queue).
        """
        # NOTE(review): stale comment? This method itself holds _lock via
        # the decorator -- confirm the deadlock remark still applies.
        # avoid deadlock by requesting cache data before locking
        return self._threader.finished() and \
               self._cache.incoming_len() == 0

    @linkcheck.decorators.synchronized(_lock)
    def finish (self):
        """
        Finish consuming URLs.
        """
        self._threader.finish()

    @linkcheck.decorators.synchronized(_lock)
    def no_more_threads (self):
        """
        Return True if no more active threads are running.
        """
        return self._threader.finished()

    def abort (self):
        """
        Abort checking and send end-of-output message to logger.
        """
        # While loop to disable keyboard interrupts and system exits
        # (triggered from internal errors while finishing up) during abort.
        self.num_waited = 0
        while True:
            try:
                self._abort()
                break
            except (KeyboardInterrupt, SystemExit):
                pass

    def _abort (self):
        """
        Abort checking and send end-of-output message to logger.
        Private method not disabling exceptions. Uses self.num_waited,
        which abort() initializes, to bound the total wait.
        """
        # wait for threads to finish
        while not self.no_more_threads():
            if self.num_waited > 30:
                # waited more than 30 rounds of 2 seconds: give up
                linkcheck.log.error(linkcheck.LOG_CHECK,
                                    "Thread wait timeout")
                self.end_log_output()
                return
            num = self.active_threads()
            msg = \
              _n("keyboard interrupt; waiting for %d active thread to finish",
                 "keyboard interrupt; waiting for %d active threads to finish",
                 num)
            linkcheck.log.warn(linkcheck.LOG_CHECK, msg, num)
            self.finish()
            self.num_waited += 1
            time.sleep(2)
        self.end_log_output()

    @linkcheck.decorators.synchronized(_lock)
    def print_status (self, curtime, start_time):
        """
        Print check status (active threads, checked/queued URL counts,
        runtime) to stderr, looking at url queues.

        @param curtime: current time in seconds (as from time.time())
        @param start_time: check start time in seconds
        """
        # NOTE(review): comment appears stale -- the cache call below runs
        # while _lock is already held by this method; confirm
        # avoid deadlock by requesting cache data before locking
        print >> stderr, _("Status:"),
        print_active(self._threader.active_threads())
        print_links(self._number)
        print_tocheck(self._cache.incoming_len())
        print_duration(curtime - start_time)
        print >> stderr

    @linkcheck.decorators.synchronized(_lock)
    def start_log_output (self):
        """
        Start output of all configured loggers.
        """
        self._config['logger'].start_output()
        for logger in self._config['fileoutput']:
            logger.start_output()

    @linkcheck.decorators.synchronized(_lock)
    def log_url (self, url_data):
        """
        Log given URL.
        """
        self._log_url(url_data)

    def _log_url (self, url_data):
        """
        Send new url to all configured loggers.
        Unsynchronized helper; callers are expected to hold _lock.
        """
        self._number += 1
        has_warnings = False
        for tag, content in url_data.warnings:
            if tag not in self._config["ignorewarnings"]:
                has_warnings = True
                break
        # print if verbose, or the url is invalid, or it has
        # non-ignored warnings and warning output is enabled
        do_print = self._config["verbose"] or not url_data.valid or \
            (has_warnings and self._config["warnings"])
        self._config['logger'].log_filter_url(url_data, do_print)
        for log in self._config['fileoutput']:
            log.log_filter_url(url_data, do_print)

    @linkcheck.decorators.synchronized(_lock)
    def end_log_output (self):
        """
        End output of all configured loggers.
        """
        self._config['logger'].end_output()
        for logger in self._config['fileoutput']:
            logger.end_output()

    @linkcheck.decorators.synchronized(_lock)
    def active_threads (self):
        """
        Return number of active threads.
        """
        return self._threader.active_threads()

    @linkcheck.decorators.synchronized(_lock)
    def get_country_name (self, host):
        """
        Return country info for host if geoip is configured and finds
        one, else None.
        """
        gi = self._config["geoip"]
        if gi:
            return linkcheck.checker.geoip.get_country(gi, host)
        return None

View file

@@ -1,107 +0,0 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2005-2006 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
Store and retrieve open connections.
"""
import time
class ConnectionPool (object):
    """
    Connection pool class, storing a set of connections for URL retrieval.

    This class performs no locking of its own; callers must provide
    external synchronization if multiple threads share a pool.
    """

    def __init__ (self):
        """
        Initialize an empty connection dictionary which will have entries
        of the form::

            key -> [connection, status, expiration time]

        Connection can be any open connection object (HTTP, FTP, ...).
        Status is either 'available' or 'busy'.
        Expiration time is the point of time in seconds when this
        connection will be timed out.
        The identifier key is usually a tuple (type, host, user, pass),
        but it can be any immutable Python object.
        """
        # open connections
        # {(type, host, user, pass) -> [connection, status, expiration time]}
        self.connections = {}
        # call counter used to trigger a periodic cleanup in get_connection()
        self.expiration_counter = 1

    def add_connection (self, key, conn, timeout):
        """
        Add connection to the pool with given identifier key and timeout
        in seconds. Replaces any existing entry for the same key.
        """
        self.connections[key] = [conn, 'available', time.time() + timeout]

    def get_connection (self, key):
        """
        Get open connection if available, waiting at most 30 seconds for
        a busy connection to be released. The returned connection is
        marked 'busy' until release_connection() is called for its key.

        @return: Open connection object or None if no connection is
          available (unknown key, expired, or still busy after waiting).
        @rtype None or FTPConnection or HTTP(S)Connection
        """
        # every 100th call, sweep out expired available connections
        if (self.expiration_counter % 100) == 0:
            self.expiration_counter = 1
            self.expire_connections()
        else:
            self.expiration_counter += 1
        if key not in self.connections:
            # not found
            return None
        conn_data = self.connections[key]
        t = time.time()
        if t > conn_data[2]:
            # timed out: close the connection object (index 0).
            # BUGFIX: this used to call close() on conn_data[1], the
            # status *string*; the AttributeError was swallowed by a
            # bare except, so expired connections were never closed.
            try:
                conn_data[0].close()
            except Exception:
                # best-effort cleanup; ignore close errors
                pass
            del self.connections[key]
            return None
        # wait at most 300*0.1=30 seconds for connection to become available
        for dummy in range(300):
            if conn_data[1] != 'busy':
                conn_data[1] = 'busy'
                # NOTE(review): this resets the expiration to "now", which
                # makes the entry look expired on the next lookup unless it
                # is re-added -- preserved as-is; confirm intent before
                # changing
                conn_data[2] = t
                return conn_data[0]
            time.sleep(0.1)
        # connection is in use
        return None

    def release_connection (self, key):
        """
        Mark an open and reusable connection as available.
        Unknown keys are silently ignored.
        """
        if key in self.connections:
            self.connections[key][1] = 'available'

    def expire_connections (self):
        """
        Remove expired connections that are not busy from this pool.
        Busy connections are never expired here.
        """
        t = time.time()
        to_delete = []
        for key, conn_data in self.connections.items():
            if conn_data[1] == 'available' and t > conn_data[2]:
                to_delete.append(key)
        for key in to_delete:
            del self.connections[key]