git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@1367 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2004-07-19 09:02:52 +00:00
parent 1fa1bc6287
commit 6d8ae43f37
11 changed files with 0 additions and 1687 deletions

View file

@ -1,149 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle local file: links"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import re
import os
import urlparse
import linkcheck
# OSError is thrown on Windows when a file is not found
# Register it with the exception list shared through linkcheck.UrlData
# (presumably the exceptions the checker reports as link errors --
# verify against UrlData).
linkcheck.UrlData.ExcList.append(OSError)
# if file extension was fruitless, look at the content
# Maps a content kind to a pattern that FileUrlData searches in the
# beginning of the file content. The key doubles as the suffix of the
# "parse_<key>" method that FileUrlData.parseUrl dispatches to.
# NOTE(review): the users of this table only search the first 20
# characters of the content, while the "html" pattern needs a closing
# </html> tag and has no DOTALL flag, so it can only match a tiny
# one-line document -- confirm whether a plain "<html>" test was meant.
contents = {
"html": re.compile(r'(?i)<html>.*</html>'),
"opera" : re.compile(r'Opera Hotlist'),
# "text" : re.compile(r'[\w\s]+'),
}
# Alternation of URL scheme names with inline "#" comments, so it is only
# usable as a pattern when compiled with re.VERBOSE.
# NOTE(review): nothing in the visible part of this module references
# _schemes -- check for users elsewhere before changing it.
_schemes = r"""(
acap # application configuration access protocol
|afs # Andrew File System global file names
|cid # content identifier
|data # data
|dav # dav
|fax # fax
|imap # internet message access protocol
|ldap # Lightweight Directory Access Protocol
|mailserver # Access to data available from mail servers
|mid # message identifier
|modem # modem
|nfs # network file system protocol
|opaquelocktoken # opaquelocktoken
|pop # Post Office Protocol v3
|prospero # Prospero Directory Service
|rtsp # real time streaming protocol
|service # service location
|sip # session initiation protocol
|tel # telephone
|tip # Transaction Internet Protocol
|tn3270 # Interactive 3270 emulation sessions
|vemmi # versatile multimedia interface
|wais # Wide Area Information Servers
|z39\.50r # Z39.50 Retrieval
|z39\.50s # Z39.50 Session
|chrome # Mozilla specific
|find # Mozilla specific
|clsid # Microsoft specific
|javascript # JavaScript
|isbn # ISBN (int. book numbers)
|https? # HTTP/HTTPS
|ftp # FTP
|file # local file
|telnet # telnet
|mailto # mailto
|gopher # gopher
|s?news # news
|nntp # news
)"""
class FileUrlData(linkcheck.UrlData.UrlData):
    """Url link with file scheme.

    Plain file names given without a parent or base reference are turned
    into file:// urls. Whether a file can be parsed for further links is
    guessed from the url (linkcheck.extensions) and, failing that, from
    the first bytes of the file content (the module level ``contents``
    table).
    """

    def __init__(self,
                 urlName,
                 config,
                 recursionLevel,
                 parentName = None,
                 baseRef = None, line=0, column=0, name=""):
        """Store the link data and normalize urlName into a file url.

        NOTE(review): the second and third parameter are named in the
        opposite order of the sibling classes in this package
        (urlName, recursionLevel, config). They are handed to the base
        class positionally in the order received, so positional callers
        are not affected; the names are kept for backward compatibility.
        """
        super(FileUrlData, self).__init__(urlName, config, recursionLevel,
                                          parentName=parentName,
                                          baseRef=baseRef,
                                          line=line, column=column,
                                          name=name)
        if not (parentName or baseRef or self.urlName.startswith("file:")):
            # a bare file name: make it an absolute file url
            self.urlName = os.path.expanduser(self.urlName)
            # os.path.isabs() also recognizes drive letter paths such as
            # c:\windows, which a plain startswith("/") test would wrongly
            # treat as relative and prefix with the working directory
            if not os.path.isabs(self.urlName):
                self.urlName = os.getcwd()+"/"+self.urlName
            self.urlName = "file://"+self.urlName
        self.urlName = self.urlName.replace("\\", "/")
        # transform c:/windows into /c|/windows
        self.urlName = re.sub(r"^file://(/?)([a-zA-Z]):", r"file:///\2|",
                              self.urlName)

    def buildUrl(self):
        """Build the url, dropping query and fragment."""
        super(FileUrlData, self).buildUrl()
        # ignore query and fragment url parts for filesystem urls
        self.urlparts[3] = self.urlparts[4] = ''
        self.url = urlparse.urlunsplit(self.urlparts)

    def getCacheKeys(self):
        """Return a one-element list with the url including the anchor,
        or an empty list if no url parts are available."""
        # the host in urlparts is lowercase()d
        if self.urlparts:
            # put the anchor in just for building the key, then clear
            # the fragment again
            self.urlparts[4] = self.anchor
            key = urlparse.urlunsplit(self.urlparts)
            self.urlparts[4] = ''
            return [key]
        return []

    def isHtml(self):
        """Return True if url or content start look like HTML."""
        if linkcheck.extensions['html'].search(self.url):
            return True
        if contents['html'].search(self.getContent()[:20]):
            return True
        return False

    def isFile(self):
        """Return True, this is a file link."""
        return True

    def isParseable(self):
        """Return True if a parser is known for this file."""
        # guess by extension
        for ro in linkcheck.extensions.values():
            if ro.search(self.url):
                return True
        # try to read content (can fail, so catch error)
        try:
            head = self.getContent()[:20]
            for ro in contents.values():
                if ro.search(head):
                    return True
        except IOError:
            pass
        return False

    def parseUrl(self):
        """Call the parse_<kind>() method matching this file.

        The kind is looked up by url first and by content start second;
        None is returned if neither lookup finds a match.
        """
        for key, ro in linkcheck.extensions.items():
            if ro.search(self.url):
                return getattr(self, "parse_"+key)()
        head = self.getContent()[:20]
        for key, ro in contents.items():
            if ro.search(head):
                return getattr(self, "parse_"+key)()
        return None

View file

