cookies and port number syntax

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@409 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2002-05-04 13:27:02 +00:00
parent f1176e15e8
commit 49a16042ca
27 changed files with 301 additions and 330 deletions

3
TODO
View file

@ -3,4 +3,5 @@ Check why threaded app wont exit resp. is stalled
Another Profiling roundup
Named constants for ANSI Color codes
Test Proxy Authentication
Test socket timeout (how?)
Cookie support
http://www.host.com:/ syntax check

6
debian/changelog vendored
View file

@ -1,3 +1,9 @@
linkchecker (1.5.0) unstable; urgency=low
* More syntax checking for host:port network locations
-- Bastian Kleineidam <calvin@debian.org> Sat, 4 May 2002 00:21:45 +0200
linkchecker (1.4.3) unstable; urgency=low
* UrlData.py: also catch Timeout and other exception on retrieving

View file

@ -103,6 +103,7 @@ class Configuration(UserDict.UserDict):
self["recursionlevel"] = 1
self["wait"] = 0
self["robotstxt"] = 1
self['cookies'] = 0
self["strict"] = 0
self["fileoutput"] = []
# Logger configurations
@ -186,6 +187,8 @@ class Configuration(UserDict.UserDict):
self.robotsTxtCache_set = self.robotsTxtCache_set_NoThreads
self.robotsTxtCacheLock = None
self.incrementLinknumber = self.incrementLinknumber_NoThreads
self.setCookies = self.setCookies_NoThreads
self.storeCookies = self.storeCookies_NoThreads
self.log_newUrl = self.log_newUrl_NoThreads
self.logLock = None
self.urls = []
@ -215,6 +218,8 @@ class Configuration(UserDict.UserDict):
self.robotsTxtCache_set = self.robotsTxtCache_set_Threads
self.robotsTxtCacheLock = Lock()
self.incrementLinknumber = self.incrementLinknumber_Threads
self.setCookies = self.setCookies_Threads
self.storeCookies = self.storeCookies_Threads
self.log_newUrl = self.log_newUrl_Threads
self.logLock = Lock()
self.urls = Queue.Queue(0)
@ -224,40 +229,48 @@ class Configuration(UserDict.UserDict):
def hasMoreUrls_NoThreads(self):
return len(self.urls)
def finished_NoThreads(self):
return not self.hasMoreUrls_NoThreads()
def finish_NoThreads(self):
pass
def appendUrl_NoThreads(self, url):
self.urls.append(url)
def getUrl_NoThreads(self):
return self.urls.pop(0)
def checkUrl_NoThreads(self, url):
url.check(self)
url.check()
def urlCache_has_key_NoThreads(self, key):
return self.urlCache.has_key(key)
def urlCache_get_NoThreads(self, key):
return self.urlCache[key]
def urlCache_set_NoThreads(self, key, val):
self.urlCache[key] = val
def robotsTxtCache_has_key_NoThreads(self, key):
return self.robotsTxtCache.has_key(key)
def robotsTxtCache_get_NoThreads(self, key):
return self.robotsTxtCache[key]
def robotsTxtCache_set_NoThreads(self, key, val):
self.robotsTxtCache[key] = val
def storeCookies_NoThreads(self, headers):
pass
# XXX
def setCookies_NoThreads(self, urlConnection):
pass
# XXX
def newLogger(self, logtype, dict={}):
args = {}
args.update(self[logtype])
@ -266,7 +279,7 @@ class Configuration(UserDict.UserDict):
def incrementLinknumber_NoThreads(self):
self['linknumber'] += 1
def log_newUrl_NoThreads(self, url):
if not self["quiet"]: self["log"].newUrl(url)
for log in self["fileoutput"]:
@ -312,7 +325,7 @@ class Configuration(UserDict.UserDict):
return self.urls.get()
def checkUrl_Threads(self, url):
self.threader.startThread(url.check, (self,))
self.threader.startThread(url.check, ())
def urlCache_has_key_Threads(self, key):
ret = None
@ -373,6 +386,20 @@ class Configuration(UserDict.UserDict):
finally:
self.logLock.release()
def storeCookies_Threads(self, headers):
try:
self.dataLock.acquire()
# XXX
finally:
self.dataLock.release()
def setCookies_Threads(self, urlConnection):
try:
self.dataLock.acquire()
# XXX
finally:
self.dataLock.release()
def read(self, files = []):
if not files:
# system wide config settings

View file

