diff --git a/linkcheck/checker/errorurl.py b/linkcheck/checker/errorurl.py index 392e6448..e26ef573 100644 --- a/linkcheck/checker/errorurl.py +++ b/linkcheck/checker/errorurl.py @@ -27,6 +27,9 @@ class ErrorUrl (urlbase.UrlBase): """ def check_syntax (self): + """ + Log a warning that the URL syntax is invalid or unknown. + """ linkcheck.log.debug(linkcheck.LOG_CHECK, "checking syntax") self.url, is_idn = linkcheck.url.url_norm(self.base_url) self.set_result(_("URL is unrecognized or has invalid syntax"), diff --git a/linkcheck/checker/fileurl.py b/linkcheck/checker/fileurl.py index 3791dd92..3ec97a24 100644 --- a/linkcheck/checker/fileurl.py +++ b/linkcheck/checker/fileurl.py @@ -86,6 +86,12 @@ class FileUrl (urlbase.UrlBase): def __init__ (self, base_url, recursion_level, consumer, parent_url = None, base_ref = None, line=0, column=0, name=u""): + """ + Besides the usual initialization the URL is normed according + to the platform: + - the base URL is made an absolute file:// URL + - under Windows platform the drive specifier is normed + """ super(FileUrl, self).__init__(base_url, recursion_level, consumer, parent_url=parent_url, base_ref=base_ref, line=line, column=column, name=name) @@ -103,6 +109,9 @@ class FileUrl (urlbase.UrlBase): self.base_url, is_idn = linkcheck.url.url_norm(base_url) def build_url (self): + """ + Calls super.build_url() and adds a trailing slash to directories. + """ super(FileUrl, self).build_url() # ignore query and fragment url parts for filesystem urls self.urlparts[3] = self.urlparts[4] = '' @@ -112,6 +121,10 @@ class FileUrl (urlbase.UrlBase): self.url = urlparse.urlunsplit(self.urlparts) def check_connection (self): + """ + Try to open the local file. Under NT systems the case sensitivity + is checked. 
+ """ if self.is_directory(): self.set_result(_("directory")) else: @@ -119,11 +132,13 @@ class FileUrl (urlbase.UrlBase): self.check_case_sensitivity() def check_case_sensitivity (self): + """ + Check if url and windows path name match cases + else there might be problems when copying such + files on web servers that are case sensitive. + """ if os.name != 'nt': return - # Check if url and windows path name match cases - # else there might be problems when copying such - # files on web servers that are case sensitive. path = self.get_os_filename() realpath = get_nt_filename(path) if path != realpath: @@ -133,6 +148,10 @@ class FileUrl (urlbase.UrlBase): pass def get_content (self): + """ + Return file content, or in case of directories a dummy HTML file + with links to the files. + """ if not self.valid: return "" if self.has_content: @@ -143,6 +162,12 @@ class FileUrl (urlbase.UrlBase): return super(FileUrl, self).get_content() def get_directory_content (self): + """ + Get dummy HTML data for the directory content. + + @return: HTML data + @rtype: string + """ t = time.time() files = get_files(self.get_os_filename()) data = linkcheck.checker.get_index_html(files) @@ -153,6 +178,9 @@ class FileUrl (urlbase.UrlBase): return self.data def is_html (self): + """ + Check if file is a parseable HTML file. + """ if linkcheck.checker.extensions['html'].search(self.url): return True if contents['html'].search(self.get_content()): @@ -160,19 +188,43 @@ class FileUrl (urlbase.UrlBase): return False def is_file (self): + """ + This is a file. + + @return: True + @rtype: bool + """ return True def get_os_filename (self): + """ + Construct os specific file path out of the file:// URL. + + @return: file name + @rtype: string + """ path = self.urlparts[2] if os.name == 'nt': path = prepare_urlpath_for_nt(path) return urllib.url2pathname(path) def is_directory (self): + """ + Check if file is a directory. 
+ + @return: True iff file is a directory + @rtype: bool + """ filename = self.get_os_filename() return os.path.isdir(filename) and not os.path.islink(filename) def is_parseable (self): + """ + Check if content is parseable for recursion. + + @return: True if content is parseable + @rtype: bool + """ if self.is_directory(): return True # guess by extension @@ -189,12 +241,17 @@ class FileUrl (urlbase.UrlBase): return False def parse_url (self): + """ + Parse file contents for new links to check. + """ if self.is_directory(): - return self.parse_html() + self.parse_html() + return for key, ro in linkcheck.checker.extensions.items(): if ro.search(self.url): - return getattr(self, "parse_"+key)() + getattr(self, "parse_"+key)() + return for key, ro in contents.items(): if ro.search(self.get_content()[:30]): - return getattr(self, "parse_"+key)() - return None + getattr(self, "parse_"+key)() + return diff --git a/linkcheck/checker/ftpurl.py b/linkcheck/checker/ftpurl.py index 536ba347..285f74c4 100644 --- a/linkcheck/checker/ftpurl.py +++ b/linkcheck/checker/ftpurl.py @@ -49,6 +49,10 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport): self.filename = None def check_connection (self): + """ + In case of proxy, delegate to HttpUrl. Else check in this + order: login, changing directory, list the file. + """ # proxy support (we support only http) self.set_proxy(self.consumer.config["proxy"].get(self.scheme)) if self.proxy: @@ -70,7 +74,9 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return None def get_user_password (self): - # get login credentials + """ + Get credentials to use for login. + """ if self.userinfo: return urllib.splitpasswd(self.userinfo) return super(FtpUrl, self).get_user_password() @@ -165,11 +171,17 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return files def is_html (self): + """ + See if URL target is a HTML file by looking at the extension. 
+ """ if linkcheck.checker.extensions['html'].search(self.url): return True return False def is_parseable (self): + """ + See if URL target is parseable for recursion. + """ if self.is_directory(): return True for ro in linkcheck.checker.extensions.values(): @@ -178,9 +190,15 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return False def is_directory (self): + """ + See if URL target is a directory. + """ return self.url.endswith('/') def parse_url (self): + """ + Parse URL target for links. + """ if self.is_directory(): return self.parse_html() for key, ro in linkcheck.checker.extensions.items(): @@ -189,6 +207,10 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return None def get_content (self): + """ + Return URL target content, or in case of directories a dummy HTML + file with links to the files. + """ if not self.valid: return "" if self.has_content: @@ -213,6 +235,9 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return self.data def close_connection (self): + """ + Add the open connection to the connection pool. + """ if self.url_connection is None: return # add to cached connections diff --git a/linkcheck/checker/httpurl.py b/linkcheck/checker/httpurl.py index 37f98892..440e4bf4 100644 --- a/linkcheck/checker/httpurl.py +++ b/linkcheck/checker/httpurl.py @@ -47,6 +47,13 @@ DEFAULT_TIMEOUT_SECS = 300 def has_header_value (headers, name, value): + """ + Look in headers for a specific header name and value. + Both name and value are case insensitive. + + @return: True if header name and value are found + @rtype: bool + """ name = name.lower() value = value.lower() for hname, hvalue in headers: @@ -56,6 +63,15 @@ def has_header_value (headers, name, value): def http_persistent (response): + """ + See if the HTTP connection can be kept open according to the + header values found in the response object.
+ + @param response: response instance + @type response: httplib.HTTPResponse + @return: True if connection is persistent + @rtype: bool + """ headers = response.getheaders() if response.version == 11: return has_header_value(headers, 'Connection', 'Close') @@ -63,6 +79,15 @@ def http_persistent (response): def http_timeout (response): + """ + Get HTTP timeout value, either from the Keep-Alive header or a + default value. + + @param response: response instance + @type response: httplib.HTTPResponse + @return: timeout + @rtype: int + """ timeout = response.getheader("Keep-Alive") if timeout is not None: try: @@ -94,6 +119,10 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): self.persistent = False def build_url (self): + """ + Call super.build_url() and add a missing trailing slash to + the URL if the URL path is empty. + """ super(HttpUrl, self).build_url() # encode userinfo # XXX @@ -104,6 +133,15 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): self.url = urlparse.urlunsplit(self.urlparts) def allows_robots (self, url): + """ + Fetch and parse the robots.txt of given url. Checks if LinkChecker + can access the requested resource. + + @param url: the url to be requested + @type url: string + @return: True if access is granted, otherwise False + @rtype: bool + """ roboturl = self.get_robots_txt_url() user, password = self.get_user_password() return self.consumer.cache.robots_txt_allows_url(roboturl, url, @@ -264,7 +302,9 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): self.check_response(response, fallback_GET) def follow_redirections (self, response): - """follow all redirections of http response""" + """ + Follow all redirections of http response. 
+ """ linkcheck.log.debug(linkcheck.LOG_CHECK, "follow all redirections") redirected = self.url tries = 0 @@ -347,7 +387,9 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return tries, response def check_response (self, response, fallback_GET): - """check final result""" + """ + Check final result and log it. + """ if response.status >= 400: self.set_result("%r %s" % (response.status, response.reason), valid=False) @@ -439,6 +481,16 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return response def get_http_object (self, host, scheme): + """ + Open a HTTP connection. + + @param host: the host to connect to + @type host: string of the form host[:port] + @param scheme: 'http' or 'https' + @type scheme: string + @return: open HTTP(S) connection + @rtype: httplib.HTTP(S)Connection + """ _user, _password = self.get_user_password() key = (scheme, self.urlparts[1], _user, _password) conn = self.consumer.cache.get_connection(key) @@ -458,6 +510,13 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return h def get_content (self): + """ + Get content of the URL target. The content data is cached after + the first call to this method. + + @return: URL content, decompressed and decoded + @rtype: string + """ if not self.has_content: self.method = "GET" self.has_content = True @@ -481,6 +540,13 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return self.data def is_html (self): + """ + See if this URL points to a HTML file by looking at the + Content-Type header, file extension and file content. + + @return: True if URL points to HTML file + @rtype: bool + """ if not (self.valid and self.headers): return False if self.headers.gettype()[:9] != "text/html": @@ -493,15 +559,34 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return True def is_http (self): + """ + This is a HTTP file.
+ + @return: True + @rtype: bool + """ return True def get_content_type (self): + """ + Get the MIME type from the Content-Type header value, or + 'application/octet-stream' if not found. + + @return: MIME type + @rtype: string + """ ptype = self.headers.get('Content-Type', 'application/octet-stream') if ";" in ptype: ptype = ptype.split(';')[0] return ptype def is_parseable (self): + """ + Check if content is parseable for recursion. + + @return: True if content is parseable + @rtype: bool + """ if not (self.valid and self.headers): return False if self.get_content_type() not in ("text/html", "text/css"): @@ -514,6 +599,9 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return True def parse_url (self): + """ + Parse file contents for new links to check. + """ ptype = self.get_content_type() if ptype == "text/html": self.parse_html() @@ -522,9 +610,19 @@ class HttpUrl (urlbase.UrlBase, proxysupport.ProxySupport): return None def get_robots_txt_url (self): + """ + Get the corresponding robots.txt URL for this URL. + + @return: robots.txt URL + @rtype: string + """ return "%s://%s/robots.txt" % tuple(self.urlparts[0:2]) def close_connection (self): + """ + If connection is persistent, add it to the connection pool. + Else close the connection. Errors on closing are ignored. + """ if self.url_connection is None: # no connection is open return diff --git a/linkcheck/checker/ignoredurl.py b/linkcheck/checker/ignoredurl.py index 1598e4bf..3db6164d 100644 --- a/linkcheck/checker/ignoredurl.py +++ b/linkcheck/checker/ignoredurl.py @@ -26,7 +26,16 @@ class IgnoredUrl (urlbase.UrlBase): """ def local_check (self): + """ + Only logs a warning that this URL is ignored. + """ self.add_warning(_("%s URL ignored.") % self.scheme.capitalize()) def can_get_content (self): + """ + Ignored URLs have no content.
+ + @return: False + @rtype: bool + """ return False diff --git a/linkcheck/checker/mailtourl.py b/linkcheck/checker/mailtourl.py index 742e005a..34e15f88 100644 --- a/linkcheck/checker/mailtourl.py +++ b/linkcheck/checker/mailtourl.py @@ -30,12 +30,37 @@ import linkcheck.log import linkcheck.dns.resolver +def _split_address (address): + """ + Split username and hostname of address. The hostname defaults + to 'localhost' if it is not specified. + + @param address: an email address + @type address: string + @return: a tuple (username, hostname) + @rtype: tuple + @raise: LinkCheckerError if address could not be split + """ + split = address.split("@", 1) + if len(split) == 2: + if not split[1]: + return (split[0], "localhost") + return tuple(split) + if len(split) == 1: + return (split[0], "localhost") + raise linkcheck.LinkCheckerError(_("Could not split the mail address")) + + class MailtoUrl (urlbase.UrlBase): """ Url link with mailto scheme. """ def build_url (self): + """ + Call super.build_url(), extract list of mail addresses from URL, + and check their syntax. + """ super(MailtoUrl, self).build_url() self.headers = {} self.addresses = email.Utils.getaddresses([self.cutout_addresses()]) @@ -46,13 +71,20 @@ class MailtoUrl (urlbase.UrlBase): self.addresses.extend(email.Utils.getaddresses([a])) # check syntax of emails for name, addr in self.addresses: - username, domain = self._split_address(addr) + username, domain = _split_address(addr) if not linkcheck.url.is_safe_domain(domain): raise linkcheck.LinkCheckerError(_("Invalid mail syntax")) linkcheck.log.debug(linkcheck.LOG_CHECK, "addresses: %s", self.addresses) def cutout_addresses (self): + """ + Parse all mail addresses out of the URL target. Additionally + store headers. 
+ + @return: comma separated list of email addresses + @rtype: string + """ # cut off leading mailto: and unquote url = urllib.unquote(self.base_url[7:]) # search for cc, bcc, to and store in headers @@ -115,7 +147,7 @@ class MailtoUrl (urlbase.UrlBase): linkcheck.log.debug(linkcheck.LOG_CHECK, "checking mail address %r", mail) linkcheck.log.debug(linkcheck.LOG_CHECK, "splitting address") - username, domain = self._split_address(mail) + username, domain = _split_address(mail) linkcheck.log.debug(linkcheck.LOG_CHECK, "looking up MX mailhost %r", domain) answers = linkcheck.dns.resolver.query(domain, 'MX') @@ -140,7 +172,12 @@ class MailtoUrl (urlbase.UrlBase): def check_smtp_connect (self, mxdata, username): """ - mxdata is a list of (preference, host) tuples to check for + Connect to SMTP servers and check emails. + + @param mxdata: list of (preference, host) tuples to check for + @type mxdata: list + @param username: the username to verify + @type username: string """ smtpconnect = 0 for preference, host in mxdata: @@ -178,17 +215,6 @@ class MailtoUrl (urlbase.UrlBase): else: self.set_result(_("Found MX mail host %(host)s") % {'host': host}) - def _split_address (self, address): - split = address.split("@", 1) - if len(split) == 2: - if not split[1]: - return (split[0], "localhost") - return tuple(split) - if len(split) == 1: - return (split[0], "localhost") - raise linkcheck.LinkCheckerError( - _("Could not split the mail address")) - def close_connection (self): """ Close a possibly opened SMTP connection. 
@@ -218,5 +244,6 @@ class MailtoUrl (urlbase.UrlBase): mailto: URLs do not have any content @return: False + @rtype: bool """ return False diff --git a/linkcheck/checker/nntpurl.py b/linkcheck/checker/nntpurl.py index eb762b68..ec330cf9 100644 --- a/linkcheck/checker/nntpurl.py +++ b/linkcheck/checker/nntpurl.py @@ -38,12 +38,16 @@ class NntpUrl (urlbase.UrlBase): """ def check_connection (self): + """ + Connect to NNTP server and try to request the URL article + resource (if specified). + """ nntpserver = self.host or self.consumer.config["nntpserver"] if not nntpserver: self.add_warning( _("No NNTP server was specified, skipping this URL.")) return - nntp = self._connectNntp(nntpserver) + nntp = self._connect_nntp(nntpserver) group = self.urlparts[2] while group[:1] == '/': group = group[1:] @@ -63,7 +67,7 @@ class NntpUrl (urlbase.UrlBase): # group name is the empty string self.add_warning(_("No newsgroup specified in NNTP URL.")) - def _connectNntp (self, nntpserver): + def _connect_nntp (self, nntpserver): """ This is done only once per checking task. Also, the newly introduced error codes 504 and 505 (both inclining "Too busy, retry @@ -89,4 +93,10 @@ class NntpUrl (urlbase.UrlBase): return nntp def can_get_content (self): + """ + NNTP urls have no content. + + @return: False + @rtype: bool + """ return False diff --git a/linkcheck/checker/telneturl.py b/linkcheck/checker/telneturl.py index c577dbb4..6aa0704d 100644 --- a/linkcheck/checker/telneturl.py +++ b/linkcheck/checker/telneturl.py @@ -32,6 +32,10 @@ class TelnetUrl (urlbase.UrlBase): """ def build_url (self): + """ + Call super.build_url(), set default telnet port and initialize + the login credentials. + """ super(TelnetUrl, self).build_url() # default port if self.port is None: @@ -43,12 +47,19 @@ class TelnetUrl (urlbase.UrlBase): self.user, self.password = self.get_user_password() def local_check (self): + """ + Warn about empty host names. Else call super.local_check(). 
+ """ if not self.host: self.set_result(_("Host is empty"), valid=False) return super(TelnetUrl, self).local_check() def check_connection (self): + """ + Open a telnet connection and try to login. Expected login + label is "login: ", expected password label is "Password: ". + """ self.url_connection = telnetlib.Telnet() if self.consumer.config.get("debug"): self.url_connection.set_debuglevel(1) @@ -63,4 +74,10 @@ class TelnetUrl (urlbase.UrlBase): self.url_connection.write("exit\n") def can_get_content (self): + """ + Telnet URLs have no content. + + @return: False + @rtype: bool + """ return False diff --git a/linkcheck/checker/urlbase.py b/linkcheck/checker/urlbase.py index d38b7626..9207d43d 100644 --- a/linkcheck/checker/urlbase.py +++ b/linkcheck/checker/urlbase.py @@ -467,6 +467,10 @@ class UrlBase (object): return h.follow def check_anchors (self): + """ + If URL was valid and a HTML resource, check the anchors and + log a warning when an anchor was not found. + """ if not (self.valid and self.anchor and self.is_html() and \ self.can_get_content()): # do not bother @@ -487,12 +491,24 @@ class UrlBase (object): self.add_warning(_("Anchor #%s not found.") % self.anchor) def is_extern (self): - # apply filter + """ + Determine if this URL is extern or not. + + @return: True if URL is extern, else False + @rtype: bool + """ linkcheck.log.debug(linkcheck.LOG_CHECK, "extern=%s", self.extern) return self.extern[0] and \ (self.consumer.config["externstrictall"] or self.extern[1]) def _get_extern (self, url): + """ + Match URL against intern and extern link patterns, according + to the configured denyallow order. + + @return: a tuple (is_extern, is_strict) + @rtype: tuple (bool, bool) + """ if not (self.consumer.config["externlinks"] or \ self.consumer.config["internlinks"]): return (0, 0) @@ -691,8 +707,20 @@ class UrlBase (object): ]) def __str__ (self): + """ + Get URL info. 
+ + @return: URL info, encoded with the output logger encoding + @rtype: string + """ s = self.serialized() return self.consumer.config['logger'].encode(s) def __repr__ (self): + """ + Get URL info. + + @return: URL info + @rtype: unicode + """ return u"<%s >" % self.serialized()