@ -1,133 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle FTP links"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import ftplib
import linkcheck
from linkcheck.debug import *
# Register the ftplib error classes with the exception list shared
# through linkcheck.UrlData (presumably the exceptions the checker
# reports as link errors -- verify against UrlData).
linkcheck.UrlData.ExcList.extend([
ftplib.error_reply,
ftplib.error_temp,
ftplib.error_perm,
ftplib.error_proto,
])
class FtpUrlData(linkcheck.ProxyUrlData.ProxyUrlData):
    """
    Url link with ftp scheme.

    Without a proxy the check logs into the server, changes into the
    directory of the url and initiates (but does not read) a download
    of the file. With a proxy the check is delegated to a HttpUrlData
    object.
    """

    def checkConnection(self):
        """Check the ftp link; raise LinkCheckerError if no login
        credentials are available."""
        # proxy support (we support only http)
        self.setProxy(self.config["proxy"].get(self.scheme))
        if self.proxy:
            http = linkcheck.HttpUrlData.HttpUrlData(
                self.urlName,
                self.recursionLevel,
                self.config,
                parentName=self.parentName,
                baseRef=self.baseRef,
                line=self.line,
                column=self.column,
                name=self.name)
            http.buildUrl()
            return http.check()
        # using no proxy here
        # get login credentials
        if self.userinfo:
            # the import block of this module does not bring urllib
            # into scope, so import it where it is needed
            import urllib
            _user, _password = urllib.splitpasswd(self.userinfo)
        else:
            _user, _password = self.getUserPassword()
        if _user is None or _password is None:
            raise linkcheck.LinkCheckerError(linkcheck.i18n._("No user or password found"))
        self.login(_user, _password)
        filename = self.cwd()
        if filename:
            self.retrieve(filename)
        return None

    def isHtml(self):
        """Return True if the url has a HTML file extension."""
        if linkcheck.extensions['html'].search(self.url):
            return True
        return False

    def isParseable(self):
        """Return True if the url has an extension a parser exists for."""
        for ro in linkcheck.extensions.values():
            if ro.search(self.url):
                return True
        return False

    def parseUrl(self):
        """Call the parse_<kind>() method matching the url extension;
        return None if there is none."""
        for key, ro in linkcheck.extensions.items():
            if ro.search(self.url):
                return getattr(self, "parse_"+key)()
        return None

    def login(self, _user, _password):
        """log into ftp server and check the welcome message"""
        # ready to connect
        try:
            self.urlConnection = ftplib.FTP()
            self.urlConnection.set_debuglevel(get_debuglevel())
            self.urlConnection.connect(self.urlparts[1])
            self.urlConnection.login(_user, _password)
        except EOFError:
            raise linkcheck.LinkCheckerError(linkcheck.i18n._("Remote host has closed connection"))
        if not self.urlConnection.getwelcome():
            self.closeConnection()
            raise linkcheck.LinkCheckerError(linkcheck.i18n._("Got no answer from FTP server"))
        # dont set info anymore, this may change every time we logged in
        #self.setInfo(info)

    def cwd(self):
        """Change into the directory part of the url path and return
        the remaining file name (empty if the path ends with a slash)."""
        # leeched from webcheck
        dirs = self.urlparts[2].split('/')
        filename = dirs.pop()
        # an absolute path yields an empty first component
        if dirs and not dirs[0]:
            del dirs[0]
        for d in dirs:
            self.urlConnection.cwd(d)
        return filename

    def retrieve(self, filename):
        """initiate download of given filename"""
        # it could be a directory if the trailing slash was forgotten
        try:
            self.urlConnection.cwd(filename)
            self.setWarning(linkcheck.i18n._("Missing trailing directory slash in ftp url"))
            return
        except ftplib.error_perm:
            pass
        self.urlConnection.voidcmd('TYPE I')
        conn, size = self.urlConnection.ntransfercmd('RETR %s'%filename)
        if size:
            self.dlsize = size
        # dont download data XXX recursion
        #page = conn.makefile().read(size)
        #else:
        #    page = conn.makefile().read()
        # nothing is read from the data connection, so release its
        # socket right away instead of leaving that to the garbage
        # collector
        conn.close()

    def closeConnection(self):
        """Close the ftp connection, ignoring errors while doing so."""
        try:
            # this used to call the non-existing method closet(); the
            # resulting AttributeError was swallowed below and the
            # connection stayed open
            self.urlConnection.close()
        except Exception:
            # best effort: the connection may be unset or already dead
            pass
        self.urlConnection = None

View file

@ -1,23 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle Gopher links"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import linkcheck
class GopherUrlData(linkcheck.UrlData.UrlData):
    """Url link with gopher scheme.

    Adds nothing to the behaviour inherited from UrlData.
    """

View file

@ -1,46 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Base handle for links with a hostname"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import socket
import urllib
import linkcheck
class HostCheckingUrlData(linkcheck.UrlData.UrlData):
    """Url link which is checked by resolving the name of its host.

    This class initializes self.host to None and never assigns another
    value itself, although getCacheKeys() and checkConnection() read it.
    """

    def __init__(self, urlName, recursionLevel, config, parentName=None,
                 baseRef=None, line=0, column=0, name=""):
        """Store the link data; the url is the unquoted url name."""
        super(HostCheckingUrlData, self).__init__(
            urlName, recursionLevel, config,
            parentName=parentName, baseRef=baseRef,
            line=line, column=column, name=name)
        self.host = None
        self.url = urllib.unquote(self.urlName)

    def buildUrl(self):
        """Clear the url parts."""
        # to avoid anchor checking
        self.urlparts = None

    def getCacheKeys(self):
        """Return a one-element list holding "scheme:host"."""
        cache_key = "%s:%s" % (self.scheme, self.host)
        return [cache_key]

    def checkConnection(self):
        """Resolve the host name and mark the link valid, naming the
        address that was found."""
        address = socket.gethostbyname(self.host)
        found = linkcheck.i18n._("found")
        self.setValid(self.host+"("+address+") "+found)

View file