@ -86,16 +86,18 @@ _url_re = re.compile(_url, re.VERBOSE)
class FileUrlData (UrlData):
"Url link with file scheme"
def __init__(self,
urlName,
recursionLevel,
parentName = None,
baseRef = None, line=0, name=""):
def __init__ (self,
urlName,
config,
recursionLevel,
parentName = None,
baseRef = None, line=0, name=""):
UrlData.__init__(self,
urlName,
recursionLevel,
parentName=parentName,
baseRef=baseRef, line=line, name=name)
urlName,
config,
recursionLevel,
parentName=parentName,
baseRef=baseRef, line=line, name=name)
if not parentName and not baseRef and \
not re.compile("^file:").search(self.urlName):
self.urlName = os.path.expanduser(self.urlName)
@ -136,18 +138,18 @@ class FileUrlData (UrlData):
return None
def parseUrl (self, config):
def parseUrl (self):
for key,ro in extensions.items():
if ro.search(self.url):
return getattr(self, "parse_"+key)(config)
return getattr(self, "parse_"+key)()
for key,ro in contents.items():
if ro.search(self.getContent()):
return getattr(self, "parse_"+key)(config)
return getattr(self, "parse_"+key)()
def parse_html (self, config):
UrlData.parseUrl(self, config)
def parse_html (self):
UrlData.parseUrl(self)
def parse_opera (self, config):
def parse_opera (self):
# parse an opera bookmark file
name = ""
lineno = 0
@ -159,11 +161,11 @@ class FileUrlData (UrlData):
elif line.startswith("URL="):
url = line[4:]
if url:
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url,
self.recursionLevel+1, self.url, None, lineno, name))
self.config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url,
self.recursionLevel+1, self.config, self.url, None, lineno, name))
name = ""
def parse_text (self, config):
def parse_text (self):
lineno = 0
for line in self.getContent().splitlines():
lineno += 1
@ -171,8 +173,8 @@ class FileUrlData (UrlData):
while 1:
mo = _url_re.search(line, i)
if not mo: break
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(mo.group(),
self.recursionLevel+1, self.url, None, lineno, ""))
self.config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(mo.group(),
self.recursionLevel+1, self.config, self.url, None, lineno, ""))
i = mo.end()
return

View file

@ -25,13 +25,13 @@ ExcList.extend([
ftplib.error_proto,
])
class FtpUrlData(UrlData):
class FtpUrlData (UrlData):
"""
Url link with ftp scheme.
Url link with ftp scheme.
"""
def checkConnection(self, config):
_user, _password = self._getUserPassword(config)
def checkConnection (self):
_user, _password = self._getUserPassword()
if _user is None or _password is None:
raise linkcheck.error, linkcheck._("No user or password found")
try:
@ -45,7 +45,7 @@ class FtpUrlData(UrlData):
self.setInfo(info)
def closeConnection(self):
def closeConnection (self):
try: self.urlConnection.quit()
except: pass
self.urlConnection = None

View file

@ -17,6 +17,6 @@
from UrlData import UrlData
class GopherUrlData(UrlData):
class GopherUrlData (UrlData):
"Url link with gopher scheme"
pass

View file

@ -18,27 +18,24 @@
import socket, linkcheck
from UrlData import UrlData
class HostCheckingUrlData(UrlData):
class HostCheckingUrlData (UrlData):
"Url link for which we have to connect to a specific host"
def __init__(self, urlName, recursionLevel, parentName = None,
baseRef = None, line=0, name=""):
UrlData.__init__(self, urlName, recursionLevel,
def __init__ (self, urlName, recursionLevel, config, parentName=None,
baseRef=None, line=0, name=""):
UrlData.__init__(self, urlName, recursionLevel, config,
parentName=parentName, baseRef=baseRef, line=line,
name=name)
name=name)
self.host = None
self.url = urlName
def buildUrl(self):
def buildUrl (self):
# to avoid anchor checking
self.urlTuple=None
def getCacheKey(self):
def getCacheKey (self):
return "%s:%s" % (self.scheme, self.host)
def checkConnection(self, config):
def checkConnection (self):
ip = socket.gethostbyname(self.host)
self.setValid(self.host+"("+ip+") "+linkcheck._("found"))

View file

