updated threading

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@1131 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2004-01-02 23:30:11 +00:00
parent f8eff3d02b
commit c0c91b17d5
6 changed files with 275 additions and 204 deletions

11
README
View file

@ -90,14 +90,11 @@ Only if you want to hack on the code.
commandline options and stores them in a Config object.
(2) Which leads us directly to the Config class. This class stores all
options and works a little magic: it tries to find out if your platform
supports threads. If so, threading is enabled. If not, it is disabled.
Several functions are replaced with their threaded equivalents if
threading is enabled.
Another thing are config files. A Config object reads config file options
on initialization so they get handled before any commandline options.
options and supports threading and reading config files.
A Config object reads config file options on initialization so they get
handled before any commandline options.
(3) The linkchecker script finally calls linkcheck.checkUrls(), which
(3) The linkchecker script calls linkcheck.checkUrls(), which
calls linkcheck.Config.checkUrl(), which calls linkcheck.UrlData.check().
A UrlData object represents a single URL with all attached data like
validity, check time and so on. These values are filled by the

View file

@ -16,12 +16,18 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import ConfigParser, sys, os, re, time, Cookie
import ConfigParser, sys, os, re, Cookie
import _linkchecker_configdata, i18n
from linkcheck import getLinkPat
from linkcheck.LRU import LRU
from os.path import expanduser, normpath, normcase, join
from urllib import getproxies
from debug import *
try:
import threading as _threading
except ImportError:
import dummy_threading as _threading
import Queue, Threader
Version = _linkchecker_configdata.version
AppName = "LinkChecker"
@ -40,6 +46,11 @@ This is free software, and you are welcome to redistribute it
under certain conditions. Look at the file `LICENSE' within this
distribution."""
MAX_URL_CACHE = 10000
MAX_ROBOTS_TXT_CACHE = 1000
MAX_COOKIES_CACHE = 200
# path util function
def norm (path):
return normcase(normpath(expanduser(path)))
@ -73,6 +84,7 @@ class Configuration (dict):
# reduceThreads(). Ok, this is a hack but ItWorksForMe(tm).
self.reduceCount = 0
def reset (self):
"""Reset to default values"""
self['linknumber'] = 0
@ -85,10 +97,11 @@ class Configuration (dict):
self["denyallow"] = False
self["interactive"] = False
# on ftp, password is set by Pythons ftplib
self["authentication"] = [{'pattern': re.compile(r'^.+'),
'user': 'anonymous',
'password': '',
}]
self["authentication"] = [
{'pattern': re.compile(r'^.+'),
'user': 'anonymous',
'password': '',
}]
self["proxy"] = getproxies()
self["recursionlevel"] = 1
self["wait"] = 0
@ -147,134 +160,25 @@ class Configuration (dict):
self["warningregex"] = None
self["warnsizebytes"] = None
self["nntpserver"] = os.environ.get("NNTP_SERVER",None)
self.urlCache = {}
self.robotsTxtCache = {}
try:
import threading
self.enableThreading(10)
except ImportError:
type, value = sys.exc_info()[:2]
debug(HURT_ME_PLENTY, "no threading available")
self.disableThreading()
self.cookies = {}
def disableThreading (self):
"""Disable threading by replacing functions with their
non-threading equivalents
"""
debug(HURT_ME_PLENTY, "disable threading")
self["threads"] = False
self.hasMoreUrls = self.hasMoreUrls_NoThreads
self.finished = self.finished_NoThreads
self.finish = self.finish_NoThreads
self.appendUrl = self.appendUrl_NoThreads
self.getUrl = self.getUrl_NoThreads
self.checkUrl = self.checkUrl_NoThreads
self.urlCache_has_key = self.urlCache_has_key_NoThreads
self.urlCache_get = self.urlCache_get_NoThreads
self.urlCache_set = self.urlCache_set_NoThreads
self.urlCacheLock = None
self.robotsTxtCache_has_key = self.robotsTxtCache_has_key_NoThreads
self.robotsTxtCache_get = self.robotsTxtCache_get_NoThreads
self.robotsTxtCache_set = self.robotsTxtCache_set_NoThreads
self.robotsTxtCacheLock = None
self.incrementLinknumber = self.incrementLinknumber_NoThreads
self.getCookies = self.getCookies_NoThreads
self.storeCookies = self.storeCookies_NoThreads
self.log_newUrl = self.log_newUrl_NoThreads
self.logLock = None
self.urls = []
self.threader = None
self.dataLock = None
sys.setcheckinterval(10)
def enableThreading (self, num):
"""Enable threading by replacing functions with their
threading equivalents
"""
debug(HURT_ME_PLENTY, "enable threading with %d threads" % num)
import Queue,Threader
from threading import Lock
self.threader = Threader.Threader()
self.setThreads(10)
self.urlSeen = {}
self.urlCache = LRU(MAX_URL_CACHE)
self.robotsTxtCache = LRU(MAX_ROBOTS_TXT_CACHE)
self["threads"] = True
self.hasMoreUrls = self.hasMoreUrls_Threads
self.finished = self.finished_Threads
self.finish = self.finish_Threads
self.appendUrl = self.appendUrl_Threads
self.getUrl = self.getUrl_Threads
self.checkUrl = self.checkUrl_Threads
self.urlCache_has_key = self.urlCache_has_key_Threads
self.urlCache_get = self.urlCache_get_Threads
self.urlCache_set = self.urlCache_set_Threads
self.urlCacheLock = Lock()
self.robotsTxtCache_has_key = self.robotsTxtCache_has_key_Threads
self.robotsTxtCache_get = self.robotsTxtCache_get_Threads
self.robotsTxtCache_set = self.robotsTxtCache_set_Threads
self.robotsTxtCacheLock = Lock()
self.incrementLinknumber = self.incrementLinknumber_Threads
self.getCookies = self.getCookies_Threads
self.storeCookies = self.storeCookies_Threads
self.log_newUrl = self.log_newUrl_Threads
self.logLock = Lock()
self.urlsLock = _threading.Lock()
self.urlCacheLock = _threading.Lock()
self.robotsTxtCacheLock = _threading.Lock()
self.logLock = _threading.Lock()
self.urls = Queue.Queue(0)
self.threader = Threader.Threader(num)
self.dataLock = Lock()
sys.setcheckinterval(20)
self.dataLock = _threading.Lock()
self.cookies = LRU(MAX_COOKIES_CACHE)
def hasMoreUrls_NoThreads (self):
return len(self.urls)
def finished_NoThreads (self):
return not self.hasMoreUrls_NoThreads()
def setThreads (self, num):
debug(HURT_ME_PLENTY, "set threading with %d threads", num)
self.threader.threads_max = num
def finish_NoThreads (self):
pass
def appendUrl_NoThreads (self, url):
self.urls.append(url)
def getUrl_NoThreads (self):
return self.urls.pop(0)
def checkUrl_NoThreads (self, url):
url.check()
def urlCache_has_key_NoThreads (self, key):
return self.urlCache.has_key(key)
def urlCache_get_NoThreads (self, key):
return self.urlCache[key]
def urlCache_set_NoThreads (self, key, val):
self.urlCache[key] = val
def robotsTxtCache_has_key_NoThreads (self, key):
return self.robotsTxtCache.has_key(key)
def robotsTxtCache_get_NoThreads (self, key):
return self.robotsTxtCache[key]
def robotsTxtCache_set_NoThreads (self, key, val):
self.robotsTxtCache[key] = val
def storeCookies_NoThreads (self, headers, host):
output = []
for h in headers.getallmatchingheaders("Set-Cookie"):
output.append(h)
debug(BRING_IT_ON, "Store Cookie", h)
c = self.cookies.setdefault(host, Cookie.SimpleCookie())
c.load(h)
return output
def getCookies_NoThreads (self, host, path):
debug(BRING_IT_ON, "Get Cookie", host, path)
if not self.cookies.has_key(host):
return []
cookievals = []
for m in self.cookies[host].values():
val = _check_morsel(m, host, path)
if val:
cookievals.append(val)
return cookievals
def newLogger (self, logtype, dict={}):
args = {}
@ -283,63 +187,78 @@ class Configuration (dict):
from linkcheck.log import Loggers
return Loggers[logtype](**args)
def addLogger(self, logtype, loggerClass, logargs={}):
"add a new logger type"
from linkcheck.log import Loggers
Loggers[logtype] = loggerClass
self[logtype] = logargs
def incrementLinknumber_NoThreads (self):
self['linknumber'] += 1
def log_newUrl_NoThreads (self, url):
if not self["quiet"]: self["log"].newUrl(url)
for log in self["fileoutput"]:
log.newUrl(url)
def log_init (self):
if not self["quiet"]: self["log"].init()
for log in self["fileoutput"]:
log.init()
def log_endOfOutput (self):
if not self["quiet"]:
self["log"].endOfOutput(linknumber=self['linknumber'])
for log in self["fileoutput"]:
log.endOfOutput(linknumber=self['linknumber'])
def incrementLinknumber_Threads (self):
def incrementLinknumber (self):
try:
self.dataLock.acquire()
self['linknumber'] += 1
finally:
self.dataLock.release()
def hasMoreUrls_Threads (self):
def hasMoreUrls (self):
return not self.urls.empty()
def finished_Threads (self):
time.sleep(0.1)
if self.reduceCount==5:
self.reduceCount = 0
self.threader.reduceThreads()
else:
self.reduceCount += 1
def finished (self):
return self.threader.finished() and self.urls.empty()
def finish_Threads (self):
def finish (self):
self.threader.finish()
def appendUrl_Threads (self, url):
def appendUrl (self, url):
self.urls.put(url)
def getUrl_Threads (self):
def getUrl (self):
return self.urls.get()
def checkUrl_Threads (self, url):
self.threader.startThread(url.check, ())
def urlCache_has_key_Threads (self, key):
def checkUrl (self, url):
self.threader.start_thread(url.check, ())
def urlSeen_has_key (self, key):
ret = None
try:
self.urlsLock.acquire()
ret = self.urlSeen.has_key(key)
finally:
self.urlsLock.release()
return ret
def urlSeen_set (self, key):
try:
self.urlsLock.acquire()
self.urlSeen[key] = 1
finally:
self.urlsLock.release()
def urlCache_has_key (self, key):
ret = None
try:
self.urlCacheLock.acquire()
@ -348,7 +267,8 @@ class Configuration (dict):
self.urlCacheLock.release()
return ret
def urlCache_get_Threads (self, key):
def urlCache_get (self, key):
ret = None
try:
self.urlCacheLock.acquire()
@ -357,14 +277,16 @@ class Configuration (dict):
self.urlCacheLock.release()
return ret
def urlCache_set_Threads (self, key, val):
def urlCache_set (self, key, val):
try:
self.urlCacheLock.acquire()
self.urlCache[key] = val
finally:
self.urlCacheLock.release()
def robotsTxtCache_has_key_Threads (self, key):
def robotsTxtCache_has_key (self, key):
ret = None
try:
self.robotsTxtCacheLock.acquire()
@ -373,7 +295,8 @@ class Configuration (dict):
self.robotsTxtCacheLock.release()
return ret
def robotsTxtCache_get_Threads (self, key):
def robotsTxtCache_get (self, key):
ret = None
try:
self.robotsTxtCacheLock.acquire()
@ -382,14 +305,16 @@ class Configuration (dict):
self.robotsTxtCacheLock.release()
return ret
def robotsTxtCache_set_Threads (self, key, val):
def robotsTxtCache_set (self, key, val):
try:
self.robotsTxtCacheLock.acquire()
self.robotsTxtCache[key] = val
finally:
self.robotsTxtCacheLock.release()
def log_newUrl_Threads (self, url):
def log_newUrl (self, url):
try:
self.logLock.acquire()
if not self["quiet"]: self["log"].newUrl(url)
@ -398,20 +323,37 @@ class Configuration (dict):
finally:
self.logLock.release()
def storeCookies_Threads (self, headers, host):
def storeCookies (self, headers, host):
try:
self.dataLock.acquire()
return self.storeCookies_NoThreads(headers, host)
output = []
for h in headers.getallmatchingheaders("Set-Cookie"):
output.append(h)
debug(BRING_IT_ON, "Store Cookie", h)
c = self.cookies.setdefault(host, Cookie.SimpleCookie())
c.load(h)
return output
finally:
self.dataLock.release()
def getCookies_Threads (self, host, path):
def getCookies (self, host, path):
try:
self.dataLock.acquire()
return self.getCookies_NoThreads(host, path)
debug(BRING_IT_ON, "Get Cookie", host, path)
if not self.cookies.has_key(host):
return []
cookievals = []
for m in self.cookies[host].values():
val = _check_morsel(m, host, path)
if val:
cookievals.append(val)
return cookievals
finally:
self.dataLock.release()
def read (self, files = []):
cfiles = files[:]
if not cfiles:
@ -422,6 +364,7 @@ class Configuration (dict):
cfiles.append(norm("~/.linkcheckerrc"))
self.readConfig(cfiles)
def readConfig (self, files):
"""this big function reads all the configuration parameters
used in the linkchecker module."""
@ -473,11 +416,7 @@ class Configuration (dict):
section="checking"
try:
num = cfgparser.getint(section, "threads")
if num > 0:
debug(HURT_ME_PLENTY, "set threading with %d threads", num)
self.enableThreading(num)
else:
self.disableThreading()
self.setThreads(num)
except ConfigParser.Error: debug(NIGHTMARE, msg)
try: self["anchors"] = cfgparser.getboolean(section, "anchors")
except ConfigParser.Error, msg: debug(NIGHTMARE, msg)

124
linkcheck/LRU.py Normal file
View file

@ -0,0 +1,124 @@
class LRU (object):
    """
    Implementation of a length-limited O(1) LRU queue.
    Built for and used by PyPE:
    http://pype.sourceforge.net
    Copyright 2003 Josiah Carlson. (Licensed under the GPL)

    Entries live in a doubly linked list of Nodes: self.first is the
    least recently used entry (the next eviction candidate), self.last
    is the most recently used.  self.d maps key -> Node for O(1) lookup.
    """
    class Node (object):
        """A doubly-linked-list node holding one (key, value) pair."""
        def __init__ (self, prev, me):
            self.prev = prev
            self.me = me      # the (key, value) tuple for this entry
            self.next = None

    def __init__ (self, count, pairs=()):
        """Create an LRU holding at most max(count, 1) entries, optionally
        pre-filled from an iterable of (key, value) pairs.

        Note: the default for pairs is an immutable tuple rather than the
        classic mutable-default [] anti-pattern; pairs is only iterated,
        so any iterable of 2-tuples works as before.
        """
        self.count = max(count, 1)
        self.d = {}
        self.first = None
        self.last = None
        for key, value in pairs:
            self[key] = value

    def __contains__ (self, obj):
        return obj in self.d

    def has_key (self, obj):
        # 'in' instead of dict.has_key(): same result, and dict.has_key
        # was removed in Python 3
        return obj in self.d

    def __getitem__ (self, obj):
        """Return the value for obj and mark it most recently used."""
        a = self.d[obj].me
        # re-inserting moves the entry to the MRU end of the list
        self[a[0]] = a[1]
        return a[1]

    def __setitem__ (self, obj, val):
        """Insert or update obj; evict the LRU entry when over capacity."""
        if obj in self.d:
            del self[obj]
        nobj = self.Node(self.last, (obj, val))
        if self.first is None:
            self.first = nobj
        if self.last:
            self.last.next = nobj
        self.last = nobj
        self.d[obj] = nobj
        if len(self.d) > self.count:
            if self.first is self.last:
                # defensive reset; appears unreachable since count >= 1
                # implies at least two linked nodes at this point
                self.first = None
                self.last = None
                return
            # unlink and drop the least recently used entry
            a = self.first
            a.next.prev = None
            self.first = a.next
            a.next = None
            del self.d[a.me[0]]
            del a

    def __delitem__ (self, obj):
        """Unlink obj's node from the list and remove it from the dict."""
        nobj = self.d[obj]
        if nobj.prev:
            nobj.prev.next = nobj.next
        else:
            self.first = nobj.next
        if nobj.next:
            nobj.next.prev = nobj.prev
        else:
            self.last = nobj.prev
        del self.d[obj]

    def __iter__ (self):
        """Yield values from least to most recently used."""
        cur = self.first
        while cur is not None:
            # remember the successor first: re-entrant mutation of cur
            # must not break the walk
            cur2 = cur.next
            yield cur.me[1]
            cur = cur2

    def iteritems (self):
        """Yield (key, value) pairs from least to most recently used."""
        cur = self.first
        while cur is not None:
            cur2 = cur.next
            yield cur.me
            cur = cur2

    def iterkeys (self):
        return iter(self.d)

    def itervalues (self):
        for i, j in self.iteritems():
            yield j

    def keys (self):
        return self.d.keys()
def _main ():
    """Small demo of the LRU class: fills a 4-entry cache past capacity
    and shows which keys survive eviction (Python 2 print syntax)."""
    a = LRU(4)
    a['1'] = '1'
    a['2'] = '2'
    a['3'] = '3'
    a['4'] = '4'
    # fifth insert exceeds count=4, so the oldest key '1' is evicted
    a['5'] = '5'
    for i in a.iteritems():
        print i,
    print
    # reading '2' marks it most recently used ...
    b = a['2']
    # ... so this insert evicts '3', now the LRU entry, instead of '2'
    a['6'] = '6'
    for i in a.iteritems():
        print i,
    print
    print a.has_key('1')   # False: '1' was evicted above
    print a.has_key('2')   # True: kept alive by the lookup above
if __name__=='__main__':
    _main()

View file

@ -15,43 +15,57 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
from threading import *
try:
import threading as _threading
except ImportError:
import dummy_threading as _threading
class Threader (object):
"A thread generating class"
def __init__ (self, num=5):
self.maxThreads = num
# this allows negative numbers
self.threads_max = max(num, 1)
# list of active threads to watch
self.threads = []
def acquire (self):
def _acquire (self):
"Wait until we are allowed to start a new thread"
while True:
self.reduceThreads()
if len(self.threads) < self.maxThreads:
self._reduce_threads()
if len(self.threads) < self.threads_max:
break
def reduceThreads (self):
def _reduce_threads (self):
for t in self.threads:
if not t.isAlive():
self.threads.remove(t)
def finished (self):
return not len(self.threads)
if self.threads_max > 0:
self._reduce_threads()
return len(self.threads) == 0
def finish (self):
self.reduceThreads()
# dont know how to stop a thread
self._reduce_threads()
# XXX don't know how to stop a thread
def startThread (self, callable, args):
def start_thread (self, func, args):
"Generate a new thread"
self.acquire()
t = Thread(None, callable, None, args)
t.start()
self.threads.append(t)
if self.threads_max < 1:
func(*args)
else:
self._acquire()
t = _threading.Thread(None, func, None, args)
t.start()
self.threads.append(t)
def __str__ (self):
return "Threader with %d threads (max %d)" % \
(len(self.threads), self.threads_max)

View file

@ -19,7 +19,7 @@
class LinkCheckerError (Exception):
pass
import re, i18n
import time, re, i18n
def getLinkPat (arg, strict=False):
"""get a link pattern matcher for intern/extern links"""
debug(BRING_IT_ON, "Link pattern %r", arg)
@ -61,6 +61,10 @@ def checkUrls (config):
while not config.finished():
if config.hasMoreUrls():
config.checkUrl(config.getUrl())
else:
# active connections are downloading/parsing, so
# wait a little
time.sleep(0.1)
config.log_endOfOutput()
except KeyboardInterrupt:
config.finish()

View file

@ -267,9 +267,6 @@ for opt,arg in options:
if opt=="-f" or opt=="--config":
configfiles.append(arg)
config.read(configfiles)
# disable threading for debugging
if get_debuglevel() > 0:
config.disableThreading()
# apply commandline options and arguments
_user = "anonymous"
_password = "guest@"
@ -345,13 +342,13 @@ for opt,arg in options:
elif opt=="-r" or opt=="--recursion-level":
try:
depth = int(arg)
if depth >= 0:
config["recursionlevel"] = depth
else:
config["recursionlevel"] = -1
except ValueError:
printUsage(i18n._("Illegal argument %r for option %s") % \
(arg, "'-r, --recursion-level'"))
if depth >= 0:
config["recursionlevel"] = depth
else:
config["recursionlevel"] = -1
# robots.txt is now default, so ignore this option
elif opt=="-R" or opt=="--robots-txt": pass
@ -360,25 +357,21 @@ for opt,arg in options:
elif opt=="-t" or opt=="--threads":
try:
num = int(arg)
config.setThreads(int(arg))
except ValueError:
printUsage(i18n._("Illegal argument %r for option %s") % \
(arg, "'-t, --threads'"))
if num > 1 and not get_debuglevel() > 0:
config.enableThreading(num)
else:
config.disableThreading()
elif opt=="--timeout":
try:
timeout = int(arg)
if timeout <= 0:
printUsage(i18n._("Illegal argument %r for option %s") % \
(arg, "'--timeout'"))
socket.setdefaulttimeout(timeout)
except ValueError:
printUsage(i18n._("Illegal argument %r for option %s") % \
(arg, "'--timeout'"))
if timeout <= 0:
printUsage(i18n._("Illegal argument %r for option %s") % \
(arg, "'--timeout'"))
socket.setdefaulttimeout(timeout)
elif opt=="-u" or opt=="--user":
_user = arg