@ -1,449 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle http links"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import urlparse
import sys
import time
import re
import zlib
import gzip
import socket
import cStringIO as StringIO
import linkcheck
from linkcheck.debug import *
# https links can only be checked if linkcheck.httplib2 provides
# HTTPSConnection and the socket module provides "ssl"
supportHttps = hasattr(linkcheck.httplib2, "HTTPSConnection") and \
hasattr(socket, "ssl")
# register the httplib2 error class with the exception list shared
# through linkcheck.UrlData
linkcheck.UrlData.ExcList.extend([linkcheck.httplib2.error,])
# Content-Encoding values HttpUrlData.getContent is able to decode
_supported_encodings = ('gzip', 'x-gzip', 'deflate')
# Amazon blocks all HEAD requests
# (bound search method; matched against the network location of a url)
_isAmazonHost = re.compile(r'^www\.amazon\.(com|de|ca|fr|co\.(uk|jp))').search
class HttpUrlData(linkcheck.ProxyUrlData.ProxyUrlData):
    """Url link with http scheme.

    The link is requested with HEAD first; GET is used instead where a
    server is known or found not to answer HEAD properly. Redirections
    are followed up to max_redirects times.
    """

    def __init__(self, urlName, recursionLevel, config, parentName=None,
                 baseRef=None, line=0, column=0, name=""):
        """Store the link data and initialize the http specific state."""
        super(HttpUrlData, self).__init__(urlName, recursionLevel, config,
                                          parentName=parentName,
                                          baseRef=baseRef, line=line,
                                          column=column, name=name)
        # urls reached through permanent (301) redirections
        self.aliases = []
        self.max_redirects = 5
        self.has301status = False
        self.no_anchor = False # remove anchor in request url

    def buildUrl(self):
        """Build the url, replacing an empty path with '/'."""
        super(HttpUrlData, self).buildUrl()
        # encode userinfo
        # XXX
        # check for empty paths
        if not self.urlparts[2]:
            self.setWarning(linkcheck.i18n._("URL path is empty, assuming '/' as path"))
            self.urlparts[2] = '/'
            self.url = urlparse.urlunsplit(self.urlparts)

    def checkConnection(self):
        """
        Check a URL with HTTP protocol.
        Here is an excerpt from RFC 1945 with common response codes:
        The first digit of the Status-Code defines the class of response. The
        last two digits do not have any categorization role. There are 5
        values for the first digit:
          o 1xx: Informational - Not used, but reserved for future use
          o 2xx: Success - The action was successfully received,
                 understood, and accepted.
          o 3xx: Redirection - Further action must be taken in order to
                 complete the request
          o 4xx: Client Error - The request contains bad syntax or cannot
                 be fulfilled
          o 5xx: Server Error - The server failed to fulfill an apparently
                 valid request
        The individual values of the numeric status codes defined for
        HTTP/1.0, and an example set of corresponding Reason-Phrase's, are
        presented below. The reason phrases listed here are only recommended
        -- they may be replaced by local equivalents without affecting the
        protocol. These codes are fully defined in Section 9.
        Status-Code    = "200"   ; OK
                       | "201"   ; Created
                       | "202"   ; Accepted
                       | "204"   ; No Content
                       | "301"   ; Moved Permanently
                       | "302"   ; Moved Temporarily
                       | "304"   ; Not Modified
                       | "305"   ; Use Proxy
                       | "400"   ; Bad Request
                       | "401"   ; Unauthorized
                       | "403"   ; Forbidden
                       | "404"   ; Not Found
                       | "405"   ; Method not allowed
                       | "407"   ; Proxy Authentication Required
                       | "500"   ; Internal Server Error
                       | "501"   ; Not Implemented
                       | "502"   ; Bad Gateway
                       | "503"   ; Service Unavailable
                       | extension-code
        """
        # set the proxy, so a 407 status after this is an error
        self.setProxy(self.config["proxy"].get(self.scheme))
        if self.proxy:
            self.setInfo(linkcheck.i18n._("Using Proxy %r")%self.proxy)
        self.headers = None
        self.auth = None
        self.cookies = []
        if not self.robotsTxtAllowsUrl():
            self.setWarning(linkcheck.i18n._("Access denied by robots.txt, checked only syntax"))
            return
        if _isAmazonHost(self.urlparts[1]):
            self.setWarning(linkcheck.i18n._("Amazon servers block HTTP HEAD requests, "
                                             "using GET instead"))
            self.method = "GET"
        else:
            # first try with HEAD
            self.method = "HEAD"
        fallback_GET = False
        redirectCache = [self.url]
        # every "continue" below repeats the request with changed
        # settings (method, authentication, anchor); "break" ends it
        while True:
            try:
                response = self._getHttpResponse()
            except linkcheck.httplib2.BadStatusLine:
                # some servers send empty HEAD replies
                if self.method=="HEAD":
                    self.method = "GET"
                    redirectCache = [self.url]
                    fallback_GET = True
                    continue
                raise
            self.headers = response.msg
            debug(BRING_IT_ON, response.status, response.reason, self.headers)
            # proxy enforcement (overrides standard proxy)
            if response.status == 305 and self.headers:
                oldproxy = (self.proxy, self.proxyauth)
                self.setProxy(self.headers.getheader("Location"))
                self.setInfo(linkcheck.i18n._("Enforced Proxy %r")%self.proxy)
                response = self._getHttpResponse()
                self.headers = response.msg
                self.proxy, self.proxyauth = oldproxy
            # follow all redirections
            tries, response = self.followRedirections(response, redirectCache)
            if tries == -1:
                # already handled
                return
            if tries >= self.max_redirects:
                if self.method=="HEAD":
                    # Microsoft servers tend to recurse HEAD requests
                    self.method = "GET"
                    redirectCache = [self.url]
                    fallback_GET = True
                    continue
                self.setError(linkcheck.i18n._("more than %d redirections, aborting")%self.max_redirects)
                return
            # user authentication
            if response.status == 401:
                if not self.auth:
                    _user, _password = self.getUserPassword()
                    self.auth = self._basicAuthValue(
                        "%s:%s" % (_user, _password))
                    debug(BRING_IT_ON, "Authentication", _user, "/", _password)
                    continue
            elif response.status >= 400:
                # retry without the anchor, but only once: the fragment
                # stays in urlparts, so without the no_anchor test a
                # persistent error status would loop forever
                if self.headers and self.urlparts[4] and not self.no_anchor:
                    self.no_anchor = True
                    continue
                if self.method=="HEAD":
                    # fall back to GET
                    self.method = "GET"
                    redirectCache = [self.url]
                    fallback_GET = True
                    continue
            elif self.headers and self.method!="GET":
                # test for HEAD support
                mime = self.headers.gettype()
                poweredby = self.headers.get('X-Powered-By', '')
                server = self.headers.get('Server', '')
                if mime=='application/octet-stream' and \
                   (poweredby.startswith('Zope') or \
                    server.startswith('Zope')):
                    self.setWarning(linkcheck.i18n._("Zope Server cannot determine"
                                " MIME type with HEAD, falling back to GET"))
                    self.method = "GET"
                    continue
            break
        # check url warnings
        effectiveurl = urlparse.urlunsplit(self.urlparts)
        if self.url != effectiveurl:
            self.setWarning(linkcheck.i18n._("Effective URL %s") % effectiveurl)
            self.url = effectiveurl
        # check response
        self.checkResponse(response, fallback_GET)

    def followRedirections(self, response, redirectCache):
        """Follow all redirections of http response.

        Returns a tuple (tries, response). tries is -1 if the link has
        been dealt with completely (error set, result copied from the
        cache or check handed to a new url object), self.max_redirects
        if the caller should give up or retry with GET, else the number
        of redirections followed.
        """
        redirected = self.url
        tries = 0
        while response.status in [301,302] and self.headers and \
              tries < self.max_redirects:
            newurl = self.headers.getheader("Location",
                         self.headers.getheader("Uri", ""))
            redirected = linkcheck.url.url_norm(urlparse.urljoin(redirected, newurl))
            # note: urlparts has to be a list
            self.urlparts = list(urlparse.urlsplit(redirected))
            # check internal redirect cache to avoid recursion
            if redirected in redirectCache:
                redirectCache.append(redirected)
                if self.method == "HEAD":
                    # Microsoft servers tend to recurse HEAD requests
                    # fall back to the original url and use GET
                    self.urlparts = list(urlparse.urlsplit(self.url))
                    return self.max_redirects, response
                self.setError(
                    linkcheck.i18n._("recursive redirection encountered:\n %s") % \
                    "\n => ".join(redirectCache))
                return -1, response
            redirectCache.append(redirected)
            # remember this alias
            if response.status == 301:
                if not self.has301status:
                    self.setWarning(linkcheck.i18n._("HTTP 301 (moved permanent) encountered: you "
                                                     "should update this link."))
                    if not (self.url.endswith('/') or self.url.endswith('.html')):
                        self.setWarning(linkcheck.i18n._("A HTTP 301 redirection occured and the url has no "
                            "trailing / at the end. All urls which point to (home) "
                            "directories should end with a / to avoid redirection."))
                    self.has301status = True
                self.aliases.append(redirected)
            # check cache again on possibly changed URL
            # NOTE(review): this class only defines getCacheKeys();
            # getCacheKey() presumably comes from a base class -- verify
            key = self.getCacheKey()
            if self.config.urlCache_has_key(key):
                self.copyFromCache(self.config.urlCache_get(key))
                self.cached = True
                self.logMe()
                return -1, response
            # check if we still have a http url, it could be another
            # scheme, eg https or news
            if self.urlparts[0]!="http":
                self.setWarning(linkcheck.i18n._("HTTP redirection to non-http url encountered; "
                                                 "the original url was %r.")%self.url)
                # make new UrlData object
                newobj = linkcheck.UrlData.GetUrlDataFrom(
                    redirected, self.recursionLevel, self.config,
                    parentName=self.parentName, baseRef=self.baseRef,
                    line=self.line, column=self.column, name=self.name)
                newobj.warningString = self.warningString
                newobj.infoString = self.infoString
                # append new object to queue
                self.config.appendUrl(newobj)
                # pretend to be finished and logged
                self.cached = True
                return -1, response
            # new response data
            response = self._getHttpResponse()
            self.headers = response.msg
            debug(BRING_IT_ON, "Redirected", self.headers)
            tries += 1
        return tries, response

    def checkResponse(self, response, fallback_GET):
        """Check final result: set the error or validity of the link
        and add the warnings and infos collected on the way."""
        if response.status >= 400:
            self.setError("%r %s"%(response.status, response.reason))
        else:
            if self.headers and "Server" in self.headers:
                server = self.headers['Server']
            else:
                server = linkcheck.i18n._("unknown")
            if fallback_GET:
                self.setWarning(linkcheck.i18n._("Server %r did not support HEAD request, used GET for checking")%server)
            if self.no_anchor:
                self.setWarning(linkcheck.i18n._("Server %r had no anchor support, removed anchor from request")%server)
            if response.status == 204:
                # no content
                self.setWarning(response.reason)
            # store cookies for valid links
            if self.config['cookies']:
                for c in self.cookies:
                    self.setInfo("Cookie: %s"%c)
                out = self.config.storeCookies(self.headers, self.urlparts[1])
                for h in out:
                    self.setInfo(h)
            if response.status >= 200:
                self.setValid("%r %s"%(response.status,response.reason))
            else:
                self.setValid("OK")
        # headers are tested for emptiness everywhere else as well
        if self.headers:
            modified = self.headers.get('Last-Modified', '')
            if modified:
                self.setInfo(linkcheck.i18n._("Last modified %s") % modified)

    def getCacheKeys(self):
        """Return the cache keys of the base class plus all aliases."""
        keys = super(HttpUrlData, self).getCacheKeys()
        keys.extend(self.aliases)
        return keys

    def _basicAuthValue(self, credentials):
        """Return the value of a Basic authorization header for the
        given "user:password" string."""
        import base64
        # encodestring() wraps long output and appends a newline, none
        # of which may end up in a header value
        return "Basic "+base64.encodestring(credentials).replace("\n", "")

    def _getHttpResponse(self):
        """Send the request for the current url parts with self.method
        over a fresh connection and return the response object of that
        connection. With a proxy the connection goes to the proxy and
        the complete url is requested, else only path, query and
        fragment.
        """
        if self.proxy:
            host = self.proxy
            scheme = "http"
        else:
            host = self.urlparts[1]
            scheme = self.urlparts[0]
        debug(HURT_ME_PLENTY, "host", host)
        if self.urlConnection:
            self.closeConnection()
        self.urlConnection = self.getHTTPObject(host, scheme)
        # quote url before submit
        url = linkcheck.url.url_quote(urlparse.urlunsplit(self.urlparts))
        qurlparts = list(urlparse.urlsplit(url))
        if self.no_anchor:
            qurlparts[4] = ''
        if self.proxy:
            path = urlparse.urlunsplit(qurlparts)
        else:
            path = urlparse.urlunsplit(('', '', qurlparts[2],
                                        qurlparts[3], qurlparts[4]))
        self.urlConnection.putrequest(self.method, path, skip_host=True)
        self.urlConnection.putheader("Host", host)
        # userinfo is from http://user:pass@host/
        if self.userinfo:
            # the header needs an authentication scheme and encoded
            # credentials; the raw userinfo is neither
            userinfo = self.userinfo
            if ":" not in userinfo:
                userinfo += ":"
            self.urlConnection.putheader("Authorization",
                                         self._basicAuthValue(userinfo))
        # auth is the -u and -p configuration options
        elif self.auth:
            self.urlConnection.putheader("Authorization", self.auth)
        if self.proxyauth:
            self.urlConnection.putheader("Proxy-Authorization",
                                         self.proxyauth)
        if self.parentName:
            self.urlConnection.putheader("Referer", self.parentName)
        self.urlConnection.putheader("User-Agent", linkcheck.Config.UserAgent)
        self.urlConnection.putheader("Accept-Encoding", "gzip;q=1.0, deflate;q=0.9, identity;q=0.5")
        if self.config['cookies']:
            self.cookies = self.config.getCookies(self.urlparts[1],
                                                  self.urlparts[2])
            for c in self.cookies:
                self.urlConnection.putheader("Cookie", c)
        self.urlConnection.endheaders()
        return self.urlConnection.getresponse()

    def getHTTPObject(self, host, scheme):
        """Return a connected http or https connection object for host
        (which may have host:port format); raise LinkCheckerError for
        any other scheme."""
        if scheme=="http":
            h = linkcheck.httplib2.HTTPConnection(host)
        elif scheme=="https":
            h = linkcheck.httplib2.HTTPSConnection(host)
        else:
            # call form of raise instead of the "raise X, msg" statement
            raise linkcheck.LinkCheckerError("invalid url scheme %s" % scheme)
        h.set_debuglevel(get_debuglevel())
        h.connect()
        return h

    def getContent(self):
        """Return the body of the url. It is fetched with GET and
        decoded on the first call and stored for all later calls."""
        if not self.has_content:
            self.method = "GET"
            self.has_content = True
            self.closeConnection()
            t = time.time()
            response = self._getHttpResponse()
            self.headers = response.msg
            self.data = response.read()
            encoding = self.headers.get("Content-Encoding")
            if encoding in _supported_encodings:
                try:
                    if encoding == 'deflate':
                        f = StringIO.StringIO(zlib.decompress(self.data))
                    else:
                        f = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(self.data))
                except zlib.error:
                    # use the data as it was sent
                    f = StringIO.StringIO(self.data)
                self.data = f.read()
            self.downloadtime = time.time() - t
        return self.data

    def _hasUsableEncoding(self):
        """Return False and set a warning if the content has an encoding
        getContent() cannot decode, else return True."""
        encoding = self.headers.get("Content-Encoding")
        if encoding and encoding not in _supported_encodings and \
           encoding!='identity':
            self.setWarning(linkcheck.i18n._('Unsupported content encoding %r.')%encoding)
            return False
        return True

    def isHtml(self):
        """Return True if the link is valid and has decodable content
        of type text/html."""
        if not (self.valid and self.headers):
            return False
        if self.headers.gettype()[:9]!="text/html":
            return False
        return self._hasUsableEncoding()

    def isHttp(self):
        """Return True, this is a http link."""
        return True

    def getContentType(self):
        """Return the Content-Type header without parameters, or
        application/octet-stream if the header is missing."""
        ptype = self.headers.get('Content-Type', 'application/octet-stream')
        if ";" in ptype:
            ptype = ptype.split(';')[0]
        return ptype

    def isParseable(self):
        """Return True if the link is valid and has decodable content
        of type text/html or text/css."""
        if not (self.valid and self.headers):
            return False
        if self.getContentType() not in ("text/html", "text/css"):
            return False
        return self._hasUsableEncoding()

    def parseUrl(self):
        """Parse the content according to its type; returns None."""
        ptype = self.getContentType()
        if ptype=="text/html":
            self.parse_html()
        elif ptype=="text/css":
            self.parse_css()
        return None

    def getRobotsTxtUrl(self):
        """Return the robots.txt url for scheme and host of this url."""
        return "%s://%s/robots.txt"%tuple(self.urlparts[0:2])

    def robotsTxtAllowsUrl(self):
        """Return whether robots.txt of the host allows fetching this
        url; parsed robots.txt files are kept in the config cache."""
        roboturl = self.getRobotsTxtUrl()
        debug(HURT_ME_PLENTY, "robots.txt url", roboturl)
        debug(HURT_ME_PLENTY, "url", self.url)
        if not self.config.robotsTxtCache_has_key(roboturl):
            rp = linkcheck.robotparser2.RobotFileParser()
            rp.set_url(roboturl)
            rp.read()
            self.config.robotsTxtCache_set(roboturl, rp)
        rp = self.config.robotsTxtCache_get(roboturl)
        return rp.can_fetch(linkcheck.Config.UserAgent, self.url)