@ -24,11 +24,11 @@ from urllib import splittype, splithost, splituser, splitpasswd
from debuglevels import *
class HttpUrlData(UrlData):
class HttpUrlData (UrlData):
"Url link with http scheme"
netscape_re = re.compile("Netscape-Enterprise/")
def checkConnection(self, config):
def checkConnection (self):
"""
Check a URL with HTTP protocol.
Here is an excerpt from RFC 1945 with common response codes:
@ -70,27 +70,26 @@ class HttpUrlData(UrlData):
| extension-code
"""
self._setProxy(config["proxy"].get(self.scheme))
self.mime = None
self._setProxy(self.config["proxy"].get(self.scheme))
self.headers = None
self.auth = None
self.proxyauth = None
if not self.urlTuple[2]:
self.setWarning(linkcheck._("Missing '/' at end of URL"))
if config["robotstxt"] and not self.robotsTxtAllowsUrl(config):
if self.config["robotstxt"] and not self.robotsTxtAllowsUrl():
self.setWarning(linkcheck._("Access denied by robots.txt, checked only syntax"))
return
# first try
status, statusText, self.mime = self._getHttpRequest()
Config.debug(BRING_IT_ON, status, statusText, self.mime)
status, statusText, self.headers = self._getHttpRequest()
Config.debug(BRING_IT_ON, status, statusText, self.headers)
has301status = 0
while 1:
# proxy enforcement (overrides standard proxy)
if status == 305 and self.mime:
self._setProxy(self.mime.get("Location"))
status, statusText, self.mime = self._getHttpRequest()
if status == 305 and self.headers:
self._setProxy(self.headers.getheader("Location"))
status, statusText, self.headers = self._getHttpRequest()
# proxy authentication
if status==407:
if not (self.proxyuser and self.proxypass):
@ -99,34 +98,33 @@ class HttpUrlData(UrlData):
import base64
self.proxyauth = "Basic "+base64.encodestring("%s:%s" % \
(self.proxyuser, self.proxypass))
status, statusText, self.mime = self._getHttpRequest()
status, statusText, self.headers = self._getHttpRequest()
# follow redirections
tries = 0
redirected = self.urlName
while status in [301,302] and self.mime and tries < 5:
while status in [301,302] and self.headers and tries < 5:
has301status = (status==301)
newurl = self.mime.get("Location", self.mime.get("Uri", ""))
newurl = self.headers.getheader("Location",
self.headers.getheader("Uri", ""))
redirected = urlparse.urljoin(redirected, newurl)
self.urlTuple = urlparse.urlparse(redirected)
status, statusText, self.mime = self._getHttpRequest()
Config.debug(BRING_IT_ON, "Redirected", self.mime)
status, statusText, self.headers = self._getHttpRequest()
Config.debug(BRING_IT_ON, "Redirected", self.headers)
tries += 1
if tries >= 5:
self.setError(linkcheck._("too much redirections (>= 5)"))
return
# user authentication
if status==401:
if not self.auth:
import base64
_user, _password = self._getUserPassword(config)
_user, _password = self._getUserPassword()
self.auth = "Basic "+\
base64.encodestring("%s:%s" % (_user, _password))
status, statusText, self.mime = self._getHttpRequest()
status, statusText, self.headers = self._getHttpRequest()
Config.debug(BRING_IT_ON, "Authentication", _user, "/",
_password)
# some servers get the HEAD request wrong:
# - Netscape Enterprise Server (no HEAD implemented, 404 error)
# - Hyperwave Information Server (501 error)
@ -138,24 +136,23 @@ class HttpUrlData(UrlData):
# HEAD method not allowed ==> try get
self.setWarning(linkcheck._("Server does not support HEAD request (got "
"%d status), falling back to GET")%status)
status, statusText, self.mime = self._getHttpRequest("GET")
elif status>=400 and self.mime:
server = self.mime.getheader("Server")
status, statusText, self.headers = self._getHttpRequest("GET")
elif status>=400 and self.headers:
server = self.headers.getheader("Server")
if server and self.netscape_re.search(server):
self.setWarning(linkcheck._("Netscape Enterprise Server with no "
"HEAD support, falling back to GET"))
status,statusText,self.mime = self._getHttpRequest("GET")
elif self.mime:
type = self.mime.gettype()
poweredby = self.mime.getheader('X-Powered-By')
server = self.mime.getheader('Server')
status,statusText,self.headers = self._getHttpRequest("GET")
elif self.headers:
type = self.headers.gettype()
poweredby = self.headers.getheader('X-Powered-By')
server = self.headers.getheader('Server')
if type=='application/octet-stream' and \
((poweredby and poweredby[:4]=='Zope') or \
(server and server[:4]=='Zope')):
self.setWarning(linkcheck._("Zope Server cannot determine MIME type"
" with HEAD, falling back to GET"))
status,statusText,self.mime = self._getHttpRequest("GET")
status,statusText,self.headers = self._getHttpRequest("GET")
if status not in [301,302]: break
effectiveurl = urlparse.urlunparse(self.urlTuple)
@ -183,9 +180,11 @@ class HttpUrlData(UrlData):
self.setValid(`status`+" "+statusText)
else:
self.setValid("OK")
# store cookies for valid links
if self.config['cookies']:
self.config.storeCookies(self.headers)
def _setProxy(self, proxy):
def _setProxy (self, proxy):
self.proxy = proxy
self.proxyuser = None
self.proxypass = None
@ -198,8 +197,7 @@ class HttpUrlData(UrlData):
if self.proxyuser:
self.proxyuser, self.proxypass = splitpasswd(self.proxyuser)
def _getHttpRequest(self, method="HEAD"):
def _getHttpRequest (self, method="HEAD"):
"""Put request and return (status code, status text, mime object).
host can be host:port format
"""
@ -226,23 +224,23 @@ class HttpUrlData(UrlData):
if self.parentName:
self.urlConnection.putheader("Referer", self.parentName)
self.urlConnection.putheader("User-agent", Config.UserAgent)
if self.config['cookies']:
self.config.setCookies(self.urlConnection)
self.urlConnection.endheaders()
return self.urlConnection.getreply()
def _getHTTPObject(self, host):
def _getHTTPObject (self, host):
h = httplib.HTTP()
h.set_debuglevel(Config.DebugLevel)
h.connect(host)
return h
def getContent(self):
def getContent (self):
if not self.has_content:
self.has_content = 1
self.closeConnection()
t = time.time()
status, statusText, self.mime = self._getHttpRequest("GET")
status, statusText, self.headers = self._getHttpRequest("GET")
self.urlConnection = self.urlConnection.getfile()
self.data = self.urlConnection.read()
self.downloadtime = time.time() - t
@ -250,29 +248,26 @@ class HttpUrlData(UrlData):
Config.debug(HURT_ME_PLENTY, "comment spans", self.html_comments)
return self.data
def isHtml(self):
if not (self.valid and self.mime):
def isHtml (self):
if not (self.valid and self.headers):
return 0
return self.mime.gettype()[:9]=="text/html"
return self.headers.gettype()[:9]=="text/html"
def robotsTxtAllowsUrl(self, config):
def robotsTxtAllowsUrl (self):
roboturl = "%s://%s/robots.txt" % self.urlTuple[0:2]
Config.debug(HURT_ME_PLENTY, "robots.txt url", roboturl)
Config.debug(HURT_ME_PLENTY, "url", self.url)
if not config.robotsTxtCache_has_key(roboturl):
if not self.config.robotsTxtCache_has_key(roboturl):
rp = robotparser.RobotFileParser()
rp.set_url(roboturl)
rp.read()
config.robotsTxtCache_set(roboturl, rp)
rp = config.robotsTxtCache_get(roboturl)
self.config.robotsTxtCache_set(roboturl, rp)
rp = self.config.robotsTxtCache_get(roboturl)
return rp.can_fetch(Config.UserAgent, self.url)
def closeConnection(self):
if self.mime:
try: self.mime.close()
except: pass
self.mime = None
def closeConnection (self):
#if self.headers:
# try: self.headers.close()
# except: pass
# self.headers = None
UrlData.closeConnection(self)

