diff --git a/linkcheck/FileUrlData.py b/linkcheck/FileUrlData.py deleted file mode 100644 index 5d6b575c..00000000 --- a/linkcheck/FileUrlData.py +++ /dev/null @@ -1,149 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle local file: links""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import re -import os -import urlparse -import linkcheck - -# OSError is thrown on Windows when a file is not found -linkcheck.UrlData.ExcList.append(OSError) - -# if file extension was fruitless, look at the content -contents = { - "html": re.compile(r'(?i).*'), - "opera" : re.compile(r'Opera Hotlist'), -# "text" : re.compile(r'[\w\s]+'), -} - -_schemes = r"""( -acap # application configuration access protocol -|afs # Andrew File System global file names -|cid # content identifier -|data # data -|dav # dav -|fax # fax -|imap # internet message access protocol -|ldap # Lightweight Directory Access Protocol -|mailserver # Access to data available from mail servers -|mid # message identifier -|modem # modem -|nfs # network file system protocol -|opaquelocktoken # opaquelocktoken -|pop # Post Office Protocol v3 -|prospero # Prospero Directory Service -|rtsp # real time streaming protocol -|service # service location -|sip # session initiation protocol -|tel # telephone -|tip # Transaction Internet Protocol -|tn3270 # Interactive 3270 emulation sessions -|vemmi # versatile multimedia interface -|wais # Wide Area Information Servers -|z39\.50r # Z39.50 Retrieval -|z39\.50s # Z39.50 Session -|chrome # Mozilla specific -|find # Mozilla specific -|clsid # Microsoft specific -|javascript # JavaScript -|isbn # ISBN (int. book numbers) -|https? # HTTP/HTTPS -|ftp # FTP -|file # local file -|telnet # telnet -|mailto # mailto -|gopher # gopher -|s?news # news -|nntp # news -)""" - -class FileUrlData (linkcheck.UrlData.UrlData): - "Url link with file scheme" - - def __init__ (self, - urlName, - config, - recursionLevel, - parentName = None, - baseRef = None, line=0, column=0, name=""): - super(FileUrlData, self).__init__(urlName, config, recursionLevel, - parentName=parentName, baseRef=baseRef, - line=line, column=column, name=name) - if not (parentName or baseRef or self.urlName.startswith("file:")): - self.urlName = os.path.expanduser(self.urlName) - if not self.urlName.startswith("/"): - self.urlName = os.getcwd()+"/"+self.urlName - self.urlName = "file://"+self.urlName - self.urlName = self.urlName.replace("\\", "/") - # transform c:/windows into /c|/windows - self.urlName = re.sub(r"^file://(/?)([a-zA-Z]):", r"file:///\2|", - self.urlName) - - - def buildUrl (self): - super(FileUrlData, self).buildUrl() - # ignore query and fragment url parts for filesystem urls - self.urlparts[3] = self.urlparts[4] = '' - self.url = urlparse.urlunsplit(self.urlparts) - - - def getCacheKeys (self): - # the host in urlparts is lowercase()d - if self.urlparts: - self.urlparts[4] = self.anchor - key = urlparse.urlunsplit(self.urlparts) - self.urlparts[4] = '' - return [key] - return [] - - - def isHtml (self): - if linkcheck.extensions['html'].search(self.url): - return True - if contents['html'].search(self.getContent()[:20]): - return True - return False - - - def isFile (self): - return True - - - def isParseable (self): - # guess by extension - for ro in linkcheck.extensions.values(): - if ro.search(self.url): - return True - # try to read content (can fail, so catch error) - try: - for ro in contents.values(): - if ro.search(self.getContent()[:20]): - return True - except IOError: - pass - return False - - - def parseUrl (self): - for key, ro in linkcheck.extensions.items(): - if ro.search(self.url): - return getattr(self, "parse_"+key)() - for key, ro in contents.items(): - if ro.search(self.getContent()[:20]): - return getattr(self, "parse_"+key)() - return None diff --git a/linkcheck/FtpUrlData.py b/linkcheck/FtpUrlData.py deleted file mode 100644 index df6ebfc2..00000000 --- a/linkcheck/FtpUrlData.py +++ /dev/null @@ -1,133 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle FTP links""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import ftplib -import linkcheck -from linkcheck.debug import * - -linkcheck.UrlData.ExcList.extend([ - ftplib.error_reply, - ftplib.error_temp, - ftplib.error_perm, - ftplib.error_proto, -]) - -class FtpUrlData (linkcheck.ProxyUrlData.ProxyUrlData): - """ - Url link with ftp scheme. - """ - def checkConnection (self): - # proxy support (we support only http) - self.setProxy(self.config["proxy"].get(self.scheme)) - if self.proxy: - http = linkcheck.HttpUrlData.HttpUrlData(self.urlName, - self.recursionLevel, - self.config, - parentName=self.parentName, - baseRef=self.baseRef, - line=self.line, - column=self.column, - name=self.name) - http.buildUrl() - return http.check() - # using no proxy here - # get login credentials - if self.userinfo: - _user, _password = urllib.splitpasswd(self.userinfo) - else: - _user, _password = self.getUserPassword() - if _user is None or _password is None: - raise linkcheck.LinkCheckerError(linkcheck.i18n._("No user or password found")) - self.login(_user, _password) - filename = self.cwd() - if filename: - self.retrieve(filename) - return None - - - def isHtml (self): - if linkcheck.extensions['html'].search(self.url): - return True - return False - - - def isParseable (self): - for ro in linkcheck.extensions.values(): - if ro.search(self.url): - return True - return False - - - def parseUrl (self): - for key,ro in linkcheck.extensions.items(): - if ro.search(self.url): - return getattr(self, "parse_"+key)() - return None - - - def login (self, _user, _password): - """log into ftp server and check the welcome message""" - # ready to connect - try: - self.urlConnection = ftplib.FTP() - self.urlConnection.set_debuglevel(get_debuglevel()) - self.urlConnection.connect(self.urlparts[1]) - self.urlConnection.login(_user, _password) - except EOFError: - raise linkcheck.LinkCheckerError(linkcheck.i18n._("Remote host has closed connection")) - if not self.urlConnection.getwelcome(): - self.closeConnection() - raise linkcheck.LinkCheckerError(linkcheck.i18n._("Got no answer from FTP server")) - # dont set info anymore, this may change every time we logged in - #self.setInfo(info) - - - def cwd (self): - """change directory to given path""" - # leeched from webcheck - dirs = self.urlparts[2].split('/') - filename = dirs.pop() - if len(dirs) and not dirs[0]: del dirs[0] - for d in dirs: - self.urlConnection.cwd(d) - return filename - - - def retrieve (self, filename): - """initiate download of given filename""" - # it could be a directory if the trailing slash was forgotten - try: - self.urlConnection.cwd(filename) - self.setWarning(linkcheck.i18n._("Missing trailing directory slash in ftp url")) - return - except ftplib.error_perm: - pass - self.urlConnection.voidcmd('TYPE I') - conn, size = self.urlConnection.ntransfercmd('RETR %s'%filename) - if size: - self.dlsize = size - # dont download data XXX recursion - #page = conn.makefile().read(size) - #else: - # page = conn.makefile().read() - - - def closeConnection (self): - try: self.urlConnection.closet() - except: pass - self.urlConnection = None diff --git a/linkcheck/GopherUrlData.py b/linkcheck/GopherUrlData.py deleted file mode 100644 index 2768747c..00000000 --- a/linkcheck/GopherUrlData.py +++ /dev/null @@ -1,23 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle Gopher links""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import linkcheck - -class GopherUrlData (linkcheck.UrlData.UrlData): - "Url link with gopher scheme" - pass diff --git a/linkcheck/HostCheckingUrlData.py b/linkcheck/HostCheckingUrlData.py deleted file mode 100644 index daeec4d6..00000000 --- a/linkcheck/HostCheckingUrlData.py +++ /dev/null @@ -1,46 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Base handle for links with a hostname""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import socket -import urllib -import linkcheck - -class HostCheckingUrlData (linkcheck.UrlData.UrlData): - "Url link for which we have to connect to a specific host" - - def __init__ (self, urlName, recursionLevel, config, parentName=None, - baseRef=None, line=0, column=0, name=""): - super(HostCheckingUrlData, self).__init__(urlName, recursionLevel, - config, parentName=parentName, baseRef=baseRef, - line=line, column=column, name=name) - self.host = None - self.url = urllib.unquote(self.urlName) - - - def buildUrl (self): - # to avoid anchor checking - self.urlparts = None - - - def getCacheKeys (self): - return ["%s:%s" % (self.scheme, self.host)] - - - def checkConnection (self): - ip = socket.gethostbyname(self.host) - self.setValid(self.host+"("+ip+") "+linkcheck.i18n._("found")) diff --git a/linkcheck/HttpUrlData.py b/linkcheck/HttpUrlData.py deleted file mode 100644 index 0e9c4ba9..00000000 --- a/linkcheck/HttpUrlData.py +++ /dev/null @@ -1,449 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle http links""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import urlparse -import sys -import time -import re -import zlib -import gzip -import socket -import cStringIO as StringIO -import linkcheck -from linkcheck.debug import * -supportHttps = hasattr(linkcheck.httplib2, "HTTPSConnection") and \ - hasattr(socket, "ssl") - -linkcheck.UrlData.ExcList.extend([linkcheck.httplib2.error,]) - -_supported_encodings = ('gzip', 'x-gzip', 'deflate') - -# Amazon blocks all HEAD requests -_isAmazonHost = re.compile(r'^www\.amazon\.(com|de|ca|fr|co\.(uk|jp))').search - - -class HttpUrlData (linkcheck.ProxyUrlData.ProxyUrlData): - "Url link with http scheme" - - def __init__ (self, urlName, recursionLevel, config, parentName=None, - baseRef=None, line=0, column=0, name=""): - super(HttpUrlData, self).__init__(urlName, recursionLevel, config, - parentName=parentName, baseRef=baseRef, line=line, - column=column, name=name) - self.aliases = [] - self.max_redirects = 5 - self.has301status = False - self.no_anchor = False # remove anchor in request url - - - def buildUrl (self): - super(HttpUrlData, self).buildUrl() - # encode userinfo - # XXX - # check for empty paths - if not self.urlparts[2]: - self.setWarning(linkcheck.i18n._("URL path is empty, assuming '/' as path")) - self.urlparts[2] = '/' - self.url = urlparse.urlunsplit(self.urlparts) - - - def checkConnection (self): - """ - Check a URL with HTTP protocol. - Here is an excerpt from RFC 1945 with common response codes: - The first digit of the Status-Code defines the class of response. The - last two digits do not have any categorization role. There are 5 - values for the first digit: - o 1xx: Informational - Not used, but reserved for future use - o 2xx: Success - The action was successfully received, - understood, and accepted. - o 3xx: Redirection - Further action must be taken in order to - complete the request - o 4xx: Client Error - The request contains bad syntax or cannot - be fulfilled - o 5xx: Server Error - The server failed to fulfill an apparently - valid request - The individual values of the numeric status codes defined for - HTTP/1.0, and an example set of corresponding Reason-Phrase's, are - presented below. The reason phrases listed here are only recommended - -- they may be replaced by local equivalents without affecting the - protocol. These codes are fully defined in Section 9. - Status-Code = "200" ; OK - | "201" ; Created - | "202" ; Accepted - | "204" ; No Content - | "301" ; Moved Permanently - | "302" ; Moved Temporarily - | "304" ; Not Modified - | "305" ; Use Proxy - | "400" ; Bad Request - | "401" ; Unauthorized - | "403" ; Forbidden - | "404" ; Not Found - | "405" ; Method not allowed - | "407" ; Proxy Authentication Required - | "500" ; Internal Server Error - | "501" ; Not Implemented - | "502" ; Bad Gateway - | "503" ; Service Unavailable - | extension-code - """ - # set the proxy, so a 407 status after this is an error - self.setProxy(self.config["proxy"].get(self.scheme)) - if self.proxy: - self.setInfo(linkcheck.i18n._("Using Proxy %r")%self.proxy) - self.headers = None - self.auth = None - self.cookies = [] - if not self.robotsTxtAllowsUrl(): - self.setWarning(linkcheck.i18n._("Access denied by robots.txt, checked only syntax")) - return - - if _isAmazonHost(self.urlparts[1]): - self.setWarning(linkcheck.i18n._("Amazon servers block HTTP HEAD requests, " - "using GET instead")) - self.method = "GET" - else: - # first try with HEAD - self.method = "HEAD" - fallback_GET = False - redirectCache = [self.url] - while True: - try: - response = self._getHttpResponse() - except linkcheck.httplib2.BadStatusLine: - # some servers send empty HEAD replies - if self.method=="HEAD": - self.method = "GET" - redirectCache = [self.url] - fallback_GET = True - continue - raise - self.headers = response.msg - debug(BRING_IT_ON, response.status, response.reason, self.headers) - # proxy enforcement (overrides standard proxy) - if response.status == 305 and self.headers: - oldproxy = (self.proxy, self.proxyauth) - self.setProxy(self.headers.getheader("Location")) - self.setInfo(linkcheck.i18n._("Enforced Proxy %r")%self.proxy) - response = self._getHttpResponse() - self.headers = response.msg - self.proxy, self.proxyauth = oldproxy - # follow all redirections - tries, response = self.followRedirections(response, redirectCache) - if tries == -1: - # already handled - return - if tries >= self.max_redirects: - if self.method=="HEAD": - # Microsoft servers tend to recurse HEAD requests - self.method = "GET" - redirectCache = [self.url] - fallback_GET = True - continue - self.setError(linkcheck.i18n._("more than %d redirections, aborting")%self.max_redirects) - return - # user authentication - if response.status == 401: - if not self.auth: - import base64 - _user, _password = self.getUserPassword() - self.auth = "Basic "+\ - base64.encodestring("%s:%s" % (_user, _password)) - debug(BRING_IT_ON, "Authentication", _user, "/", _password) - continue - elif response.status >= 400: - if self.headers and self.urlparts[4]: - self.no_anchor = True - continue - if self.method=="HEAD": - # fall back to GET - self.method = "GET" - redirectCache = [self.url] - fallback_GET = True - continue - elif self.headers and self.method!="GET": - # test for HEAD support - mime = self.headers.gettype() - poweredby = self.headers.get('X-Powered-By', '') - server = self.headers.get('Server', '') - if mime=='application/octet-stream' and \ - (poweredby.startswith('Zope') or \ - server.startswith('Zope')): - self.setWarning(linkcheck.i18n._("Zope Server cannot determine" - " MIME type with HEAD, falling back to GET")) - self.method = "GET" - continue - break - # check url warnings - effectiveurl = urlparse.urlunsplit(self.urlparts) - if self.url != effectiveurl: - self.setWarning(linkcheck.i18n._("Effective URL %s") % effectiveurl) - self.url = effectiveurl - # check response - self.checkResponse(response, fallback_GET) - - - def followRedirections (self, response, redirectCache): - """follow all redirections of http response""" - redirected = self.url - tries = 0 - while response.status in [301,302] and self.headers and \ - tries < self.max_redirects: - newurl = self.headers.getheader("Location", - self.headers.getheader("Uri", "")) - redirected = linkcheck.url.url_norm(urlparse.urljoin(redirected, newurl)) - # note: urlparts has to be a list - self.urlparts = list(urlparse.urlsplit(redirected)) - # check internal redirect cache to avoid recursion - if redirected in redirectCache: - redirectCache.append(redirected) - if self.method == "HEAD": - # Microsoft servers tend to recurse HEAD requests - # fall back to the original url and use GET - self.urlparts = list(urlparse.urlsplit(self.url)) - return self.max_redirects, response - self.setError( - linkcheck.i18n._("recursive redirection encountered:\n %s") % \ - "\n => ".join(redirectCache)) - return -1, response - redirectCache.append(redirected) - # remember this alias - if response.status == 301: - if not self.has301status: - self.setWarning(linkcheck.i18n._("HTTP 301 (moved permanent) encountered: you " - "should update this link.")) - if not (self.url.endswith('/') or self.url.endswith('.html')): - self.setWarning(linkcheck.i18n._("A HTTP 301 redirection occured and the url has no " - "trailing / at the end. All urls which point to (home) " - "directories should end with a / to avoid redirection.")) - self.has301status = True - self.aliases.append(redirected) - # check cache again on possibly changed URL - key = self.getCacheKey() - if self.config.urlCache_has_key(key): - self.copyFromCache(self.config.urlCache_get(key)) - self.cached = True - self.logMe() - return -1, response - # check if we still have a http url, it could be another - # scheme, eg https or news - if self.urlparts[0]!="http": - self.setWarning(linkcheck.i18n._("HTTP redirection to non-http url encountered; " - "the original url was %r.")%self.url) - # make new UrlData object - newobj = linkcheck.UrlData.GetUrlDataFrom(redirected, self.recursionLevel, self.config, - parentName=self.parentName, baseRef=self.baseRef, - line=self.line, column=self.column, name=self.name) - newobj.warningString = self.warningString - newobj.infoString = self.infoString - # append new object to queue - self.config.appendUrl(newobj) - # pretend to be finished and logged - self.cached = True - return -1, response - # new response data - response = self._getHttpResponse() - self.headers = response.msg - debug(BRING_IT_ON, "Redirected", self.headers) - tries += 1 - return tries, response - - - def checkResponse (self, response, fallback_GET): - """check final result""" - if response.status >= 400: - self.setError("%r %s"%(response.status, response.reason)) - else: - if self.headers and self.headers.has_key("Server"): - server = self.headers['Server'] - else: - server = linkcheck.i18n._("unknown") - if fallback_GET: - self.setWarning(linkcheck.i18n._("Server %r did not support HEAD request, used GET for checking")%server) - if self.no_anchor: - self.setWarning(linkcheck.i18n._("Server %r had no anchor support, removed anchor from request")%server) - if response.status == 204: - # no content - self.setWarning(response.reason) - # store cookies for valid links - if self.config['cookies']: - for c in self.cookies: - self.setInfo("Cookie: %s"%c) - out = self.config.storeCookies(self.headers, self.urlparts[1]) - for h in out: - self.setInfo(h) - if response.status >= 200: - self.setValid("%r %s"%(response.status,response.reason)) - else: - self.setValid("OK") - modified = self.headers.get('Last-Modified', '') - if modified: - self.setInfo(linkcheck.i18n._("Last modified %s") % modified) - - - def getCacheKeys (self): - keys = super(HttpUrlData, self).getCacheKeys() - keys.extend(self.aliases) - return keys - - - def _getHttpResponse (self): - """Put request and return (status code, status text, mime object). - host can be host:port format - """ - if self.proxy: - host = self.proxy - scheme = "http" - else: - host = self.urlparts[1] - scheme = self.urlparts[0] - debug(HURT_ME_PLENTY, "host", host) - if self.urlConnection: - self.closeConnection() - self.urlConnection = self.getHTTPObject(host, scheme) - # quote url before submit - url = linkcheck.url.url_quote(urlparse.urlunsplit(self.urlparts)) - qurlparts = list(urlparse.urlsplit(url)) - if self.no_anchor: - qurlparts[4] = '' - if self.proxy: - path = urlparse.urlunsplit(qurlparts) - else: - path = urlparse.urlunsplit(('', '', qurlparts[2], - qurlparts[3], qurlparts[4])) - self.urlConnection.putrequest(self.method, path, skip_host=True) - self.urlConnection.putheader("Host", host) - # userinfo is from http://user@pass:host/ - if self.userinfo: - self.urlConnection.putheader("Authorization", self.userinfo) - # auth is the -u and -p configuration options - elif self.auth: - self.urlConnection.putheader("Authorization", self.auth) - if self.proxyauth: - self.urlConnection.putheader("Proxy-Authorization", - self.proxyauth) - if self.parentName: - self.urlConnection.putheader("Referer", self.parentName) - self.urlConnection.putheader("User-Agent", linkcheck.Config.UserAgent) - self.urlConnection.putheader("Accept-Encoding", "gzip;q=1.0, deflate;q=0.9, identity;q=0.5") - if self.config['cookies']: - self.cookies = self.config.getCookies(self.urlparts[1], - self.urlparts[2]) - for c in self.cookies: - self.urlConnection.putheader("Cookie", c) - self.urlConnection.endheaders() - return self.urlConnection.getresponse() - - - def getHTTPObject (self, host, scheme): - if scheme=="http": - h = linkcheck.httplib2.HTTPConnection(host) - elif scheme=="https": - h = linkcheck.httplib2.HTTPSConnection(host) - else: - raise linkcheck.LinkCheckerError, "invalid url scheme %s" % scheme - h.set_debuglevel(get_debuglevel()) - h.connect() - return h - - - def getContent (self): - if not self.has_content: - self.method = "GET" - self.has_content = True - self.closeConnection() - t = time.time() - response = self._getHttpResponse() - self.headers = response.msg - self.data = response.read() - encoding = self.headers.get("Content-Encoding") - if encoding in _supported_encodings: - try: - if encoding == 'deflate': - f = StringIO.StringIO(zlib.decompress(self.data)) - else: - f = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(self.data)) - except zlib.error: - f = StringIO.StringIO(self.data) - self.data = f.read() - self.downloadtime = time.time() - t - return self.data - - - def isHtml (self): - if not (self.valid and self.headers): - return False - if self.headers.gettype()[:9]!="text/html": - return False - encoding = self.headers.get("Content-Encoding") - if encoding and encoding not in _supported_encodings and \ - encoding!='identity': - self.setWarning(linkcheck.i18n._('Unsupported content encoding %r.')%encoding) - return False - return True - - - def isHttp (self): - return True - - - def getContentType (self): - ptype = self.headers.get('Content-Type', 'application/octet-stream') - if ";" in ptype: - ptype = ptype.split(';')[0] - return ptype - - - def isParseable (self): - if not (self.valid and self.headers): - return False - if self.getContentType() not in ("text/html", "text/css"): - return False - encoding = self.headers.get("Content-Encoding") - if encoding and encoding not in _supported_encodings and \ - encoding!='identity': - self.setWarning(linkcheck.i18n._('Unsupported content encoding %r.')%encoding) - return False - return True - - - def parseUrl (self): - ptype = self.getContentType() - if ptype=="text/html": - self.parse_html() - elif ptype=="text/css": - self.parse_css() - return None - - - def getRobotsTxtUrl (self): - return "%s://%s/robots.txt"%tuple(self.urlparts[0:2]) - - - def robotsTxtAllowsUrl (self): - roboturl = self.getRobotsTxtUrl() - debug(HURT_ME_PLENTY, "robots.txt url", roboturl) - debug(HURT_ME_PLENTY, "url", self.url) - if not self.config.robotsTxtCache_has_key(roboturl): - rp = linkcheck.robotparser2.RobotFileParser() - rp.set_url(roboturl) - rp.read() - self.config.robotsTxtCache_set(roboturl, rp) - rp = self.config.robotsTxtCache_get(roboturl) - return rp.can_fetch(linkcheck.Config.UserAgent, self.url) diff --git a/linkcheck/HttpsUrlData.py b/linkcheck/HttpsUrlData.py deleted file mode 100644 index 0a3ead33..00000000 --- a/linkcheck/HttpsUrlData.py +++ /dev/null @@ -1,31 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle https links""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import linkcheck -from linkcheck.debug import * - - -class HttpsUrlData (linkcheck.HttpUrlData.HttpUrlData): - """Url link with https scheme""" - - def _check (self): - if linkcheck.HttpUrlData.supportHttps: - super(HttpsUrlData, self)._check() - else: - self.setWarning(linkcheck.i18n._("%s url ignored")%self.scheme.capitalize()) - self.logMe() diff --git a/linkcheck/IgnoredUrlData.py b/linkcheck/IgnoredUrlData.py deleted file mode 100644 index ea5e54e8..00000000 --- a/linkcheck/IgnoredUrlData.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle for uncheckable application-specific links""" -# Copyright (C) 2001-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import re -import linkcheck - -ignored_schemes = r"""^( -acap # application configuration access protocol -|afs # Andrew File System global file names -|cid # content identifier -|data # data -|dav # dav -|fax # fax -|imap # internet message access protocol -|ldap # Lightweight Directory Access Protocol -|mailserver # Access to data available from mail servers -|mid # message identifier -|modem # modem -|nfs # network file system protocol -|opaquelocktoken # opaquelocktoken -|pop # Post Office Protocol v3 -|prospero # Prospero Directory Service -|rtsp # real time streaming protocol -|service # service location -|sip # session initiation protocol -|tel # telephone -|tip # Transaction Internet Protocol -|tn3270 # Interactive 3270 emulation sessions -|vemmi # versatile multimedia interface -|wais # Wide Area Information Servers -|z39\.50r # Z39.50 Retrieval -|z39\.50s # Z39.50 Session -|chrome # Mozilla specific -|find # Mozilla specific -|clsid # Microsoft specific -|javascript # JavaScript -|isbn # ISBN (int. book numbers) -):""" - -ignored_schemes_re = re.compile(ignored_schemes, re.VERBOSE) - - -class IgnoredUrlData (linkcheck.UrlData.UrlData): - """Some schemes are defined in http://www.w3.org/Addressing/schemes""" - - def _check (self): - self.setWarning(linkcheck.i18n._("%s url ignored")%self.scheme.capitalize()) - self.logMe() - - def hasContent (self): - return False diff --git a/linkcheck/NntpUrlData.py b/linkcheck/NntpUrlData.py deleted file mode 100644 index 12551b4a..00000000 --- a/linkcheck/NntpUrlData.py +++ /dev/null @@ -1,101 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle nntp: and news: links""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import re, time, sys, nntplib, urlparse, random -from linkcheck import linkcheck.LinkCheckerError, Config -from UrlData import ExcList, UrlData -from debug import * -random.seed() - -ExcList.extend([nntplib.error_reply, - nntplib.error_temp, - nntplib.error_perm, - nntplib.error_proto, - EOFError, - ]) - -class NntpUrlData (UrlData): - "Url link with NNTP scheme" - - def buildUrl (self): - # use nntp instead of news to comply with the unofficial internet - # draft of Alfred Gilman which unifies (s)news and nntp URLs - # note: we use this only internally (for parsing and caching) - if self.urlName[:4].lower()=='news': - self.url = 'nntp'+self.urlName[4:] - else: - self.url = self.urlName - self.urlparts = urlparse.urlsplit(self.url) - Config.debug(BRING_IT_ON, self.urlparts) - - - def checkConnection (self): - nntpserver = self.urlparts[1] or self.config["nntpserver"] - if not nntpserver: - self.setWarning(linkcheck.i18n._("No NNTP server specified, skipping this URL")) - return - nntp = self._connectNntp(nntpserver) - group = self.urlparts[2] - while group[:1]=='/': - group = group[1:] - if '@' in group: - # request article - resp,number,mid = nntp.stat("<"+group+">") - self.setInfo(linkcheck.i18n._('Articel number %s found') % number) - else: - # split off trailing articel span - group = group.split('/',1)[0] - if group: - # request group info - resp,count,first,last,name = nntp.group(group) - self.setInfo(linkcheck.i18n._("Group %s has %s articles, range %s to %s") %\ - (name, count, first, last)) - else: - # group name is the empty string - self.setWarning(linkcheck.i18n._("No newsgroup specified in NNTP URL")) - - - def _connectNntp (self, nntpserver): - """This is done only once per checking task. Also, the newly - introduced error codes 504 and 505 (both inclining "Too busy, retry - later", are caught.""" - tries = 0 - nntp = value = None - while tries < 5: - tries += 1 - try: - nntp=nntplib.NNTP(nntpserver) - except nntplib.error_perm: - value = sys.exc_info()[1] - if re.compile("^50[45]").search(str(value)): - time.sleep(random.randrange(10,30)) - else: - raise - if nntp is None: - raise linkcheck.LinkCheckerError(linkcheck.i18n._("NTTP server too busy; tried more than %d times")%tries) - if value is not None: - self.setWarning(linkcheck.i18n._("NNTP busy: %s")%str(value)) - return nntp - - - def getCacheKeys (self): - return [self.url] - - - def hasContent (self): - return False diff --git a/linkcheck/ProxyUrlData.py b/linkcheck/ProxyUrlData.py deleted file mode 100644 index e4476818..00000000 --- a/linkcheck/ProxyUrlData.py +++ /dev/null @@ -1,39 +0,0 @@ -# -*- coding: iso-8859-1 -*- -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. -from UrlData import UrlData -from urllib import splittype, splithost, splituser - -class ProxyUrlData (UrlData): - """urldata with ability for proxying and for urls with user:pass@host - setting""" - - - def setProxy (self, proxy): - self.proxy = proxy - self.proxyauth = None - if self.proxy: - if self.proxy[:7].lower() != "http://": - self.proxy = "http://"+self.proxy - self.proxy = splittype(self.proxy)[1] - self.proxy = splithost(self.proxy)[0] - self.proxyauth, self.proxy = splituser(self.proxy) - if self.proxyauth is not None: - if ":" not in self.proxyauth: self.proxyauth += ":" - import base64 - self.proxyauth = base64.encodestring(self.proxyauth).strip() - self.proxyauth = "Basic "+self.proxyauth - diff --git a/linkcheck/TelnetUrlData.py b/linkcheck/TelnetUrlData.py deleted file mode 100644 index ba190343..00000000 --- a/linkcheck/TelnetUrlData.py +++ /dev/null @@ -1,64 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Handle telnet: links""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import telnetlib -import urlparse -import urllib -import linkcheck -from linkcheck.debug import * - - -class TelnetUrlData (linkcheck.HostCheckingUrlData.HostCheckingUrlData): - "Url link with telnet scheme" - - def buildUrl (self): - super(TelnetUrlData, self).buildUrl() - parts = urlparse.urlsplit(self.url) - userinfo, self.host = urllib.splituser(parts[1]) - self.host, self.port = urllib.splitport(self.host) - if self.port is not None: - if not linkcheck.UrlData.is_valid_port(self.port): - raise linkcheck.LinkCheckerError(linkcheck.i18n._("URL has invalid port number %s")\ - % self.port) - self.port = int(self.port) - else: - self.port = 23 - if userinfo: - self.user, self.password = urllib.splitpasswd(userinfo) - else: - self.user, self.password = self.getUserPassword() - - - def checkConnection (self): - super(TelnetUrlData, self).checkConnection() - self.urlConnection = telnetlib.Telnet() - self.urlConnection.set_debuglevel(get_debuglevel()) - self.urlConnection.open(self.host, self.port) - if self.user: - self.urlConnection.read_until("login: ", 10) - self.urlConnection.write(self.user+"\n") - if self.password: - self.urlConnection.read_until("Password: ", 10) - self.urlConnection.write(self.password+"\n") - # XXX how to tell if we are logged in?? - self.urlConnection.write("exit\n") - - - def hasContent (self): - return False - diff --git a/linkcheck/UrlData.py b/linkcheck/UrlData.py deleted file mode 100644 index eabe5cca..00000000 --- a/linkcheck/UrlData.py +++ /dev/null @@ -1,586 +0,0 @@ -# -*- coding: iso-8859-1 -*- -"""Base URL handler""" -# Copyright (C) 2000-2004 Bastian Kleineidam -# -# This program is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 2 of the License, or -# (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. - -import sys -import re -import urlparse -import urllib2 -import urllib -import time -import traceback -import socket -import select -import linkcheck -import linkcheck.DNS - - -ws_at_start_or_end = re.compile(r"(^\s+)|(\s+$)").search - -# helper function for internal errors -def internal_error (): - print >>sys.stderr, linkcheck.i18n._("""\n********** Oops, I did it again. ************* - -You have found an internal error in LinkChecker. Please write a bug report -at http://sourceforge.net/tracker/?func=add&group_id=1913&atid=101913 -or send mail to %s and include the following information: -1) The URL or file you are testing -2) Your commandline arguments and/or configuration. -3) The system information below. - -If you disclose some information because its too private to you thats ok. -I will try to help you nontheless (but you have to give me *something* -I can work with ;). -""") % linkcheck.Config.Email - etype, value = sys.exc_info()[:2] - print >>sys.stderr, etype, value - traceback.print_exc() - print_app_info() - print >>sys.stderr, linkcheck.i18n._("\n******** LinkChecker internal error, bailing out ********") - sys.exit(1) - - -def print_app_info (): - import os - print >>sys.stderr, linkcheck.i18n._("System info:") - print >>sys.stderr, linkcheck.Config.App - print >>sys.stderr, "Python %s on %s" % (sys.version, sys.platform) - for key in ("LC_ALL", "LC_MESSAGES", "http_proxy", "ftp_proxy"): - value = os.getenv(key) - if value is not None: - print >>sys.stderr, key, "=", repr(value) - - -def get_absolute_url (urlName, baseRef, parentName): - """Search for the absolute url to detect the link type. This does not - join any url fragments together! Returns the url in lower case to - simplify urltype matching.""" - if urlName and ":" in urlName: - return urlName.lower() - elif baseRef and ":" in baseRef: - return baseRef.lower() - elif parentName and ":" in parentName: - return parentName.lower() - return "" - - -# we catch these exceptions, all other exceptions are internal -# or system errors -ExcList = [ - IOError, - ValueError, # from httplib.py - linkcheck.LinkCheckerError, - linkcheck.DNS.Error, - socket.timeout, - socket.error, - select.error, -] - -if hasattr(socket, "sslerror"): - ExcList.append(socket.sslerror) - -# regular expression for port numbers -is_valid_port = re.compile(r"\d+").match - - -class UrlData (object): - "Representing a URL with additional information like validity etc" - - def __init__ (self, - urlName, - recursionLevel, - config, - parentName = None, - baseRef = None, - line = 0, - column = 0, - name = ""): - self.urlName = urlName - self.anchor = None - self.recursionLevel = recursionLevel - self.config = config - self.parentName = parentName - self.baseRef = baseRef - self.errorString = linkcheck.i18n._("Error") - self.validString = linkcheck.i18n._("Valid") - self.warningString = None - self.infoString = None - self.valid = True - self.url = None - self.urlparts = None - self.line = line - self.column = column - self.name = name - self.dltime = -1 - self.dlsize = -1 - self.checktime = 0 - self.cached = False - self.urlConnection = None - self.extern = (1, 0) - self.data = None - self.has_content = False - url = get_absolute_url(self.urlName, self.baseRef, self.parentName) - # assume file link if no scheme is found - self.scheme = url.split(":", 1)[0] or "file" - - def setError (self, s): - self.valid = False - self.errorString = linkcheck.i18n._("Error")+": "+s - - def setValid (self, s): - self.valid = True - self.validString = linkcheck.i18n._("Valid")+": "+s - - def isParseable (self): - return False - - def isHtml (self): - return False - - def isHttp (self): - return False - - def isFile (self): - return False - - def setWarning (self, s): - if self.warningString: - self.warningString += "\n"+s - else: - self.warningString = s - - def setInfo (self, s): - if self.infoString: - self.infoString += "\n"+s - else: - self.infoString = s - - def copyFromCache (self, cacheData): - """fill attributes from cache data""" - self.errorString = cacheData["errorString"] - self.validString = cacheData["validString"] - if self.warningString: - if cacheData["warningString"]: - self.warningString += "\n"+cacheData["warningString"] - else: - self.warningString = cacheData["warningString"] - self.infoString = cacheData["infoString"] - self.valid = cacheData["valid"] - self.dltime = cacheData["dltime"] - - def getCacheData (self): - """return all data values that should be put in the cache""" - return {"errorString": self.errorString, - "validString": self.validString, - "warningString": self.warningString, - "infoString": self.infoString, - "valid": self.valid, - "dltime": self.dltime, - } - - def buildUrl (self): - if self.baseRef: - if ":" not in self.baseRef: - self.baseRef = urlparse.urljoin(self.parentName, self.baseRef) - self.url = urlparse.urljoin(self.baseRef, self.urlName) - elif self.parentName: - self.url = urlparse.urljoin(self.parentName, self.urlName) - else: - self.url = self.urlName - # unquote url - self.url = urllib.unquote(self.url) - # split into (modifiable) list - self.urlparts = list(urlparse.urlsplit(self.url)) - # check userinfo@host:port syntax - self.userinfo, host = urllib.splituser(self.urlparts[1]) - x, port = urllib.splitport(host) - if port is not None and not is_valid_port(port): - raise linkcheck.LinkCheckerError(linkcheck.i18n._("URL has invalid port number %r")\ - % str(port)) - # set host lowercase and without userinfo - self.urlparts[1] = host.lower() - # safe anchor for later checking - self.anchor = self.urlparts[4] - - def logMe (self): - debug(BRING_IT_ON, "logging url") - self.config.incrementLinknumber() - if self.config["verbose"] or not self.valid or \ - (self.warningString and self.config["warnings"]): - self.config.log_newUrl(self) - - def check (self): - try: - self._check() - except KeyboardInterrupt: - raise - except (socket.error, select.error): - # on Unix, ctrl-c can raise - # error: (4, 'Interrupted system call') - etype, value = sys.exc_info()[:2] - if etype!=4: - raise - except linkcheck.test_support.Error: - raise - except: - internal_error() - - def _check (self): - debug(BRING_IT_ON, "Checking", self) - if self.recursionLevel and self.config['wait']: - debug(BRING_IT_ON, "sleeping for", self.config['wait'], "seconds") - time.sleep(self.config['wait']) - t = time.time() - if not self.checkCache(): - return - # apply filter - debug(BRING_IT_ON, "extern =", self.extern) - if self.extern[0] and (self.config["strict"] or self.extern[1]): - self.setWarning( - linkcheck.i18n._("outside of domain filter, checked only syntax")) - self.logMe() - return - - # check connection - debug(BRING_IT_ON, "checking connection") - try: - self.checkConnection() - if self.cached: - return - if self.config["anchors"]: - self.checkAnchors() - except tuple(ExcList): - etype, evalue, etb = sys.exc_info() - debug(HURT_ME_PLENTY, "exception", traceback.format_tb(etb)) - # make nicer error msg for unknown hosts - if isinstance(evalue, socket.error) and evalue[0]==-2: - evalue = linkcheck.i18n._('Hostname not found') - # make nicer error msg for bad status line - if isinstance(evalue, linkcheck.httplib2.BadStatusLine): - evalue = linkcheck.i18n._('Bad HTTP response %r')%str(evalue) - self.setError(str(evalue)) - - # check content - warningregex = self.config["warningregex"] - if warningregex and self.valid: - debug(BRING_IT_ON, "checking content") - try: - self.checkContent(warningregex) - except tuple(ExcList): - value, tb = sys.exc_info()[1:] - debug(HURT_ME_PLENTY, "exception", traceback.format_tb(tb)) - self.setError(str(value)) - - self.checktime = time.time() - t - # check recursion - debug(BRING_IT_ON, "checking recursion") - try: - if self.allowsRecursion(): - self.parseUrl() - # check content size - self.checkSize() - except tuple(ExcList): - value, tb = sys.exc_info()[1:] - debug(HURT_ME_PLENTY, "exception", traceback.format_tb(tb)) - self.setError(linkcheck.i18n._("could not parse content: %r")%str(value)) - # close - self.closeConnection() - self.logMe() - debug(BRING_IT_ON, "caching") - self.putInCache() - - def checkSyntax (self): - debug(BRING_IT_ON, "checking syntax") - if not self.urlName or self.urlName=="": - self.setError(linkcheck.i18n._("URL is null or empty")) - self.logMe() - return False - if ws_at_start_or_end(self.urlName): - self.setError(linkcheck.i18n._("URL has whitespace at beginning or end")) - self.logMe() - return False - try: - self.buildUrl() - self.extern = self._getExtern() - except linkcheck.LinkCheckerError, msg: - self.setError(str(msg)) - self.logMe() - return False - return True - - def checkCache (self): - debug(BRING_IT_ON, "checking cache") - for key in self.getCacheKeys(): - if self.config.urlCache_has_key(key): - self.copyFromCache(self.config.urlCache_get(key)) - self.cached = True - self.logMe() - return False - return True - - def closeConnection (self): - # brute force closing - if self.urlConnection is not None: - try: self.urlConnection.close() - except: pass - # release variable for garbage collection - self.urlConnection = None - - def putInCache (self): - if not self.cached: - data = self.getCacheData() - for key in self.getCacheKeys(): - self.config.urlCache_set(key, data) - self.config.urlSeen_set(key) - self.cached = True - - def getCacheKeys (self): - key = self.getCacheKey() - if key is None: - return [] - return [key] - - def isCached (self): - key = self.getCacheKey() - return self.cached or self.config.urlSeen_has_key(key) - - def getCacheKey (self): - # note: the host is already lowercase - if self.urlparts: - if self.config["anchorcaching"]: - # do not ignore anchor - return urlparse.urlunsplit(self.urlparts) - else: - # removed anchor from cache key - return urlparse.urlunsplit(self.urlparts[:4]+['']) - return None - - def checkConnection (self): - self.urlConnection = urllib2.urlopen(self.url) - - def allowsRecursion (self): - # note: test self.valid before self.isParseable() - return self.valid and \ - self.isParseable() and \ - self.hasContent() and \ - not self.isCached() and \ - (self.config["recursionlevel"] < 0 or - self.recursionLevel < self.config["recursionlevel"]) and \ - not self.extern[0] and self.contentAllowsRobots() - - def contentAllowsRobots (self): - if not self.isHtml(): - return True - if not (self.isHttp() or self.isFile()): - return True - h = linkcheck.linkparse.MetaRobotsFinder(self.getContent()) - p = bk.HtmlParser.htmlsax.parser(h) - h.parser = p - p.feed(self.getContent()) - p.flush() - h.parser = None - p.handler = None - return h.follow - - - def checkAnchors (self): - if not (self.valid and self.anchor and self.isHtml() and \ - self.hasContent()): - # do not bother - return - debug(HURT_ME_PLENTY, "checking anchor", self.anchor) - h = linkcheck.linkparse.LinkFinder(self.getContent(), tags={'a': ['name'], None: ['id']}) - p = bk.HtmlParser.htmlsax.parser(h) - h.parser = p - p.feed(self.getContent()) - p.flush() - h.parser = None - p.handler = None - for cur_anchor,line,column,name,base in h.urls: - if cur_anchor == self.anchor: - return - self.setWarning(linkcheck.i18n._("anchor #%s not found") % self.anchor) - - def _getExtern (self): - if not (self.config["externlinks"] or self.config["internlinks"]): - return (0, 0) - # deny and allow external checking - linkcheck.Config.debug(HURT_ME_PLENTY, "Url", self.url) - if self.config["denyallow"]: - for entry in self.config["externlinks"]: - linkcheck.Config.debug(HURT_ME_PLENTY, "Extern entry", entry) - match = entry['pattern'].search(self.url) - if (entry['negate'] and not match) or \ - (match and not entry['negate']): - return (1, entry['strict']) - for entry in self.config["internlinks"]: - linkcheck.Config.debug(HURT_ME_PLENTY, "Intern entry", entry) - match = entry['pattern'].search(self.url) - if (entry['negate'] and not match) or \ - (match and not entry['negate']): - return (0, 0) - return (0, 0) - else: - for entry in self.config["internlinks"]: - linkcheck.Config.debug(HURT_ME_PLENTY, "Intern entry", entry) - match = entry['pattern'].search(self.url) - if (entry['negate'] and not match) or \ - (match and not entry['negate']): - return (0, 0) - for entry in self.config["externlinks"]: - linkcheck.Config.debug(HURT_ME_PLENTY, "Extern entry", entry) - match = entry['pattern'].search(self.url) - if (entry['negate'] and not match) or \ - (match and not entry['negate']): - return (1, entry['strict']) - return (1,0) - - def hasContent (self): - """indicate wether url getContent() can be called""" - return True - - def getContent (self): - """Precondition: urlConnection is an opened URL.""" - if not self.has_content: - self.has_content = True - t = time.time() - self.data = self.urlConnection.read() - self.dltime = time.time() - t - self.dlsize = len(self.data) - return self.data - - def checkContent (self, warningregex): - """if a warning expression was given, call this function to check it - against the content of this url""" - if not self.hasContent(): - return - match = warningregex.search(self.getContent()) - if match: - self.setWarning(linkcheck.i18n._("Found %r in link contents")%match.group()) - - def checkSize (self): - """if a maximum size was given, call this function to check it - against the content size of this url""" - maxbytes = self.config["warnsizebytes"] - if maxbytes is not None and self.dlsize >= maxbytes: - self.setWarning(linkcheck.i18n._("Content size %s is larger than %s")%\ - (linkcheck.StringUtil.strsize(self.dlsize), - linkcheck.StringUtil.strsize(maxbytes))) - - def parseUrl (self): - # default parse type is html - debug(BRING_IT_ON, "Parsing recursively into", self) - self.parse_html(); - - def getUserPassword (self): - for auth in self.config["authentication"]: - if auth['pattern'].match(self.url): - return auth['user'], auth['password'] - return None,None - - def parse_html (self): - # search for a possible base reference - h = linkcheck.linkparse.LinkFinder(self.getContent(), tags={'base': ['href']}) - p = bk.HtmlParser.htmlsax.parser(h) - h.parser = p - p.feed(self.getContent()) - p.flush() - h.parser = None - p.handler = None - baseRef = None - if len(h.urls)>=1: - baseRef = h.urls[0][0] - if len(h.urls)>1: - self.setWarning(linkcheck.i18n._( - "more than one tag found, using only the first one")) - h = linkcheck.linkparse.LinkFinder(self.getContent()) - p = bk.HtmlParser.htmlsax.parser(h) - h.parser = p - p.feed(self.getContent()) - p.flush() - h.parser = None - p.handler = None - for s in h.parse_info: - # the parser had warnings/errors - self.setWarning(s) - for url,line,column,name,codebase in h.urls: - if codebase: - base = codebase - else: - base = baseRef - debug(NIGHTMARE, "Put url %r in queue"%url) - self.config.appendUrl(GetUrlDataFrom(url, - self.recursionLevel+1, self.config, - parentName=self.url, baseRef=base, - line=line, column=column, name=name)) - - def parse_opera (self): - # parse an opera bookmark file - name = "" - lineno = 0 - lines = self.getContent().splitlines() - for line in lines: - lineno += 1 - line = line.strip() - if line.startswith("NAME="): - name = line[5:] - elif line.startswith("URL="): - url = line[4:] - if url: - self.config.appendUrl(GetUrlDataFrom(url, - self.recursionLevel+1, self.config, self.url, None, lineno, name)) - name = "" - - def parse_text (self): - """parse a text file with on url per line; comment and blank - lines are ignored - UNUSED and UNTESTED, just use linkchecker `cat file.txt` - """ - lineno = 0 - for line in self.getContent().splitlines(): - lineno += 1 - line = line.strip() - if not line or line.startswith('#'): continue - self.config.appendUrl(GetUrlDataFrom(line, self.recursionLevel+1, - self.config, parentName=self.url, line=lineno)) - - def parse_css (self): - """parse a CSS file for url() patterns""" - lineno = 0 - for line in self.getContent().splitlines(): - lineno += 1 - for mo in linkcheck.linkparse.css_url_re.finditer(line): - column = mo.start("url") - self.config.appendUrl(GetUrlDataFrom(mo.group("url"), - self.recursionLevel+1, self.config, - parentName=self.url, line=lineno, column=column)) - - def __str__ (self): - return ("%s link\n" - "urlname=%s\n" - "parentName=%s\n" - "baseRef=%s\n" - "cached=%s\n" - "recursionLevel=%s\n" - "urlConnection=%s\n" - "line=%s\n" - "column=%s\n" - "name=%s" % \ - (self.scheme, self.urlName, self.parentName, self.baseRef, - self.cached, self.recursionLevel, self.urlConnection, self.line, - self.column, self.name)) -