View file

@ -1,31 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle https links"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import linkcheck
from linkcheck.debug import *
class HttpsUrlData(linkcheck.HttpUrlData.HttpUrlData):
    """Url link with https scheme."""

    def _check(self):
        """Run the inherited check if https is supported, else log the
        url with an "ignored" warning."""
        if not linkcheck.HttpUrlData.supportHttps:
            scheme_name = self.scheme.capitalize()
            self.setWarning(linkcheck.i18n._("%s url ignored")%scheme_name)
            self.logMe()
            return
        super(HttpsUrlData, self)._check()

View file

@ -1,66 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle for uncheckable application-specific links"""
# Copyright (C) 2001-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import re
import linkcheck
# Pattern source matching a url that starts with one of the schemes below
# followed by a colon. It contains inline "#" comments and is therefore
# compiled with re.VERBOSE.
ignored_schemes = r"""^(
acap # application configuration access protocol
|afs # Andrew File System global file names
|cid # content identifier
|data # data
|dav # dav
|fax # fax
|imap # internet message access protocol
|ldap # Lightweight Directory Access Protocol
|mailserver # Access to data available from mail servers
|mid # message identifier
|modem # modem
|nfs # network file system protocol
|opaquelocktoken # opaquelocktoken
|pop # Post Office Protocol v3
|prospero # Prospero Directory Service
|rtsp # real time streaming protocol
|service # service location
|sip # session initiation protocol
|tel # telephone
|tip # Transaction Internet Protocol
|tn3270 # Interactive 3270 emulation sessions
|vemmi # versatile multimedia interface
|wais # Wide Area Information Servers
|z39\.50r # Z39.50 Retrieval
|z39\.50s # Z39.50 Session
|chrome # Mozilla specific
|find # Mozilla specific
|clsid # Microsoft specific
|javascript # JavaScript
|isbn # ISBN (int. book numbers)
):"""
# compiled form of ignored_schemes
ignored_schemes_re = re.compile(ignored_schemes, re.VERBOSE)
class IgnoredUrlData(linkcheck.UrlData.UrlData):
    """Url link with a scheme that is not checked.

    Some schemes are defined in http://www.w3.org/Addressing/schemes
    """

    def _check(self):
        """Log the url with a warning that it has been ignored."""
        scheme_name = self.scheme.capitalize()
        self.setWarning(linkcheck.i18n._("%s url ignored")%scheme_name)
        self.logMe()

    def hasContent(self):
        """Return False, there is no content to look at."""
        return False

View file

@ -1,101 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle nntp: and news: links"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import re, time, sys, nntplib, urlparse, random
# "from linkcheck import linkcheck.LinkCheckerError" is a syntax error:
# a dotted name cannot be imported with from...import. The code below uses
# linkcheck.LinkCheckerError and linkcheck.i18n, so the package itself
# has to be bound as well.
import linkcheck
from linkcheck import LinkCheckerError, Config
from UrlData import ExcList, UrlData
from debug import *
# seed the generator used for the random NNTP retry delay
random.seed()
# register the nntplib errors and EOFError with the exception list
# imported from UrlData
ExcList.extend([nntplib.error_reply,
                nntplib.error_temp,
                nntplib.error_perm,
                nntplib.error_proto,
                EOFError,
               ])
class NntpUrlData(UrlData):
    """Url link with NNTP scheme.

    Checks that the article or newsgroup named in a news: or nntp: url
    is known to the news server.
    """

    def buildUrl(self):
        """Set url and url parts, rewriting a leading "news" to "nntp"."""
        # use nntp instead of news to comply with the unofficial internet
        # draft of Alfred Gilman which unifies (s)news and nntp URLs
        # note: we use this only internally (for parsing and caching)
        if self.urlName[:4].lower()=='news':
            self.url = 'nntp'+self.urlName[4:]
        else:
            self.url = self.urlName
        self.urlparts = urlparse.urlsplit(self.url)
        Config.debug(BRING_IT_ON, self.urlparts)

    def checkConnection(self):
        """Ask the news server for the article or group of the url.

        The server is taken from the url or else from the "nntpserver"
        configuration entry; without a server only a warning is set.
        """
        nntpserver = self.urlparts[1] or self.config["nntpserver"]
        if not nntpserver:
            self.setWarning(linkcheck.i18n._("No NNTP server specified, skipping this URL"))
            return
        nntp = self._connectNntp(nntpserver)
        try:
            group = self.urlparts[2].lstrip('/')
            if '@' in group:
                # request article
                resp,number,mid = nntp.stat("<"+group+">")
                self.setInfo(linkcheck.i18n._('Articel number %s found') % number)
            else:
                # split off trailing article span
                group = group.split('/',1)[0]
                if group:
                    # request group info
                    resp,count,first,last,name = nntp.group(group)
                    self.setInfo(linkcheck.i18n._("Group %s has %s articles, range %s to %s") %\
                                 (name, count, first, last))
                else:
                    # group name is the empty string
                    self.setWarning(linkcheck.i18n._("No newsgroup specified in NNTP URL"))
        finally:
            # the connection is not used after this method; say goodbye
            # on a best-effort basis so that a failing QUIT cannot hide
            # the result or the error of the check itself
            try:
                nntp.quit()
            except Exception:
                pass

    def _connectNntp(self, nntpserver):
        """Connect to the news server and return the connection.

        A connection refused with code 504 or 505 (both meaning "too
        busy, retry later") is retried after a random delay, up to five
        attempts in total; LinkCheckerError is raised if all of them
        fail. Other permanent errors are raised right away.
        """
        tries = 0
        nntp = value = None
        while tries < 5:
            tries += 1
            try:
                nntp = nntplib.NNTP(nntpserver)
            except nntplib.error_perm:
                value = sys.exc_info()[1]
                if re.match(r"50[45]", str(value)):
                    time.sleep(random.randrange(10,30))
                else:
                    raise
            else:
                # connected; without leaving the loop here a new
                # connection would be opened on every remaining round
                break
        if nntp is None:
            raise linkcheck.LinkCheckerError(linkcheck.i18n._("NTTP server too busy; tried more than %d times")%tries)
        if value is not None:
            # connected, but only after at least one busy answer
            self.setWarning(linkcheck.i18n._("NNTP busy: %s")%str(value))
        return nntp

    def getCacheKeys(self):
        """Return a one-element list holding the url."""
        return [self.url]

    def hasContent(self):
        """Return False, there is no content to look at."""
        return False

View file

@ -1,39 +0,0 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
from UrlData import UrlData
from urllib import splittype, splithost, splituser
class ProxyUrlData(UrlData):
    """Url data which can be checked through a proxy; the proxy may be
    given with a user:pass@host setting."""

    def setProxy(self, proxy):
        """Set self.proxy to the host part of the given proxy url and
        self.proxyauth to the Basic authorization value built from its
        user info, or to None if there is no user info. An empty proxy
        is stored unchanged."""
        self.proxy = proxy
        self.proxyauth = None
        if not self.proxy:
            return
        scheme_prefix = "http://"
        if self.proxy[:7].lower() != scheme_prefix:
            self.proxy = scheme_prefix+self.proxy
        # strip the scheme, then everything after the host part
        self.proxy = splittype(self.proxy)[1]
        self.proxy = splithost(self.proxy)[0]
        self.proxyauth, self.proxy = splituser(self.proxy)
        if self.proxyauth is None:
            return
        if ":" not in self.proxyauth:
            # user without password
            self.proxyauth += ":"
        import base64
        encoded = base64.encodestring(self.proxyauth).strip()
        self.proxyauth = "Basic "+encoded

View file

@ -1,64 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Handle telnet: links"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import telnetlib
import urlparse
import urllib
import linkcheck
from linkcheck.debug import *
class TelnetUrlData (linkcheck.HostCheckingUrlData.HostCheckingUrlData):
    "Url link with telnet scheme"

    def buildUrl (self):
        """Build the url, then extract host, port and login data from it.

        The port defaults to 23.  User and password come from the url
        itself or, if it has none, from getUserPassword().  Raises
        linkcheck.LinkCheckerError on an invalid port number.
        """
        super(TelnetUrlData, self).buildUrl()
        netloc = urlparse.urlsplit(self.url)[1]
        userinfo, self.host = urllib.splituser(netloc)
        self.host, self.port = urllib.splitport(self.host)
        if self.port is None:
            self.port = 23
        else:
            if not linkcheck.UrlData.is_valid_port(self.port):
                raise linkcheck.LinkCheckerError(linkcheck.i18n._("URL has invalid port number %s")\
                                                 % self.port)
            self.port = int(self.port)
        if not userinfo:
            self.user, self.password = self.getUserPassword()
        else:
            self.user, self.password = urllib.splitpasswd(userinfo)

    def checkConnection (self):
        """Open a telnet session, send login data if known, then exit."""
        super(TelnetUrlData, self).checkConnection()
        connection = telnetlib.Telnet()
        self.urlConnection = connection
        connection.set_debuglevel(get_debuglevel())
        connection.open(self.host, self.port)
        if self.user:
            # wait at most 10 seconds for each prompt
            connection.read_until("login: ", 10)
            connection.write(self.user+"\n")
            if self.password:
                connection.read_until("Password: ", 10)
                connection.write(self.password+"\n")
                # XXX how to tell if we are logged in??
        connection.write("exit\n")

    def hasContent (self):
        """Telnet links have no content to fetch; always False."""
        return False

View file

@ -1,586 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""Base URL handler"""
# Copyright (C) 2000-2004 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import sys
import re
import urlparse
import urllib2
import urllib
import time
import traceback
import socket
import select
import linkcheck
import linkcheck.DNS
# search function: returns a match object if the given string has
# whitespace at its start or at its end, else None
ws_at_start_or_end = re.compile(r"(^\s+)|(\s+$)").search
# helper function for internal errors
def internal_error ():
    """Report an internal error on stderr and exit with status 1.

    Prints the bug report instructions, the exception currently being
    handled together with its traceback, and the application info.
    """
    err = sys.stderr
    print >>err, linkcheck.i18n._("""\n********** Oops, I did it again. *************
You have found an internal error in LinkChecker. Please write a bug report
at http://sourceforge.net/tracker/?func=add&group_id=1913&atid=101913
or send mail to %s and include the following information:
1) The URL or file you are testing
2) Your commandline arguments and/or configuration.
3) The system information below.
If you disclose some information because its too private to you thats ok.
I will try to help you nontheless (but you have to give me *something*
I can work with ;).
""") % linkcheck.Config.Email
    exc_type, exc_value = sys.exc_info()[:2]
    print >>err, exc_type, exc_value
    traceback.print_exc()
    print_app_info()
    print >>err, linkcheck.i18n._("\n******** LinkChecker internal error, bailing out ********")
    sys.exit(1)
def print_app_info ():
    """Print application, Python and selected environment info to stderr."""
    import os
    out = sys.stderr
    print >>out, linkcheck.i18n._("System info:")
    print >>out, linkcheck.Config.App
    print >>out, "Python %s on %s" % (sys.version, sys.platform)
    # only report the environment variables that are actually set
    for envname in ("LC_ALL", "LC_MESSAGES", "http_proxy", "ftp_proxy"):
        setting = os.getenv(envname)
        if setting is None:
            continue
        print >>out, envname, "=", repr(setting)
def get_absolute_url (urlName, baseRef, parentName):
    """Return the first of urlName, baseRef and parentName that contains
    a colon, converted to lower case, or the empty string if none does.

    The result is only used to detect the link type; no url fragments
    are joined together here.
    """
    for candidate in (urlName, baseRef, parentName):
        if candidate and ":" in candidate:
            return candidate.lower()
    return ""
# we catch these exceptions, all other exceptions are internal
# or system errors
# (the list is mutable on purpose: it is extended below, and url handler
# modules may append further exception classes to it)
ExcList = [
IOError,
ValueError, # from httplib.py
linkcheck.LinkCheckerError,
linkcheck.DNS.Error,
socket.timeout,
socket.error,
select.error,
]
# socket.sslerror is only added when this socket module provides it
if hasattr(socket, "sslerror"):
ExcList.append(socket.sslerror)
# regular expression for port numbers: the whole string must consist of
# digits (anchored at the end too, so that e.g. "80abc" is rejected)
is_valid_port = re.compile(r"\d+\Z").match
class UrlData (object):
"Representing a URL with additional information like validity etc"
def __init__ (self,
              urlName,
              recursionLevel,
              config,
              parentName = None,
              baseRef = None,
              line = 0,
              column = 0,
              name = ""):
    """Store the link description and reset all check results.

    urlName is the url as found in the parent document, parentName the
    url of that document, baseRef an optional base url, line/column
    the position of the link and name its name.  recursionLevel and
    config are stored unchanged.
    """
    # link description as given by the caller
    self.urlName = urlName
    self.parentName = parentName
    self.baseRef = baseRef
    self.line = line
    self.column = column
    self.name = name
    self.recursionLevel = recursionLevel
    self.config = config
    # url data, filled in by buildUrl()
    self.url = None
    self.urlparts = None
    self.anchor = None
    # check results
    self.valid = True
    self.errorString = linkcheck.i18n._("Error")
    self.validString = linkcheck.i18n._("Valid")
    self.warningString = None
    self.infoString = None
    self.checktime = 0
    self.cached = False
    self.extern = (1, 0)
    # connection and content state
    self.urlConnection = None
    self.data = None
    self.has_content = False
    self.dltime = -1
    self.dlsize = -1
    # assume file link if no scheme is found
    absolute = get_absolute_url(self.urlName, self.baseRef, self.parentName)
    scheme = absolute.split(":", 1)[0]
    if not scheme:
        scheme = "file"
    self.scheme = scheme
def setError (self, s):
    """Mark this link as invalid and store s as its error message."""
    self.valid = False
    prefix = linkcheck.i18n._("Error")
    self.errorString = prefix+": "+s
def setValid (self, s):
    """Mark this link as valid and store s as its validity message."""
    self.valid = True
    prefix = linkcheck.i18n._("Valid")
    self.validString = prefix+": "+s
def isParseable (self):
    """Base implementation: always returns False."""
    parseable = False
    return parseable
def isHtml (self):
    """Base implementation: always returns False."""
    html = False
    return html
def isHttp (self):
    """Base implementation: always returns False."""
    http = False
    return http
def isFile (self):
    """Base implementation: always returns False."""
    is_file = False
    return is_file
def setWarning (self, s):
    """Store warning s; further warnings are appended on new lines."""
    if not self.warningString:
        self.warningString = s
        return
    self.warningString += "\n"+s
def setInfo (self, s):
    """Store info s; further infos are appended on new lines."""
    if not self.infoString:
        self.infoString = s
        return
    self.infoString += "\n"+s
def copyFromCache (self, cacheData):
    """Fill the result attributes from cache data.

    A warning already set on this link is kept; a cached warning is
    appended to it on a new line.
    """
    self.errorString = cacheData["errorString"]
    self.validString = cacheData["validString"]
    cached_warning = cacheData["warningString"]
    if not self.warningString:
        self.warningString = cached_warning
    elif cached_warning:
        self.warningString += "\n"+cached_warning
    self.infoString = cacheData["infoString"]
    self.valid = cacheData["valid"]
    self.dltime = cacheData["dltime"]
def getCacheData (self):
    """Return a dictionary with all result values that should be put in
    the cache, keyed by attribute name."""
    data = {}
    for attr in ("errorString", "validString", "warningString",
                 "infoString", "valid", "dltime"):
        data[attr] = getattr(self, attr)
    return data
def buildUrl (self):
    """Compute self.url, self.urlparts, self.userinfo and self.anchor.

    The url name is joined with the base reference if there is one,
    else with the parent url if there is one.  A base reference
    without a colon is itself joined with the parent url first.
    Raises linkcheck.LinkCheckerError on an invalid port number.
    """
    if self.baseRef and ":" not in self.baseRef:
        self.baseRef = urlparse.urljoin(self.parentName, self.baseRef)
    base = self.baseRef or self.parentName
    if base:
        self.url = urlparse.urljoin(base, self.urlName)
    else:
        self.url = self.urlName
    # unquote url
    self.url = urllib.unquote(self.url)
    # split into (modifiable) list
    self.urlparts = list(urlparse.urlsplit(self.url))
    # check userinfo@host:port syntax
    self.userinfo, hostport = urllib.splituser(self.urlparts[1])
    port = urllib.splitport(hostport)[1]
    if port is not None and not is_valid_port(port):
        raise linkcheck.LinkCheckerError(linkcheck.i18n._("URL has invalid port number %r")\
                                         % str(port))
    # set host lowercase and without userinfo
    self.urlparts[1] = hostport.lower()
    # save anchor for later checking
    self.anchor = self.urlparts[4]
def logMe (self):
    """Count this link and hand it to the loggers if it is to be shown.

    A link is logged in verbose mode, when it is invalid, or when it
    has a warning and warnings are enabled.
    """
    debug(BRING_IT_ON, "logging url")
    self.config.incrementLinknumber()
    wanted = (self.config["verbose"] or not self.valid or
              (self.warningString and self.config["warnings"]))
    if wanted:
        self.config.log_newUrl(self)
def check (self):
    """Run the link check and route unexpected exceptions.

    KeyboardInterrupt and linkcheck.test_support.Error propagate to
    the caller.  A socket.error or select.error whose error number is
    EINTR is dropped; any other socket.error or select.error is
    re-raised.  Every other exception is reported with
    internal_error().
    """
    import errno
    try:
        self._check()
    except KeyboardInterrupt:
        raise
    except (socket.error, select.error):
        # on Unix, ctrl-c can raise
        # error: (4, 'Interrupted system call')
        value = sys.exc_info()[1]
        # the error number is carried by the exception value; the
        # exception type itself never compares equal to a number
        if not value.args or value.args[0] != errno.EINTR:
            raise
    except linkcheck.test_support.Error:
        raise
    except:
        internal_error()
def _check (self):
"""Check this link and log the result.
Steps: optional wait, cache lookup, domain filter, connection (and
anchor) check, content check against the warning regex, recursion into
the content plus size check, closing the connection, logging, caching.
Exceptions listed in ExcList are stored as error on this link."""
debug(BRING_IT_ON, "Checking", self)
# wait only for links with a non-zero recursion level, and only if a
# wait time is configured
if self.recursionLevel and self.config['wait']:
debug(BRING_IT_ON, "sleeping for", self.config['wait'], "seconds")
time.sleep(self.config['wait'])
# start time for self.checktime
t = time.time()
# checkCache() returns False if the result was taken from the cache
# (and already logged there)
if not self.checkCache():
return
# apply filter
debug(BRING_IT_ON, "extern =", self.extern)
if self.extern[0] and (self.config["strict"] or self.extern[1]):
self.setWarning(
linkcheck.i18n._("outside of domain filter, checked only syntax"))
self.logMe()
return
# check connection
debug(BRING_IT_ON, "checking connection")
try:
self.checkConnection()
# checkConnection() may have set self.cached; nothing more to do then
if self.cached:
return
if self.config["anchors"]:
self.checkAnchors()
except tuple(ExcList):
etype, evalue, etb = sys.exc_info()
debug(HURT_ME_PLENTY, "exception", traceback.format_tb(etb))
# make nicer error msg for unknown hosts
if isinstance(evalue, socket.error) and evalue[0]==-2:
evalue = linkcheck.i18n._('Hostname not found')
# make nicer error msg for bad status line
if isinstance(evalue, linkcheck.httplib2.BadStatusLine):
evalue = linkcheck.i18n._('Bad HTTP response %r')%str(evalue)
self.setError(str(evalue))
# check content
warningregex = self.config["warningregex"]
if warningregex and self.valid:
debug(BRING_IT_ON, "checking content")
try:
self.checkContent(warningregex)
except tuple(ExcList):
value, tb = sys.exc_info()[1:]
debug(HURT_ME_PLENTY, "exception", traceback.format_tb(tb))
self.setError(str(value))
# time spent so far; the recursion step below is not included
self.checktime = time.time() - t
# check recursion
debug(BRING_IT_ON, "checking recursion")
try:
if self.allowsRecursion():
self.parseUrl()
# check content size
self.checkSize()
except tuple(ExcList):
value, tb = sys.exc_info()[1:]
debug(HURT_ME_PLENTY, "exception", traceback.format_tb(tb))
self.setError(linkcheck.i18n._("could not parse content: %r")%str(value))
# close
self.closeConnection()
self.logMe()
debug(BRING_IT_ON, "caching")
self.putInCache()
def checkSyntax (self):
    """Check the url name syntax and build the url.

    Returns True if the url could be built.  Otherwise the error is
    stored on this link, the link is logged and False is returned.
    """
    debug(BRING_IT_ON, "checking syntax")
    error = None
    if not self.urlName:
        error = linkcheck.i18n._("URL is null or empty")
    elif ws_at_start_or_end(self.urlName):
        error = linkcheck.i18n._("URL has whitespace at beginning or end")
    else:
        try:
            self.buildUrl()
            self.extern = self._getExtern()
        except linkcheck.LinkCheckerError:
            error = str(sys.exc_info()[1])
    if error is None:
        return True
    self.setError(error)
    self.logMe()
    return False
def checkCache (self):
    """Look this link up in the url cache.

    On a cache hit the cached result is copied, the link is marked as
    cached and logged, and False is returned.  Returns True if the
    link is not in the cache and has to be checked.
    """
    debug(BRING_IT_ON, "checking cache")
    for key in self.getCacheKeys():
        if not self.config.urlCache_has_key(key):
            continue
        self.copyFromCache(self.config.urlCache_get(key))
        self.cached = True
        self.logMe()
        return False
    return True
def closeConnection (self):
    """Close the url connection, ignoring any error, and drop the
    reference to it."""
    connection = self.urlConnection
    if connection is not None:
        # brute force closing
        try:
            connection.close()
        except:
            pass
    # release variable for garbage collection
    self.urlConnection = None
def putInCache (self):
    """Store the result of this link under all of its cache keys, mark
    the keys as seen and mark the link as cached.  Does nothing if the
    link is already marked as cached."""
    if self.cached:
        return
    result = self.getCacheData()
    for key in self.getCacheKeys():
        self.config.urlCache_set(key, result)
        self.config.urlSeen_set(key)
    self.cached = True
def getCacheKeys (self):
    """Return the list of cache keys of this link: the key from
    getCacheKey(), or an empty list if that key is None."""
    keys = []
    key = self.getCacheKey()
    if key is not None:
        keys.append(key)
    return keys
def isCached (self):
    """Tell if this link is marked as cached or its cache key was
    already seen."""
    key = self.getCacheKey()
    if self.cached:
        return self.cached
    return self.config.urlSeen_has_key(key)
def getCacheKey (self):
    """Return the cache key built from the url parts, or None if the
    url parts are not set.  The anchor is part of the key only if the
    "anchorcaching" option is set."""
    # note: the host is already lowercase
    if not self.urlparts:
        return None
    if self.config["anchorcaching"]:
        # do not ignore anchor
        parts = self.urlparts
    else:
        # removed anchor from cache key
        parts = self.urlparts[:4]+['']
    return urlparse.urlunsplit(parts)
def checkConnection (self):
    """Open self.url with urllib2 and keep the connection object."""
    connection = urllib2.urlopen(self.url)
    self.urlConnection = connection
def allowsRecursion (self):
    """Tell if the content of this link should be parsed for more links.

    All of these must hold: the link is valid, parseable, has content,
    is not cached, is below the configured recursion level (a negative
    level means no limit), is not extern, and its content allows
    robots.  The tests are made in this order and stop at the first
    one that fails.
    """
    # note: test self.valid before self.isParseable()
    result = self.valid
    if result:
        result = self.isParseable()
    if result:
        result = self.hasContent()
    if result:
        result = not self.isCached()
    if result:
        limit = self.config["recursionlevel"]
        result = limit < 0 or self.recursionLevel < limit
    if result:
        result = not self.extern[0]
    if result:
        result = self.contentAllowsRobots()
    return result
def contentAllowsRobots (self):
    """Tell if the content allows robots to follow its links.

    Always True for non-HTML content and for links that are neither
    http nor file links.  Otherwise the content is parsed with a
    MetaRobotsFinder and its "follow" value is returned.
    """
    if not self.isHtml():
        return True
    if not self.isHttp() and not self.isFile():
        return True
    finder = linkcheck.linkparse.MetaRobotsFinder(self.getContent())
    parser = bk.HtmlParser.htmlsax.parser(finder)
    finder.parser = parser
    parser.feed(self.getContent())
    parser.flush()
    # break the reference cycle between handler and parser
    finder.parser = None
    parser.handler = None
    return finder.follow
def checkAnchors (self):
    """Warn if the anchor of this link is not found in its content.

    Only done for valid HTML links that have an anchor and content.
    The anchor is compared with the "name" attributes of <a> tags and
    with the "id" attributes of all tags.
    """
    if not self.valid or not self.anchor or not self.isHtml() or \
       not self.hasContent():
        # do not bother
        return
    debug(HURT_ME_PLENTY, "checking anchor", self.anchor)
    finder = linkcheck.linkparse.LinkFinder(self.getContent(), tags={'a': ['name'], None: ['id']})
    parser = bk.HtmlParser.htmlsax.parser(finder)
    finder.parser = parser
    parser.feed(self.getContent())
    parser.flush()
    # break the reference cycle between handler and parser
    finder.parser = None
    parser.handler = None
    for found_anchor,line,column,name,base in finder.urls:
        if found_anchor == self.anchor:
            return
    self.setWarning(linkcheck.i18n._("anchor #%s not found") % self.anchor)
def _getExtern (self):
    """Classify self.url with the configured link patterns.

    Returns a pair: (0, 0) for an intern link, or (1, strict) for an
    extern link, where strict is the "strict" value of the extern
    entry that applied (0 for the default).  With the "denyallow"
    option the extern entries are tried first and the default is
    intern; without it the intern entries are tried first and the
    default is extern.  Without any entries the link is intern.
    """
    config = self.config
    if not (config["externlinks"] or config["internlinks"]):
        return (0, 0)
    # deny and allow external checking
    linkcheck.Config.debug(HURT_ME_PLENTY, "Url", self.url)
    if config["denyallow"]:
        order = ("externlinks", "internlinks")
        fallback = (0, 0)
    else:
        order = ("internlinks", "externlinks")
        fallback = (1,0)
    for listname in order:
        extern = (listname == "externlinks")
        if extern:
            label = "Extern entry"
        else:
            label = "Intern entry"
        for entry in config[listname]:
            linkcheck.Config.debug(HURT_ME_PLENTY, label, entry)
            found = entry['pattern'].search(self.url)
            # a negated entry applies if the pattern does not match
            if entry['negate']:
                applies = not found
            else:
                applies = found
            if applies:
                if extern:
                    return (1, entry['strict'])
                return (0, 0)
    return fallback
def hasContent (self):
    """Indicate whether getContent() can be called for this url.
    The base implementation always returns True."""
    has_content = True
    return has_content
def getContent (self):
    """Return the content of this url, reading it on the first call.

    Precondition: urlConnection is an opened URL.  The first call also
    stores the download time in self.dltime and the content length in
    self.dlsize; later calls return the stored data.
    """
    if self.has_content:
        return self.data
    self.has_content = True
    started = time.time()
    self.data = self.urlConnection.read()
    self.dltime = time.time() - started
    self.dlsize = len(self.data)
    return self.data
def checkContent (self, warningregex):
    """Search the content of this url for the given compiled warning
    expression and set a warning with the matched text if it is found.
    Does nothing for urls without content."""
    if self.hasContent():
        found = warningregex.search(self.getContent())
        if found:
            self.setWarning(linkcheck.i18n._("Found %r in link contents")%found.group())
def checkSize (self):
    """Set a warning if a maximum size is configured ("warnsizebytes")
    and the content size of this url reaches or exceeds it."""
    limit = self.config["warnsizebytes"]
    if limit is None:
        return
    if self.dlsize >= limit:
        sizes = (linkcheck.StringUtil.strsize(self.dlsize),
                 linkcheck.StringUtil.strsize(limit))
        self.setWarning(linkcheck.i18n._("Content size %s is larger than %s")%\
                        sizes)
def parseUrl (self):
    """Parse the content of this url for links; the default parse type
    is html."""
    debug(BRING_IT_ON, "Parsing recursively into", self)
    self.parse_html()
def getUserPassword (self):
    """Return the (user, password) pair of the first configured
    authentication entry whose pattern matches self.url, or
    (None, None) if no entry matches."""
    credentials = (None, None)
    for auth in self.config["authentication"]:
        if auth['pattern'].match(self.url):
            credentials = (auth['user'], auth['password'])
            break
    return credentials
def parse_html (self):
    """Parse the HTML content and queue every link found in it.

    A first pass looks for <base href> tags; only the first one is
    used as base reference and a warning is set if there are more.
    A second pass collects the links.  Parser warnings are stored as
    warnings on this link.  A link with its own codebase uses that
    instead of the base reference.
    """
    # search for a possible base reference
    base_finder = linkcheck.linkparse.LinkFinder(self.getContent(), tags={'base': ['href']})
    base_parser = bk.HtmlParser.htmlsax.parser(base_finder)
    base_finder.parser = base_parser
    base_parser.feed(self.getContent())
    base_parser.flush()
    base_finder.parser = None
    base_parser.handler = None
    baseRef = None
    found = len(base_finder.urls)
    if found>=1:
        baseRef = base_finder.urls[0][0]
    if found>1:
        self.setWarning(linkcheck.i18n._(
            "more than one <base> tag found, using only the first one"))
    # now collect the links
    link_finder = linkcheck.linkparse.LinkFinder(self.getContent())
    link_parser = bk.HtmlParser.htmlsax.parser(link_finder)
    link_finder.parser = link_parser
    link_parser.feed(self.getContent())
    link_parser.flush()
    link_finder.parser = None
    link_parser.handler = None
    for message in link_finder.parse_info:
        # the parser had warnings/errors
        self.setWarning(message)
    for url,line,column,name,codebase in link_finder.urls:
        base = codebase or baseRef
        debug(NIGHTMARE, "Put url %r in queue"%url)
        self.config.appendUrl(GetUrlDataFrom(url,
            self.recursionLevel+1, self.config,
            parentName=self.url, baseRef=base,
            line=line, column=column, name=name))
def parse_opera (self):
    """Parse an Opera bookmark file and queue every url found in it.

    A "NAME=" line sets the name used for the next "URL=" line; the
    name is reset after each "URL=" line.  Empty urls are skipped.
    """
    name = ""
    lineno = 0
    for line in self.getContent().splitlines():
        lineno += 1
        line = line.strip()
        if line.startswith("NAME="):
            name = line[5:]
        elif line.startswith("URL="):
            url = line[4:]
            if url:
                # pass line and name by keyword like the other parse
                # methods do: given positionally, the name ends up in
                # the slot after line, which is the column in
                # UrlData.__init__
                self.config.appendUrl(GetUrlDataFrom(url,
                    self.recursionLevel+1, self.config,
                    parentName=self.url, baseRef=None,
                    line=lineno, name=name))
            name = ""
def parse_text (self):
    """Parse a text file with one url per line and queue each url;
    comment lines (starting with '#') and blank lines are ignored.
    UNUSED and UNTESTED, just use linkchecker `cat file.txt`
    """
    for index, raw in enumerate(self.getContent().splitlines()):
        url = raw.strip()
        if url and not url.startswith('#'):
            # line numbers start at 1
            self.config.appendUrl(GetUrlDataFrom(url, self.recursionLevel+1,
                self.config, parentName=self.url, line=index+1))
def parse_css (self):
    """Parse a CSS file for url() patterns and queue each url found,
    with its line number and its column within the line."""
    find_urls = linkcheck.linkparse.css_url_re.finditer
    for index, text in enumerate(self.getContent().splitlines()):
        # line numbers start at 1
        lineno = index+1
        for found in find_urls(text):
            column = found.start("url")
            self.config.appendUrl(GetUrlDataFrom(found.group("url"),
                self.recursionLevel+1, self.config,
                parentName=self.url, line=lineno, column=column))
def __str__ (self):
    """Return a multi-line description of this link: the scheme, then
    one "key=value" line per attribute."""
    fields = (
        ("urlname", self.urlName),
        ("parentName", self.parentName),
        ("baseRef", self.baseRef),
        ("cached", self.cached),
        ("recursionLevel", self.recursionLevel),
        ("urlConnection", self.urlConnection),
        ("line", self.line),
        ("column", self.column),
        ("name", self.name),
    )
    lines = ["%s link" % (self.scheme,)]
    for label, value in fields:
        lines.append("%s=%s" % (label, value))
    return "\n".join(lines)