View file

@ -22,19 +22,18 @@ import linkcheck, Config
_supportHttps = hasattr(httplib, "HTTPS")
class HttpsUrlData(HttpUrlData):
class HttpsUrlData (HttpUrlData):
"""Url link with https scheme"""
def _getHTTPObject(self, host):
def _getHTTPObject (self, host):
h = httplib.HTTPS()
h.set_debuglevel(Config.DebugLevel)
h.connect(host)
return h
def _check(self, config):
def _check (self):
if _supportHttps:
HttpUrlData._check(self, config)
HttpUrlData._check(self)
else:
self.setWarning(linkcheck._("HTTPS url ignored"))
self.logMe(config)
self.logMe()

View file

@ -54,9 +54,9 @@ acap # application configuration access protocol
ignored_schemes_re = re.compile(ignored_schemes, re.VERBOSE)
class IgnoredUrlData(UrlData):
class IgnoredUrlData (UrlData):
"""Some schemes are defined in http://www.w3.org/Addressing/schemes"""
def _check(self, config):
def _check (self):
self.setWarning(linkcheck._("%s url ignored")%self.scheme.capitalize())
self.logMe(config)
self.logMe()

View file

@ -28,10 +28,10 @@ headers_re = re.compile(r"\?(.+)$")
# or read entries from the registry (Windows systems)
linkcheck.DNS.init_dns_resolver()
class MailtoUrlData(HostCheckingUrlData):
class MailtoUrlData (HostCheckingUrlData):
"Url link with mailto scheme"
def buildUrl(self):
def buildUrl (self):
HostCheckingUrlData.buildUrl(self)
self.headers = {}
self.adresses = AddressList(self._cutout_adresses()).addresslist
@ -42,8 +42,7 @@ class MailtoUrlData(HostCheckingUrlData):
self.adresses.extend(AddressList(a).addresslist)
Config.debug(BRING_IT_ON, "adresses: ", self.adresses)
def _cutout_adresses(self):
def _cutout_adresses (self):
mo = headers_re.search(self.urlName)
if mo:
headers = cgi.parse_qs(mo.group(1), strict_parsing=1)
@ -54,7 +53,7 @@ class MailtoUrlData(HostCheckingUrlData):
return self.urlName[7:]
def checkConnection(self, config):
def checkConnection (self):
"""Verify a list of email adresses. If one adress fails,
the whole list will fail.
For each mail adress we check the following things:
@ -97,7 +96,6 @@ class MailtoUrlData(HostCheckingUrlData):
type, value = sys.exc_info()[:2]
#print type,value
if smtpconnect: break
if not smtpconnect:
self.setWarning(linkcheck._("None of the MX mail hosts for %s accepts an "
"SMTP connection: %s") % (host, str(value)))
@ -107,7 +105,7 @@ class MailtoUrlData(HostCheckingUrlData):
self.setValid(linkcheck._("found MX mail host %s") % mxrecord)
def _split_adress(self, adress):
def _split_adress (self, adress):
split = adress.split("@", 1)
if len(split)==2:
if not split[1]:
@ -117,12 +115,10 @@ class MailtoUrlData(HostCheckingUrlData):
return (split[0], "localhost")
raise linkcheck.error, linkcheck._("could not split the mail adress")
def closeConnection(self):
def closeConnection (self):
try: self.urlConnection.quit()
except: pass
self.urlConnection = None
def getCacheKey(self):
def getCacheKey (self):
return "%s:%s" % (self.scheme, str(self.adresses))

View file

@ -26,10 +26,10 @@ ExcList.extend([nntplib.error_reply,
nntplib.error_proto,
])
class NntpUrlData(UrlData):
class NntpUrlData (UrlData):
"Url link with NNTP scheme"
def buildUrl(self):
def buildUrl (self):
# use nntp instead of news to comply with the unofficial internet
# draft of Alfred Gilman which unifies (s)news and nntp URLs
# note: we use this only internally (for parsing and caching)
@ -40,9 +40,8 @@ class NntpUrlData(UrlData):
self.urlTuple = urlparse.urlparse(self.url)
debug(BRING_IT_ON, self.urlTuple)
def checkConnection(self, config):
nntpserver = self.urlTuple[1] or config["nntpserver"]
def checkConnection (self):
nntpserver = self.urlTuple[1] or self.config["nntpserver"]
if not nntpserver:
self.setWarning(linkcheck._("No NNTP server specified, skipping this URL"))
return
@ -66,8 +65,7 @@ class NntpUrlData(UrlData):
# group name is the empty string
self.setWarning(linkcheck._("No newsgroup specified in NNTP URL"))
def _connectNntp(self, nntpserver):
def _connectNntp (self, nntpserver):
"""This is done only once per checking task."""
timeout = 1
while timeout:
@ -84,6 +82,5 @@ class NntpUrlData(UrlData):
raise
return nntp
def getCacheKey(self):
def getCacheKey (self):
return self.url

View file

@ -30,52 +30,7 @@ SQLTable = [
("'","''")
]
TeXTable = []
def stripHtmlComments(data):
    """Remove all <!-- ... --> HTML comments from data.

    An opening marker without a matching closing marker is left in
    place, along with everything after it.
    """
    start = data.find("<!--")
    while start != -1:
        end = data.find("-->", start)
        if end == -1:
            # unterminated comment: keep the remainder unchanged
            return data
        data = data[:start] + data[end + 3:]
        start = data.find("<!--")
    return data
def stripFenceComments(data):
    """Remove lines consisting only of a '# ...' comment from data.

    Remaining lines are rejoined with newlines.  Returns None when
    every line was a comment (preserving this helper's historical
    contract of a falsy result in that case).

    Fixes over the original:
    - the comment regex is compiled once, outside the loop;
    - raw string used for the regex (avoids invalid-escape warnings);
    - empty kept lines are preserved: the old ``if ret:`` truthiness
      test silently dropped a leading empty line, so ``"\\na"`` came
      back as ``"a"``.
    """
    comment_re = re.compile(r"\s*#.*")
    kept = [line for line in data.split("\n") if not comment_re.match(line)]
    if not kept:
        return None
    return "\n".join(kept)
def rstripQuotes(s):
    """Strip a single optional trailing quote (single or double) from s."""
    if s and s[-1] in ('"', "'"):
        return s[:-1]
    return s
def lstripQuotes(s):
    """Strip a single optional leading quote (single or double) from s."""
    if not s:
        return s
    return s[1:] if s[0] in ('"', "'") else s
def stripQuotes(s):
def stripQuotes (s):
"Strip optional quotes"
if len(s)<2:
return s
@ -85,13 +40,11 @@ def stripQuotes(s):
s = s[:-1]
return s
def indent(s, level):
def indent (s, level):
"indent each line of s with <level> spaces"
return indentWith(s, level * " ")
def indentWith(s, indent):
def indentWith (s, indent):
"indent each line of s with given indent argument"
i = 0
while i < len(s):
@ -100,8 +53,7 @@ def indentWith(s, indent):
i += 1
return s
def blocktext(s, width):
def blocktext (s, width):
"Adjust lines of s to be not wider than width"
# split into lines
s = s.split("\n")
@ -119,8 +71,7 @@ def blocktext(s, width):
line = line[i:].strip()
return ret + line
def getLastWordBoundary(s, width):
def getLastWordBoundary (s, width):
"""Get maximal index i of a whitespace char in s with 0 < i < width.
Note: if s contains no whitespace this returns width-1"""
match = re.compile(".*\s").match(s[0:width])
@ -128,47 +79,37 @@ def getLastWordBoundary(s, width):
return match.end()
return width-1
def applyTable(table, s):
    """Apply a table of (old, new) replacement pairs to s, in order."""
    for old, new in table:
        s = s.replace(old, new)
    return s
def texify(s):
    """Escape special TeX chars and strings in s.

    Parameter renamed from ``str`` to ``s`` to stop shadowing the
    builtin, matching the sibling helpers (sqlify, htmlify) renamed
    the same way elsewhere in this module.
    """
    return applyTable(TeXTable, s)
def sqlify(s):
    """Escape special SQL chars in s and wrap the result in quotes.

    A falsy value (empty string, None) becomes the literal NULL.
    """
    if not s:
        return "NULL"
    return "'%s'" % applyTable(SQLTable, s)
def htmlify(s):
    """Escape special HTML chars and strings in s via HtmlTable."""
    return applyTable(HtmlTable, s)

def unhtmlify(s):
    """Apply the UnHtmlTable replacement pairs to s."""
    return applyTable(UnHtmlTable, s)
def getLineNumber(str, index):
def getLineNumber (s, index):
"return the line number of str[index]"
i=0
if index<0: index=0
line=1
while i<index:
if str[i]=='\n':
if s[i]=='\n':
line += 1
i += 1
return line
def paginate(text, lines=22):
def paginate (text, lines=22):
"""print text in pages of lines size"""
textlines = text.split("\n")
curline = 1
@ -180,13 +121,7 @@ def paginate(text, lines=22):
print "press return to continue..."
sys.stdin.read(1)
if __name__=='__main__':
print htmlify("äöü")
print unhtmlify("&auml;&nbsp;&auml;&amp;auml;")
def remove_markup(s):
def remove_markup (s):
mo = markup_re.search(s)
while mo:
s = s[0:mo.start()] + s[mo.end():]

View file

@ -30,10 +30,10 @@ _port = r"\d+"
telnet_re = re.compile(r"^telnet://(%s)?(?P<host>%s)(:(?P<port>%s))?(/)?$"%\
(_userpassword, _host, _port))
class TelnetUrlData(HostCheckingUrlData):
class TelnetUrlData (HostCheckingUrlData):
"Url link with telnet scheme"
def buildUrl(self):
def buildUrl (self):
HostCheckingUrlData.buildUrl(self)
mo = telnet_re.match(self.urlName)
if not mo:
@ -45,8 +45,8 @@ class TelnetUrlData(HostCheckingUrlData):
if not self.port:
self.port = 23
def checkConnection(self, config):
HostCheckingUrlData.checkConnection(self, config)
def checkConnection (self):
HostCheckingUrlData.checkConnection(self)
self.urlConnection = telnetlib.Telnet()
self.urlConnection.open(self.host, self.port)
if self.user:
@ -56,4 +56,3 @@ class TelnetUrlData(HostCheckingUrlData):
self.urlConnection.read_until("Password: ", 10)
self.urlConnection.write(self.password+"\n")
self.urlConnection.write("exit\n")

View file

@ -20,32 +20,30 @@ from threading import *
class Threader:
"A thread generating class"
def __init__(self, num=5):
def __init__ (self, num=5):
self.maxThreads = num
self.threads = []
def acquire(self):
def acquire (self):
"Wait until we are allowed to start a new thread"
while 1:
self.reduceThreads()
if len(self.threads) < self.maxThreads:
break
def reduceThreads(self):
def reduceThreads (self):
for t in self.threads:
if not t.isAlive():
self.threads.remove(t)
def finished(self):
def finished (self):
return not len(self.threads)
def finish(self):
def finish (self):
self.reduceThreads()
# dont know how to stop a thread
def startThread(self, callable, args):
def startThread (self, callable, args):
"Generate a new thread"
self.acquire()
t = Thread(None, callable, None, args)

View file

@ -143,21 +143,26 @@ BasePattern = {
#CommentPattern = re.compile("<!--.*?--\s*>", re.DOTALL)
# Workaround for Python 2.0 re module bug
CommentPatternBegin = re.compile("<!--")
CommentPatternEnd = re.compile("--\s*>")
CommentPatternBegin = re.compile(r"<!--")
CommentPatternEnd = re.compile(r"--\s*>")
# regular expression for port numbers
port_re = re.compile(r"\d+")
class UrlData:
"Representing a URL with additional information like validity etc"
def __init__(self,
urlName,
recursionLevel,
parentName = None,
baseRef = None,
line = 0,
name = ""):
def __init__ (self,
urlName,
recursionLevel,
config,
parentName = None,
baseRef = None,
line = 0,
name = ""):
self.urlName = urlName
self.recursionLevel = recursionLevel
self.config = config
self.parentName = parentName
self.baseRef = baseRef
self.errorString = linkcheck._("Error")
@ -180,30 +185,30 @@ class UrlData:
# assume file link if no scheme is found
self.scheme = url.split(":", 1)[0] or "file"
def setError(self, s):
def setError (self, s):
self.valid=0
self.errorString = linkcheck._("Error")+": "+s
def setValid(self, s):
def setValid (self, s):
self.valid=1
self.validString = linkcheck._("Valid")+": "+s
def isHtml(self):
def isHtml (self):
return 0
def setWarning(self, s):
def setWarning (self, s):
if self.warningString:
self.warningString += "\n" + s
else:
self.warningString = s
def setInfo(self, s):
def setInfo (self, s):
if self.infoString:
self.infoString += "\n"+s
else:
self.infoString = s
def copyFrom(self, urlData):
def copyFrom (self, urlData):
self.errorString = urlData.errorString
self.validString = urlData.validString
self.warningString = urlData.warningString
@ -212,7 +217,7 @@ class UrlData:
self.downloadtime = urlData.downloadtime
def buildUrl(self):
def buildUrl (self):
if self.baseRef:
self.url = urlparse.urljoin(self.baseRef, self.urlName)
elif self.parentName:
@ -227,19 +232,25 @@ class UrlData:
self.url = urlparse.urlunparse(self.urlTuple)
# resolve HTML entities
self.url = StringUtil.unhtmlify(self.url)
# check host:port syntax
host = self.urlTuple[1]
if ":" in host:
host,port = host.split(":", 1)
if not port_re.match(port):
raise linkcheck.error("URL has invalid port number")
def logMe(self, config):
def logMe (self):
debug(BRING_IT_ON, "logging url")
config.incrementLinknumber()
if config["verbose"] or not self.valid or \
(self.warningString and config["warnings"]):
config.log_newUrl(self)
self.config.incrementLinknumber()
if self.config["verbose"] or not self.valid or \
(self.warningString and self.config["warnings"]):
self.config.log_newUrl(self)
def check(self, config):
def check (self):
try:
self._check(config)
self._check()
except KeyboardInterrupt:
pass
except (socket.error, select.error):
@ -251,52 +262,53 @@ class UrlData:
except test_support.Error:
raise
except:
type, value = sys.exc_info()[:2]
internal_error()
def _check(self, config):
def _check (self):
debug(BRING_IT_ON, "Checking", self)
if self.recursionLevel and config['wait']:
debug(BRING_IT_ON, "sleeping for", config['wait'], "seconds")
time.sleep(config['wait'])
if self.recursionLevel and self.config['wait']:
debug(BRING_IT_ON, "sleeping for", self.config['wait'], "seconds")
time.sleep(self.config['wait'])
t = time.time()
# check syntax
debug(BRING_IT_ON, "checking syntax")
if not self.urlName or self.urlName=="":
self.setError(linkcheck._("URL is null or empty"))
self.logMe(config)
self.logMe()
return
try:
self.buildUrl()
self.extern = self._getExtern(config)
self.extern = self._getExtern()
except tuple(ExcList):
type, value, tb = sys.exc_info()
debug(HURT_ME_PLENTY, "exception", traceback.format_tb(tb))
self.setError(str(value))
self.logMe(config)
self.logMe()
return
# check the cache
debug(BRING_IT_ON, "checking cache")
if config.urlCache_has_key(self.getCacheKey()):
self.copyFrom(config.urlCache_get(self.getCacheKey()))
if self.config.urlCache_has_key(self.getCacheKey()):
self.copyFrom(self.config.urlCache_get(self.getCacheKey()))
self.cached = 1
self.logMe(config)
self.logMe()
return
# apply filter
debug(BRING_IT_ON, "extern =", self.extern)
if self.extern and (config["strict"] or self.extern[1]):
if self.extern and (self.config["strict"] or self.extern[1]):
self.setWarning(
linkcheck._("outside of domain filter, checked only syntax"))
self.logMe(config)
self.logMe()
return
# check connection
debug(BRING_IT_ON, "checking connection")
try:
self.checkConnection(config)
if self.urlTuple and config["anchors"]:
self.checkConnection()
if self.urlTuple and self.config["anchors"]:
self.checkAnchors(self.urlTuple[5])
except tuple(ExcList):
type, value, tb = sys.exc_info()
@ -304,7 +316,7 @@ class UrlData:
self.setError(str(value))
# check content
warningregex = config["warningregex"]
warningregex = self.config["warningregex"]
if warningregex and self.valid:
debug(BRING_IT_ON, "checking content")
try: self.checkContent(warningregex)
@ -316,19 +328,19 @@ class UrlData:
self.checktime = time.time() - t
# check recursion
debug(BRING_IT_ON, "checking recursion")
if self.allowsRecursion(config):
try: self.parseUrl(config)
if self.allowsRecursion():
try: self.parseUrl()
except tuple(ExcList):
type, value, tb = sys.exc_info()
debug(HURT_ME_PLENTY, "exception", traceback.format_tb(tb))
self.setError(str(value))
self.closeConnection()
self.logMe(config)
self.logMe()
debug(BRING_IT_ON, "caching")
self.putInCache(config)
self.putInCache()
def closeConnection(self):
def closeConnection (self):
# brute force closing
if self.urlConnection is not None:
try: self.urlConnection.close()
@ -337,32 +349,32 @@ class UrlData:
self.urlConnection = None
def putInCache(self, config):
def putInCache (self):
cacheKey = self.getCacheKey()
if cacheKey and not self.cached:
config.urlCache_set(cacheKey, self)
self.config.urlCache_set(cacheKey, self)
self.cached = 1
def getCacheKey(self):
def getCacheKey (self):
if self.urlTuple:
return urlparse.urlunparse(self.urlTuple)
return None
def checkConnection(self, config):
def checkConnection (self):
self.urlConnection = urllib.urlopen(self.url)
def allowsRecursion(self, config):
def allowsRecursion (self):
return self.valid and \
self.isHtml() and \
not self.cached and \
self.recursionLevel < config["recursionlevel"] and \
self.recursionLevel < self.config["recursionlevel"] and \
not self.extern
def checkAnchors(self, anchor):
def checkAnchors (self, anchor):
if not (anchor!="" and self.isHtml() and self.valid):
return
self.getContent()
@ -372,30 +384,30 @@ class UrlData:
self.setWarning("anchor #"+anchor+" not found")
def _getExtern(self, config):
if not (config["externlinks"] or config["internlinks"]):
def _getExtern (self):
if not (self.config["externlinks"] or self.config["internlinks"]):
return 0
# deny and allow external checking
if config["denyallow"]:
for pat, strict in config["externlinks"]:
if self.config["denyallow"]:
for pat, strict in self.config["externlinks"]:
if pat.search(self.url):
return (1, strict)
for pat in config["internlinks"]:
for pat in self.config["internlinks"]:
if pat.search(self.url):
return 0
return 0
else:
for pat in config["internlinks"]:
for pat in self.config["internlinks"]:
if pat.search(self.url):
return 0
for pat, strict in config["externlinks"]:
for pat, strict in self.config["externlinks"]:
if pat.search(self.url):
return (1, strict)
return (1,0)
raise linkcheck.error, "internal error in UrlData._getExtern"
def getContent(self):
def getContent (self):
"""Precondition: urlConnection is an opened URL."""
if not self.has_content:
self.has_content = 1
@ -406,7 +418,7 @@ class UrlData:
return self.data
def init_html_comments(self):
def init_html_comments (self):
# if we find an URL inside HTML comments we ignore it
# so build a list of intervalls which are HTML comments
index = 0
@ -426,20 +438,20 @@ class UrlData:
debug(NIGHTMARE, "comment spans", self.html_comments)
def is_in_comment(self, index):
def is_in_comment (self, index):
for low,high in self.html_comments:
if low < index < high:
return 1
return 0
def checkContent(self, warningregex):
def checkContent (self, warningregex):
match = warningregex.search(self.getContent())
if match:
self.setWarning("Found '"+match.group()+"' in link contents")
def parseUrl(self, config):
def parseUrl (self):
debug(BRING_IT_ON, "Parsing recursively into", self)
# search for a possible base reference
bases = self.searchInForTag(BasePattern)
@ -454,11 +466,11 @@ class UrlData:
for pattern in LinkPatterns:
urls = self.searchInForTag(pattern)
for url,line,name in urls:
config.appendUrl(GetUrlDataFrom(url,
self.recursionLevel+1, self.url, baseRef, line, name))
self.config.appendUrl(GetUrlDataFrom(url,
self.recursionLevel+1, self.config, self.url, baseRef, line, name))
def searchInForTag(self, pattern):
def searchInForTag (self, pattern):
debug(HURT_ME_PLENTY, "Searching for tag", `pattern['tag']`,
"attribute", `pattern['attr']`)
urls = []
@ -481,7 +493,7 @@ class UrlData:
return urls
def searchInForName(self, tag, attr, start, end):
def searchInForName (self, tag, attr, start, end):
name=""
if tag=='img':
name = linkname.image_name(self.getContent()[start:end])
@ -490,7 +502,7 @@ class UrlData:
return name
def __str__(self):
def __str__ (self):
return ("%s link\n"
"urlname=%s\n"
"parentName=%s\n"
@ -505,8 +517,8 @@ class UrlData:
self.name))
def _getUserPassword(self, config):
for auth in config["authentication"]:
def _getUserPassword (self):
for auth in self.config["authentication"]:
if auth['pattern'].match(self.url):
return auth['user'], auth['password']
return None,None
@ -523,7 +535,7 @@ from TelnetUrlData import TelnetUrlData
from NntpUrlData import NntpUrlData
def get_absolute_url(urlName, baseRef, parentName):
def get_absolute_url (urlName, baseRef, parentName):
"""search for the absolute url"""
if urlName and ":" in urlName:
return urlName.lower()
@ -534,8 +546,8 @@ def get_absolute_url(urlName, baseRef, parentName):
return ""
def GetUrlDataFrom(urlName, recursionLevel, parentName=None,
baseRef=None, line=0, name=None):
def GetUrlDataFrom (urlName, recursionLevel, config, parentName=None,
baseRef=None, line=0, name=None):
url = get_absolute_url(urlName, baseRef, parentName)
# test scheme
if re.search("^http:", url):
@ -560,5 +572,5 @@ def GetUrlDataFrom(urlName, recursionLevel, parentName=None,
# assume local file
else:
klass = FileUrlData
return klass(urlName, recursionLevel, parentName, baseRef, line, name)
return klass(urlName, recursionLevel, config, parentName, baseRef, line,
name)

View file

@ -15,7 +15,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
class error(Exception):
class error (Exception):
pass
# i18n support
@ -39,7 +39,7 @@ debug = Config.debug
# main check function
def checkUrls(config):
def checkUrls (config):
""" checkUrls gets a complete configuration object as parameter where all
runtime-dependent options are stored.
If you call checkUrls more than once, you can specify different

View file

@ -33,6 +33,9 @@ For single-letter option arguments the space is not a necessity. So
'-o colored' is the same as '-ocolored'.
-a, --anchors
Check HTTP anchor references. Default is don't check anchors.
-C, --cookies
Accept and send HTTP cookies according to RFC 2109. See the
FAQ for security-related questions about cookies.
-d, --denyallow
Swap checking order to extern/intern. Default checking order
is intern/extern.
@ -137,11 +140,11 @@ o Local files and syntactic sugar on the command line:
linkchecker -r0 ftp.linux.org
""")
def printVersion():
def printVersion ():
print linkcheck.Config.AppInfo
sys.exit(0)
def printHelp():
def printHelp ():
if os.name!='posix':
StringUtil.paginate(Usage+"\n"+Notes+"\n"+Examples)
else:
@ -150,7 +153,7 @@ def printHelp():
print Examples
sys.exit(0)
def printUsage(msg):
def printUsage (msg):
sys.stderr.write(linkcheck._("Error: %s\n") % msg)
sys.stderr.write(linkcheck._("Execute 'linkchecker -h' for help\n"))
sys.exit(1)
@ -160,9 +163,10 @@ def printUsage(msg):
try:
# Note: cut out the name of the script
options, args = getopt.getopt(sys.argv[1:],
"adDe:f:F:hIi:N:o:p:P:qr:Rst:u:VvwW:", # short options
"adCDe:f:F:hIi:N:o:p:P:qr:Rst:u:VvwW:", # short options
["anchors", # long options
"config=",
"cookies",
"debug",
"extern=",
"file-output=",
@ -310,6 +314,9 @@ for opt,arg in options:
config["warningregex"] = re.compile(arg)
config["warnings"] = 1
elif opt=="-C" or opt=="--cookies":
config['cookies'] = 1
if constructauth:
config["authentication"].insert(0, {'pattern': re.compile(".*"),
'user': _user,
@ -337,7 +344,7 @@ for url in args:
url = "ftp://"+url
elif re.compile("^www\.").match(url):
url = "http://"+url
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
# check the urls
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('base1.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('file.html',"file.txt","file.asc"):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('frames.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('ftp.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('http.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('https.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('mail.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('misc.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)

View file

@ -1,4 +1,4 @@
import os, sys
import os
import linkcheck
config = linkcheck.Config.Configuration()
config['recursionlevel'] = 1
@ -9,5 +9,5 @@ config.disableThreading()
htmldir = "test/html"
for file in ('news.html',):
url = os.path.join(htmldir, file)
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0))
config.appendUrl(linkcheck.UrlData.GetUrlDataFrom(url, 0, config))
linkcheck.checkUrls(config)