Use constants for warning tags to avoid typos in string literals, and move the constants into a separate module, const.py

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@3611 e7d03fd6-7b0d-0410-9947-9c21f3af8025
calvin 2007-11-29 07:50:22 +00:00
parent e007ea5dae
commit 9cf3314eab
18 changed files with 286 additions and 284 deletions
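
The pattern this commit applies everywhere: replace repeated warning-tag string literals with module-level constants, so a mistyped tag fails fast as a NameError instead of silently registering an unknown tag. A minimal standalone sketch of the idea, in the spirit of the new const.py (the add_warning stub below is illustrative; the real method lives on UrlBase):

    # Constant and registry, mirroring the new linkcheck/checker/const.py:
    WARN_NNTP_NO_NEWSGROUP = "nntp-no-newsgroup"
    Warnings = {WARN_NNTP_NO_NEWSGROUP: "The NNTP newsgroup could not be found."}

    def add_warning (msg, tag=None):
        # Illustrative stub: reject tags that were never registered.
        assert tag in Warnings, "unregistered warning tag %r" % tag
        print "%s [%s]" % (msg, tag)

    # Old style: a raw string like "nttp-no-newsgroup" slipped through silently.
    # New style: a misspelled constant name (e.g. WARN_NTTP_NO_NEWSGROUP)
    # raises NameError immediately.
    add_warning("No newsgroup specified in NNTP URL.", tag=WARN_NNTP_NO_NEWSGROUP)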

@ -42,6 +42,12 @@
Changed: linkchecker, linkcheck/lc_cgi.py,
linkcheck/checker/{__init__,urlbase,httpurl,unknownurl}.py
* Fix typo in the warning tags and use constants for the warning strings
to avoid such typos in the future.
Type: bugfix
Closes: SF bug #1838803
Changed: linkcheck/checker/__init__.py
4.7 "300" (released 17.6.2007)
* Mention in the documentation that --anchors enables logging of

@ -28,20 +28,16 @@ class Timeout (StandardError):
pass
class Empty (StandardError):
"Exception raised by get()."
"""Exception raised by get()."""
pass
class UrlQueue (object):
"""
A queue supporting several consumer tasks. The task_done() idea is
from the Python 2.5 implementation of Queue.Queue().
"""
"""A queue supporting several consumer tasks. The task_done() idea is
from the Python 2.5 implementation of Queue.Queue()."""
def __init__ (self):
"""
Initialize the queue state and task counters.
"""
"""Initialize the queue state and task counters."""
# Note: don't put a maximum size on the queue since it would
# lead to deadlocks when all worker threads called put().
self.queue = collections.deque()
@ -79,8 +75,7 @@ class UrlQueue (object):
return not self.queue
def get (self, timeout=None):
"""
Get first not-in-progress url from the queue and
"""Get first not-in-progress url from the queue and
return it. If no such url is available return None. The
url might be already cached.
"""
@ -139,9 +134,7 @@ class UrlQueue (object):
self.mutex.release()
def _put (self, url_data):
"""
Put URL in queue, increase number of unfished tasks.
"""
"""Put URL in queue, increase number of unfished tasks."""
if self.shutdown:
# don't accept more URLs
return
@ -199,9 +192,7 @@ class UrlQueue (object):
self.all_tasks_done.release()
def _cache_url (self, key, url_data):
"""
Put URL result data into cache.
"""
"""Put URL result data into cache."""
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"Caching %r", key)
assert key in self.in_progress, \
@ -220,9 +211,7 @@ class UrlQueue (object):
self.checked[key] = data
def _sort (self):
"""
Sort URL queue by putting all cached URLs at the beginning.
"""
"""Sort URL queue by putting all cached URLs at the beginning."""
newqueue = collections.deque()
while self.queue:
url_data = self.queue.popleft()
@ -266,9 +255,7 @@ class UrlQueue (object):
self.all_tasks_done.release()
def do_shutdown (self):
"""
Shutdown the queue by not accepting any more URLs.
"""
"""Shutdown the queue by not accepting any more URLs."""
self.mutex.acquire()
try:
unfinished = self.unfinished_tasks - len(self.queue)
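
For context on the docstrings above: the task_done()/join() accounting that UrlQueue borrows from Python 2.5's Queue.Queue can be sketched standalone as below (a simplified illustration using the same acquire/try/finally style as this file, not the actual class):

    import collections
    import threading

    class TinyTaskQueue (object):
        """Simplified sketch of the task_done()/join() accounting idea."""

        def __init__ (self):
            self.queue = collections.deque()
            self.all_tasks_done = threading.Condition()
            self.unfinished_tasks = 0

        def put (self, item):
            self.all_tasks_done.acquire()
            try:
                self.queue.append(item)
                self.unfinished_tasks += 1
            finally:
                self.all_tasks_done.release()

        def get (self):
            self.all_tasks_done.acquire()
            try:
                return self.queue.popleft()
            finally:
                self.all_tasks_done.release()

        def task_done (self):
            # Consumers call this once per finished item.
            self.all_tasks_done.acquire()
            try:
                self.unfinished_tasks -= 1
                if self.unfinished_tasks == 0:
                    self.all_tasks_done.notifyAll()
            finally:
                self.all_tasks_done.release()

        def join (self):
            # Block until every put() item has been processed.
            self.all_tasks_done.acquire()
            try:
                while self.unfinished_tasks:
                    self.all_tasks_done.wait()
            finally:
                self.all_tasks_done.release()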

@ -20,103 +20,13 @@ Main functions for link checking.
import os
import cgi
import socket
import select
import re
import urllib
import nntplib
import ftplib
import linkcheck.httplib2
import linkcheck.strformat
import linkcheck.dns.exception
from linkcheck.strformat import unicode_safe
from linkcheck.url import url_is_absolute
# helper alias
unicode_safe = linkcheck.strformat.unicode_safe
# Catch these exceptions on syntax checks.
ExcSyntaxList = [
linkcheck.LinkCheckerError,
]
# Catch these exceptions on content and connect checks. All other
# exceptions are internal or system errors.
ExcCacheList = [
IOError,
OSError, # OSError is thrown on Windows when a file is not found
linkcheck.LinkCheckerError,
linkcheck.dns.exception.DNSException,
socket.error,
select.error,
# nntp errors (including EOFError)
nntplib.error_reply,
nntplib.error_temp,
nntplib.error_perm,
nntplib.error_proto,
EOFError,
# http error
linkcheck.httplib2.error,
# ftp errors
ftplib.error_reply,
ftplib.error_temp,
ftplib.error_perm,
ftplib.error_proto,
]
# Exceptions that do not put the URL in the cache so that the URL can
# be checked again.
ExcNoCacheList = [
socket.timeout,
]
ExcList = ExcCacheList + ExcNoCacheList
# registered warnings
Warnings = {
"url-effective-url":
_("The effective URL is different from the original."),
"url-error-getting-content":
_("Could not get the content of the URL."),
"url-unicode-domain": _("URL uses a unicode domain."),
"url-unnormed": _("URL is not normed."),
"url-anchor-not-found": _("URL anchor was not found."),
"url-warnregex-found":
_("The warning regular expression was found in the URL contents."),
"url-content-too-large": _("The URL content is too large."),
"file-missing-slash": _("The file: URL is missing a trailing slash."),
"file-system-path":
_("The file: path is not the same as the system specific path."),
"ftp-missing-slash": _("The ftp: URL is missing a trailing slash."),
"http-robots-denied": _("The http: URL checking has been denied."),
"http-no-anchor-support": _("The HTTP server had no anchor support."),
"http-moved-permanent": _("The URL has moved permanently."),
"http-wrong-redirect":
_("The URL has been redirected to an URL of a different type."),
"http-empty-content": _("The URL had no content."),
"http-cookie-store-error": _("An error occurred while storing a cookie."),
"http-decompress-error":
_("An error occurred while decompressing the URL content."),
"http-unsupported-encoding":
_("The URL content is encoded with an unknown encoding."),
"ignore-url": _("The URL has been ignored."),
"mail-no-addresses": _("The mailto: URL contained no addresses."),
"mail-no-mx-host": _("The mail MX host could not be found."),
"mail-unverified-address":
_("The mailto: address could not be verified."),
"mail-no-connection":
_("No connection to a MX host could be established."),
"nntp-no-server": _("No NNTP server was found."),
"nntp-no-newsgroup": _("The NNTP newsgroup could not be found."),
"nntp-busy": _("The NNTP server was busy."),
}
# file extensions we can parse recursively
extensions = {
"html": re.compile(r'(?i)\.s?html?$'),
"opera": re.compile(r'^(?i)opera.adr$'), # opera bookmark file
"css": re.compile(r'(?i)\.css$'), # CSS stylesheet
}
# all the URL classes
import linkcheck.checker.fileurl
import linkcheck.checker.unknownurl
import linkcheck.checker.ftpurl
@ -140,11 +50,11 @@ def absolute_url (base_url, base_ref, parent_url):
@param parent_url: url of parent document
@type parent_url: string or None
"""
if base_url and linkcheck.url.url_is_absolute(base_url):
if base_url and url_is_absolute(base_url):
return base_url
elif base_ref and linkcheck.url.url_is_absolute(base_ref):
elif base_ref and url_is_absolute(base_ref):
return base_ref
elif parent_url and linkcheck.url.url_is_absolute(parent_url):
elif parent_url and url_is_absolute(parent_url):
return parent_url
return u""

linkcheck/checker/const.py (new file, 136 lines)

@ -0,0 +1,136 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2007 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
Helper constants.
"""
import socket
import select
import nntplib
import ftplib
import re
import linkcheck
# Catch these exceptions on syntax checks.
ExcSyntaxList = [
linkcheck.LinkCheckerError,
]
# Catch these exceptions on content and connect checks. All other
# exceptions are internal or system errors.
ExcCacheList = [
IOError,
OSError, # OSError is thrown on Windows when a file is not found
linkcheck.LinkCheckerError,
linkcheck.dns.exception.DNSException,
socket.error,
select.error,
# nntp errors (including EOFError)
nntplib.error_reply,
nntplib.error_temp,
nntplib.error_perm,
nntplib.error_proto,
EOFError,
# http error
linkcheck.httplib2.error,
# ftp errors
ftplib.error_reply,
ftplib.error_temp,
ftplib.error_perm,
ftplib.error_proto,
]
# Exceptions that do not put the URL in the cache so that the URL can
# be checked again.
ExcNoCacheList = [
socket.timeout,
]
ExcList = ExcCacheList + ExcNoCacheList
WARN_URL_EFFECTIVE_URL = "url-effective-url"
WARN_URL_ERROR_GETTING_CONTENT = "url-error-getting-content"
WARN_URL_UNICODE_DOMAIN = "url-unicode-domain"
WARN_URL_UNNORMED = "url-unnormed"
WARN_URL_ANCHOR_NOT_FOUND = "url-anchor-not-found"
WARN_URL_WARNREGEX_FOUND = "url-warnregex-found"
WARN_URL_CONTENT_TOO_LARGE = "url-content-too-large"
WARN_FILE_MISSING_SLASH = "file-missing-slash"
WARN_FILE_SYSTEM_PATH = "file-system-path"
WARN_FTP_MISSING_SLASH = "ftp-missing-slash"
WARN_HTTP_ROBOTS_DENIED = "http-robots-denied"
WARN_HTTP_NO_ANCHOR_SUPPORT = "http-no-anchor-support"
WARN_HTTP_MOVED_PERMANENT = "http-moved-permanent"
WARN_HTTP_WRONG_REDIRECT = "http-wrong-redirect"
WARN_HTTP_EMPTY_CONTENT = "http-empty-content"
WARN_HTTP_COOKIE_STORE_ERROR = "http-cookie-store-error"
WARN_HTTP_DECOMPRESS_ERROR = "http-decompress-error"
WARN_HTTP_UNSUPPORTED_ENCODING = "http-unsupported-encoding"
WARN_IGNORE_URL = "ignore-url"
WARN_MAIL_NO_ADDRESSES = "mail-no-addresses"
WARN_MAIL_NO_MX_HOST = "mail-no-mx-host"
WARN_MAIL_UNVERIFIED_ADDRESS = "mail-unverified-address"
WARN_MAIL_NO_CONNECTION = "mail-no-connection"
WARN_NNTP_NO_SERVER = "nntp-no-server"
WARN_NNTP_NO_NEWSGROUP = "nntp-no-newsgroup"
WARN_NNTP_BUSY = "nntp-busy"
# registered warnings
Warnings = {
WARN_URL_EFFECTIVE_URL:
_("The effective URL is different from the original."),
WARN_URL_ERROR_GETTING_CONTENT:
_("Could not get the content of the URL."),
WARN_URL_UNICODE_DOMAIN: _("URL uses a unicode domain."),
WARN_URL_UNNORMED: _("URL is not normed."),
WARN_URL_ANCHOR_NOT_FOUND: _("URL anchor was not found."),
WARN_URL_WARNREGEX_FOUND:
_("The warning regular expression was found in the URL contents."),
WARN_URL_CONTENT_TOO_LARGE: _("The URL content is too large."),
WARN_FILE_MISSING_SLASH: _("The file: URL is missing a trailing slash."),
WARN_FILE_SYSTEM_PATH:
_("The file: path is not the same as the system specific path."),
WARN_FTP_MISSING_SLASH: _("The ftp: URL is missing a trailing slash."),
WARN_HTTP_ROBOTS_DENIED: _("The http: URL checking has been denied."),
WARN_HTTP_NO_ANCHOR_SUPPORT: _("The HTTP server had no anchor support."),
WARN_HTTP_MOVED_PERMANENT: _("The URL has moved permanently."),
WARN_HTTP_WRONG_REDIRECT:
_("The URL has been redirected to an URL of a different type."),
WARN_HTTP_EMPTY_CONTENT: _("The URL had no content."),
WARN_HTTP_COOKIE_STORE_ERROR:
_("An error occurred while storing a cookie."),
WARN_HTTP_DECOMPRESS_ERROR:
_("An error occurred while decompressing the URL content."),
WARN_HTTP_UNSUPPORTED_ENCODING:
_("The URL content is encoded with an unknown encoding."),
WARN_IGNORE_URL: _("The URL has been ignored."),
WARN_MAIL_NO_ADDRESSES: _("The mailto: URL contained no addresses."),
WARN_MAIL_NO_MX_HOST: _("The mail MX host could not be found."),
WARN_MAIL_UNVERIFIED_ADDRESS:
_("The mailto: address could not be verified."),
WARN_MAIL_NO_CONNECTION:
_("No connection to a MX host could be established."),
WARN_NNTP_NO_SERVER: _("No NNTP server was found."),
WARN_NNTP_NO_NEWSGROUP: _("The NNTP newsgroup could not be found."),
WARN_NNTP_BUSY: _("The NNTP server was busy."),
}
# file extensions we can parse recursively
extensions = {
"html": re.compile(r'(?i)\.s?html?$'),
"opera": re.compile(r'^(?i)opera.adr$'), # opera bookmark file
"css": re.compile(r'(?i)\.css$'), # CSS stylesheet
}
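
Since every tag now exists both as a WARN_* constant and as a key in Warnings, the two views can be cross-checked cheaply. An illustrative consistency test, not part of this commit (the module path matches the new file above):

    import linkcheck.checker.const as const

    def test_warning_constants_registered ():
        # Every WARN_* constant must be a registered key in Warnings.
        for name in dir(const):
            if name.startswith("WARN_"):
                tag = getattr(const, name)
                assert tag in const.Warnings, "unregistered tag %r" % tag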

@ -29,6 +29,7 @@ import urlbase
import linkcheck.log
import linkcheck.checker
import linkcheck.fileutil
from const import WARN_FILE_MISSING_SLASH, WARN_FILE_SYSTEM_PATH
# if file extension lookup was unsuccessful, look at the content
contents = {
@ -118,7 +119,7 @@ class FileUrl (urlbase.UrlBase):
self.urlparts[3] = self.urlparts[4] = ''
if self.is_directory() and not self.urlparts[2].endswith('/'):
self.add_warning(_("Added trailing slash to directory."),
tag="file-missing-slash")
tag=WARN_FILE_MISSING_SLASH)
self.urlparts[2] += '/'
self.url = urlparse.urlunsplit(self.urlparts)
@ -149,7 +150,7 @@ class FileUrl (urlbase.UrlBase):
"system path %(realpath)r. You should always use "
"the system path in URLs.") % \
{"path": path, "realpath": realpath},
tag="file-system-path")
tag=WARN_FILE_SYSTEM_PATH)
def get_content (self):
"""
@ -185,7 +186,7 @@ class FileUrl (urlbase.UrlBase):
"""
Check if file is a parseable HTML file.
"""
if linkcheck.checker.extensions['html'].search(self.url):
if linkcheck.checker.const.extensions['html'].search(self.url):
return True
if contents['html'].search(self.get_content()):
return True
@ -232,7 +233,7 @@ class FileUrl (urlbase.UrlBase):
if self.is_directory():
return True
# guess by extension
for ro in linkcheck.checker.extensions.itervalues():
for ro in linkcheck.checker.const.extensions.itervalues():
if ro.search(self.url):
return True
# try to read content (can fail, so catch error)
@ -251,7 +252,7 @@ class FileUrl (urlbase.UrlBase):
if self.is_directory():
self.parse_html()
return
for key, ro in linkcheck.checker.extensions.iteritems():
for key, ro in linkcheck.checker.const.extensions.iteritems():
if ro.search(self.url):
getattr(self, "parse_"+key)()
return

@ -28,6 +28,7 @@ import proxysupport
import httpurl
import internpaturl
import linkcheck.ftpparse._ftpparse as ftpparse
from const import WARN_FTP_MISSING_SLASH
DEFAULT_TIMEOUT_SECS = 300
@ -146,7 +147,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if not self.url.endswith('/'):
self.add_warning(
_("Missing trailing directory slash in ftp url."),
tag="ftp-missing-slash")
tag=WARN_FTP_MISSING_SLASH)
self.url += '/'
return
raise ftplib.error_perm("550 File not found")
@ -180,7 +181,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
"""
See if URL target is a HTML file by looking at the extension.
"""
if linkcheck.checker.extensions['html'].search(self.url):
if linkcheck.checker.const.extensions['html'].search(self.url):
return True
return False
@ -190,7 +191,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
"""
if self.is_directory():
return True
for ro in linkcheck.checker.extensions.itervalues():
for ro in linkcheck.checker.const.extensions.itervalues():
if ro.search(self.url):
return True
return False
@ -208,7 +209,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if self.is_directory():
self.parse_html()
return
for key, ro in linkcheck.checker.extensions.iteritems():
for key, ro in linkcheck.checker.const.extensions.iteritems():
if ro.search(self.url):
getattr(self, "parse_"+key)()

@ -35,6 +35,11 @@ import linkcheck.httplib2
import httpheaders as headers
import internpaturl
import proxysupport
# import warning tag constants
from const import WARN_HTTP_ROBOTS_DENIED, WARN_HTTP_NO_ANCHOR_SUPPORT, \
WARN_HTTP_WRONG_REDIRECT, WARN_HTTP_MOVED_PERMANENT, \
WARN_HTTP_EMPTY_CONTENT, WARN_HTTP_COOKIE_STORE_ERROR, \
WARN_HTTP_DECOMPRESS_ERROR, WARN_HTTP_UNSUPPORTED_ENCODING
# helper alias
unicode_safe = linkcheck.strformat.unicode_safe
@ -163,7 +168,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
# remove all previously stored results
self.add_warning(
_("Access denied by robots.txt, checked only syntax."),
tag="http-robots-denied")
tag=WARN_HTTP_ROBOTS_DENIED)
self.set_result(u"syntax OK")
return
# check for amazon server quirk
@ -186,7 +191,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if self.no_anchor:
self.add_warning(_("Server %r had no anchor support, removed"
" anchor from request.") % server,
tag="http-no-anchor-support")
tag=WARN_HTTP_NO_ANCHOR_SUPPORT)
# redirections might have changed the URL
newurl = urlparse.urlunsplit(self.urlparts)
if self.url != newurl:
@ -315,8 +320,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
newurl = unicode_safe(newurl)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Redirected to %r", newurl)
self.add_info(_("Redirected to %(url)s.") % {'url': newurl},
tag="http-redirect")
self.add_info(_("Redirected to %(url)s.") % {'url': newurl})
# norm base url - can raise UnicodeError from url.idna_encode()
redirected, is_idn = linkcheck.checker.urlbase.url_norm(newurl)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
@ -337,7 +341,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.add_warning(
_("Access to redirected URL denied by robots.txt, "
"checked only syntax."),
tag="http-robots-denied")
tag=WARN_HTTP_ROBOTS_DENIED)
self.set_result(u"syntax OK")
return -1, response
# see about recursive redirect
@ -364,7 +368,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.add_warning(
_("HTTP 301 (moved permanent) encountered: you"
" should update this link."),
tag="http-moved-permanent")
tag=WARN_HTTP_MOVED_PERMANENT)
self.has301status = True
# check cache again on the changed URL
if self.aggregate.urlqueue.checked_redirect(redirected, self):
@ -375,7 +379,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.add_warning(
_("Redirection to different URL type encountered; "
"the original URL was %r.") % self.url,
tag="http-wrong-redirect")
tag=WARN_HTTP_WRONG_REDIRECT)
newobj = linkcheck.checker.get_url_from(
redirected, self.recursion_level, self.aggregate,
parent_url=self.parent_url, base_ref=self.base_ref,
@ -413,7 +417,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if response.status == 204:
# no content
self.add_warning(unicode_safe(response.reason),
tag="http-empty-content")
tag=WARN_HTTP_EMPTY_CONTENT)
# store cookies for valid links
if self.aggregate.config['storecookies']:
for c in self.cookies:
@ -428,7 +432,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
except Cookie.CookieError, msg:
self.add_warning(_("Could not store cookies: %(msg)s.") %
{'msg': str(msg)},
tag="http-cookie-store-error")
tag=WARN_HTTP_COOKIE_STORE_ERROR)
if response.status >= 200:
self.set_result(u"%r %s" % (response.status, response.reason))
else:
@ -569,7 +573,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
except zlib.error, msg:
self.add_warning(_("Decompress error %(err)s") %
{"err": str(msg)},
tag="http-decompress-error")
tag=WARN_HTTP_DECOMPRESS_ERROR)
f = StringIO.StringIO(self.data)
self.data = f.read()
self.downloadtime = time.time() - t
@ -591,7 +595,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if encoding and encoding not in _supported_encodings and \
encoding != 'identity':
self.add_warning(_('Unsupported content encoding %r.') % encoding,
tag="http-unsupported-encoding")
tag=WARN_HTTP_UNSUPPORTED_ENCODING)
return False
return True
@ -620,7 +624,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if encoding and encoding not in _supported_encodings and \
encoding != 'identity':
self.add_warning(_('Unsupported content encoding %r.') % encoding,
tag="http-unsupported-encoding")
tag=WARN_HTTP_UNSUPPORTED_ENCODING)
return False
return True

@ -26,6 +26,8 @@ import email.Utils
import urlbase
import linkcheck.log
import linkcheck.dns.resolver
from const import WARN_MAIL_NO_ADDRESSES, WARN_MAIL_NO_MX_HOST, \
WARN_MAIL_UNVERIFIED_ADDRESS, WARN_MAIL_NO_CONNECTION
def _split_address (address):
@ -135,7 +137,7 @@ class MailtoUrl (urlbase.UrlBase):
"""
if not self.addresses:
self.add_warning(_("No addresses found."),
tag="mail-no-addresses")
tag=WARN_MAIL_NO_ADDRESSES)
return
for name, mail in self.addresses:
self.check_smtp_domain(name, mail)
@ -156,7 +158,7 @@ class MailtoUrl (urlbase.UrlBase):
if len(answers) == 0:
self.add_warning(_("No MX mail host for %(domain)s found.") %
{'domain': domain},
tag="mail-no-mx-host")
tag=WARN_MAIL_NO_MX_HOST)
answers = linkcheck.dns.resolver.query(domain, 'A')
if len(answers) == 0:
self.set_result(_("No host for %(domain)s found.") %
@ -216,12 +218,12 @@ class MailtoUrl (urlbase.UrlBase):
" But mail will be sent anyway.") % d)
else:
self.add_warning(_("Unverified address: %(info)s.") % d,
tag="mail-unverified-address")
tag=WARN_MAIL_UNVERIFIED_ADDRESS)
except smtplib.SMTPException, msg:
self.add_warning(
_("MX mail host %(host)s did not accept connections: "
"%(error)s.") % {'host': host, 'error': str(msg)},
tag="mail-no-connection")
tag=WARN_MAIL_NO_CONNECTION)
if smtpconnect:
break
if not smtpconnect:

@ -27,6 +27,8 @@ import random
import linkcheck
import urlbase
import linkcheck.log
from const import WARN_NNTP_NO_SERVER, WARN_NNTP_NO_NEWSGROUP, \
WARN_NNTP_BUSY
random.seed()
@ -44,7 +46,7 @@ class NntpUrl (urlbase.UrlBase):
if not nntpserver:
self.add_warning(
_("No NNTP server was specified, skipping this URL."),
tag="nntp-no-server")
tag=WARN_NNTP_NO_SERVER)
return
nntp = self._connect_nntp(nntpserver)
group = self.urlparts[2]
@ -64,7 +66,7 @@ class NntpUrl (urlbase.UrlBase):
else:
# group name is the empty string
self.add_warning(_("No newsgroup specified in NNTP URL."),
tag="nttp-no-newsgroup")
tag=WARN_NNTP_NO_NEWSGROUP)
def _connect_nntp (self, nntpserver):
"""
@ -86,10 +88,10 @@ class NntpUrl (urlbase.UrlBase):
raise
if nntp is None:
raise linkcheck.LinkCheckerError(
_("NTTP server too busy; tried more than %d times.") % tries)
_("NNTP server too busy; tried more than %d times.") % tries)
if value is not None:
self.add_warning(_("NNTP busy: %s.") % str(value),
tag="nttp-busy")
tag=WARN_NNTP_BUSY)
return nntp
def can_get_content (self):

@ -96,9 +96,10 @@ class TestHttp (httptest.HttpServerTest):
u"info Redirected to %s." % rurl,
u"valid",
u"url newurl.html (cached)",
u"cache key %s" % nurl.replace("redirect", "newurl"),
u"real url %s" % rurl.replace("redirect", "newurl"),
u"cache key %s" % rurl,
u"real url %s" % rurl,
u"name Recursive Redirect",
u"info Redirected to %s." % rurl,
u"valid",
]
self.direct(url, resultlines, recursionlevel=99)

@ -20,6 +20,7 @@ Handle uncheckable URLs.
import re
import urlbase
from const import WARN_IGNORE_URL
ignored_schemes = r"""^(
acap # application configuration access protocol
@ -77,7 +78,7 @@ class UnknownUrl (urlbase.UrlBase):
self.add_info(_("Outside of domain filter, checked only syntax."))
elif self.ignored():
self.add_warning(_("%s URL ignored.") % self.scheme.capitalize(),
tag="ignore-url")
tag=WARN_IGNORE_URL)
else:
self.set_result(_("URL is unrecognized or has invalid syntax"),
valid=False)
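
The ignored_schemes pattern shown truncated above is a verbose regular expression. A self-contained sketch of how such a pattern is compiled and queried (the scheme list here is shortened, and the re.VERBOSE flag is an assumption about how this module compiles it):

    import re

    ignored_schemes_sketch = r"""^(
        acap    # application configuration access protocol
        |aim    # illustrative second entry, not from the diff
        )(:|$)"""
    ignored_re = re.compile(ignored_schemes_sketch, re.VERBOSE)

    print bool(ignored_re.match("acap:"))  # True: scheme is ignored
    print bool(ignored_re.match("http:"))  # False: scheme gets checked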

@ -39,6 +39,10 @@ import linkcheck.containers
import linkcheck.log
import linkcheck.httplib2
import linkcheck.HtmlParser.htmlsax
from const import WARN_URL_EFFECTIVE_URL, WARN_URL_UNICODE_DOMAIN, \
WARN_URL_UNNORMED, WARN_URL_ERROR_GETTING_CONTENT, \
WARN_URL_ANCHOR_NOT_FOUND, WARN_URL_WARNREGEX_FOUND, \
WARN_URL_CONTENT_TOO_LARGE
# helper alias
unicode_safe = linkcheck.strformat.unicode_safe
@ -66,9 +70,7 @@ def url_norm (url):
class UrlBase (object):
"""
An URL with additional information like validity etc.
"""
"""An URL with additional information like validity etc."""
def __init__ (self, base_url, recursion_level, aggregate,
parent_url = None, base_ref = None,
@ -293,9 +295,9 @@ class UrlBase (object):
effectiveurl = urlparse.urlunsplit(self.urlparts)
if self.url != effectiveurl:
self.add_warning(_("Effective URL %r.") % effectiveurl,
tag="url-effective-url")
tag=WARN_URL_EFFECTIVE_URL)
self.url = effectiveurl
except tuple(linkcheck.checker.ExcSyntaxList), msg:
except tuple(linkcheck.checker.const.ExcSyntaxList), msg:
self.set_result(unicode_safe(msg), valid=False)
return
self.set_cache_keys()
@ -312,11 +314,11 @@ class UrlBase (object):
is not yet widely supported. You should use
the URL %(idna_url)r instead.""") % \
{"url": self.base_url, "idna_url": base_url},
tag="url-unicode-domain")
tag=WARN_URL_UNICODE_DOMAIN)
elif self.base_url != base_url:
self.add_warning(
_("Base URL is not properly normed. Normed URL is %(url)s.") %
{'url': base_url}, tag="url-unnormed")
{'url': base_url}, tag=WARN_URL_UNNORMED)
# make url absolute
if self.base_ref:
# use base reference as parent url
@ -404,7 +406,7 @@ class UrlBase (object):
self.add_country_info()
if self.aggregate.config["anchors"]:
self.check_anchors()
except tuple(linkcheck.checker.ExcList):
except tuple(linkcheck.checker.const.ExcList):
value = self.handle_exception()
# make nicer error msg for unknown hosts
if isinstance(value, socket.error) and value[0] == -2:
@ -421,7 +423,7 @@ class UrlBase (object):
"checking content")
try:
self.check_content(warningregex)
except tuple(linkcheck.checker.ExcList):
except tuple(linkcheck.checker.const.ExcList):
value = self.handle_exception()
self.set_result(unicode_safe(value), valid=False)
@ -432,10 +434,10 @@ class UrlBase (object):
self.parse_url()
# check content size
self.check_size()
except tuple(linkcheck.checker.ExcList):
except tuple(linkcheck.checker.const.ExcList):
value = self.handle_exception()
self.add_warning(_("could not get content: %r") % str(value),
tag="url-error-getting-content")
tag=WARN_URL_ERROR_GETTING_CONTENT)
# close
self.close_connection()
@ -461,7 +463,7 @@ class UrlBase (object):
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"exception %s", traceback.format_tb(tb))
# note: etype must be the exact class, not a subclass
if (etype in linkcheck.checker.ExcNoCacheList) or \
if (etype in linkcheck.checker.const.ExcNoCacheList) or \
(etype == socket.error and value[0]==errno.EBADF) or \
not value:
# EBADF occurs when operating on an already closed socket
@ -556,7 +558,7 @@ class UrlBase (object):
if [x for x in handler.urls if x[0] == self.anchor]:
return
self.add_warning(_("Anchor #%s not found.") % self.anchor,
tag="url-anchor-not-found")
tag=WARN_URL_ANCHOR_NOT_FOUND)
def set_extern (self, url):
"""
@ -615,7 +617,7 @@ class UrlBase (object):
match = warningregex.search(self.get_content())
if match:
self.add_warning(_("Found %r in link contents.") % match.group(),
tag="url-warnregex-found")
tag=WARN_URL_WARNREGEX_FOUND)
def check_size (self):
"""
@ -628,7 +630,7 @@ class UrlBase (object):
_("Content size %(dlsize)s is larger than %(maxbytes)s.") %
{"dlsize": linkcheck.strformat.strsize(self.dlsize),
"maxbytes": linkcheck.strformat.strsize(maxbytes)},
tag="url-content-too-large")
tag=WARN_URL_CONTENT_TOO_LARGE)
def parse_url (self):
"""

@ -29,8 +29,7 @@ import console
def check_urls (aggregate):
"""
Main check function; checks all configured URLs until interrupted
"""Main check function; checks all configured URLs until interrupted
with Ctrl-C.
@return: None
"""
@ -54,9 +53,7 @@ def check_urls (aggregate):
def check_url (aggregate):
"""
Helper function waiting for URL queue.
"""
"""Helper function waiting for URL queue."""
while True:
try:
aggregate.urlqueue.join(timeout=1)
@ -71,9 +68,8 @@ def check_url (aggregate):
def interrupt (aggregate):
"""
Interrupt execution and shutdown, ignoring any subsequent interrupts.
"""
"""Interrupt execution and shutdown, ignoring any subsequent
interrupts."""
while True:
try:
linkcheck.log.warn(linkcheck.LOG_CHECK,
@ -96,9 +92,7 @@ def print_active_threads (aggregate):
def abort (aggregate):
"""
Helper function to ensure a clean shutdown.
"""
"""Helper function to ensure a clean shutdown."""
while True:
try:
aggregate.abort()
@ -109,10 +103,9 @@ def abort (aggregate):
linkcheck.log.warn(linkcheck.LOG_CHECK, _("shutdown in progress"))
print_active_threads(aggregate)
def get_aggregate (config):
"""
Get an aggregator instance with given configuration.
"""
"""Get an aggregator instance with given configuration."""
urlqueue = linkcheck.cache.urlqueue.UrlQueue()
connections = linkcheck.cache.connection.ConnectionPool(wait=config["wait"])
cookies = linkcheck.cache.cookie.CookieJar()

@ -14,7 +14,9 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""Aggregate needed object instances for checker threads."""
"""
Aggregate needed object instances for checker threads.
"""
import linkcheck.log
import linkcheck.director
import logger
@ -23,9 +25,7 @@ import checker
class Aggregate (object):
"""
Store thread-safe data collections for checker threads.
"""
"""Store thread-safe data collections for checker threads."""
def __init__ (self, config, urlqueue, connections, cookies, robots_txt):
self.config = config
@ -37,9 +37,7 @@ class Aggregate (object):
self.threads = []
def start_threads (self):
"""
Spawn threads for URL checking and status printing.
"""
"""Spawn threads for URL checking and status printing."""
if self.config["status"]:
t = status.Status(self.urlqueue)
t.start()
@ -54,9 +52,7 @@ class Aggregate (object):
checker.check_url(self.urlqueue, self.logger)
def abort (self):
"""
Empty the URL queue.
"""
"""Empty the URL queue."""
self.urlqueue.do_shutdown()
try:
self.urlqueue.join(timeout=self.config["timeout"])
@ -64,12 +60,11 @@ class Aggregate (object):
linkcheck.log.warn(linkcheck.LOG_CHECK, "Abort timed out")
def remove_stopped_threads (self):
"Remove the stopped threads from the internal thread list."""
self.threads = [t for t in self.threads if t.isAlive()]
def finish (self):
"""
Wait for checker threads to finish.
"""
"""Wait for checker threads to finish."""
assert self.urlqueue.empty()
for t in self.threads:
t.stop()

@ -14,15 +14,16 @@
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
URL checking functions.
"""
import time
import task
import linkcheck.cache.urlqueue
def check_url (urlqueue, logger):
"""
Check URLs without threading.
"""
"""Check URLs without threading."""
while not urlqueue.empty():
url_data = urlqueue.get()
try:
@ -34,52 +35,42 @@ def check_url (urlqueue, logger):
class Checker (task.CheckedTask):
"""
URL check thread.
"""
"""URL check thread."""
def __init__ (self, urlqueue, logger):
"""
Store URL queue and logger.
"""
"""Store URL queue and logger."""
super(Checker, self).__init__()
self.urlqueue = urlqueue
self.logger = logger
self.origname = self.getName()
def run_checked (self):
"""
Check URLs in the queue.
"""
"""Check URLs in the queue."""
while True:
self.check_url()
if self.stopped():
break
def check_url (self):
"""
Try to get URL data from queue and check it.
"""
"""Try to get URL data from queue and check it."""
try:
url_data = self.urlqueue.get(timeout=0.1)
if url_data is not None:
self.check_url_data(url_data)
try:
self.check_url_data(url_data)
finally:
self.urlqueue.task_done(url_data)
self.setName(self.origname)
except linkcheck.cache.urlqueue.Empty:
time.sleep(0.1)
def check_url_data (self, url_data):
"""
Check one URL data instance.
"""
try:
if url_data.url is None:
url = ""
else:
url = url_data.url.encode("ascii", "replace")
self.setName("Check-%s" % url)
if not url_data.has_result:
url_data.check()
self.logger.log_url(url_data)
finally:
self.urlqueue.task_done(url_data)
"""Check one URL data instance."""
if url_data.url is None:
url = ""
else:
url = url_data.url.encode("ascii", "replace")
self.setName("Check-%s" % url)
if not url_data.has_result:
url_data.check()
self.logger.log_url(url_data)
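
The main change in this file moves urlqueue.task_done() out of check_url_data() and into a finally clause in check_url(), so the unfinished-task counter is balanced exactly once even if checking raises. The same pattern in isolation (queue and handle are hypothetical stand-ins):

    def process_one (queue, handle):
        # queue and handle are hypothetical stand-ins for illustration.
        item = queue.get(timeout=0.1)
        if item is None:
            return
        try:
            handle(item)           # may raise
        finally:
            queue.task_done(item)  # always decrement the task counter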

@ -24,21 +24,15 @@ from console import stderr
class Status (task.CheckedTask):
"""
Status thread.
"""
"""Status thread."""
def __init__ (self, urlqueue):
"""
Store urlqueue object.
"""
"""Store urlqueue object."""
super(Status, self).__init__()
self.urlqueue = urlqueue
def run_checked (self):
"""
Print periodic status messages.
"""
"""Print periodic status messages."""
self.start_time = time.time()
self.setName("Status")
while True:
@ -49,9 +43,7 @@ class Status (task.CheckedTask):
self.print_status()
def print_status (self):
"""
Print a status message.
"""
"""Print a status message."""
duration = time.time() - self.start_time
checked, in_progress, queue = self.urlqueue.status()
msg = _n("%2d URL active,", "%2d URLs active,", in_progress) % \

@ -22,14 +22,10 @@ import console
class CheckedTask (linkcheck.threader.StoppableThread):
"""
Stoppable URL check task, handling error conditions while running.
"""
"""Stoppable URL check task, handling error conditions while running."""
def run (self):
"""
Handle keyboard interrupt and other errors.
"""
"""Handle keyboard interrupt and other errors."""
try:
self.run_checked()
except KeyboardInterrupt:
@ -41,7 +37,5 @@ class CheckedTask (linkcheck.threader.StoppableThread):
@linkcheck.decorators.notimplemented
def run_checked (self):
"""
Overload in subclass.
"""
"""Overload in subclass."""
pass

@ -16,7 +16,8 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""
Check HTML pages for broken links.
Check HTML pages for broken links. This is the commandline
client. Run this file with the -h option to see how it's done.
"""
import sys
@ -48,7 +49,7 @@ _profile = "linkchecker.prof"
_username = None
_password = None
# main usage text
# usage texts
Usage = _("""USAGE\tlinkchecker [options] file-or-url...
""")
@ -173,34 +174,32 @@ Warnings = _(r"""IGNORE WARNINGS
The following warnings are recognized in the 'ignorewarnings' config
file entry:
""")
for tag, desc in linkcheck.checker.Warnings.items():
for tag, desc in linkcheck.checker.const.Warnings.items():
Warnings += u" o %s\n %s\n" % (tag, desc)
def encode (s, codec="iso8859-15"):
"""
Encode string with given codec for screen print.
"""
"""Encode string with given codec for screen print."""
# XXX the console encoding should be detected automatically
# according to locale settings.
return s.encode(codec, "ignore")
def print_version ():
"""
Print the program version and exit.
"""
"""Print the program version and exit."""
print encode(linkcheck.configuration.AppInfo)
sys.exit(0)
def print_usage (msg):
"""
Print a program msg text to stderr and exit.
"""
"""Print a program msg text to stderr and exit."""
print >>sys.stderr, encode(_("Error: %s") % msg)
print >>sys.stderr, encode(_("Execute 'linkchecker -h' for help"))
sys.exit(1)
def check_user ():
def ensure_notroot ():
"""Make sure LinkChecker is not run under root on POSIX systems."""
if os.name != 'posix':
return
if os.geteuid() == 0:
@ -211,9 +210,7 @@ def check_user ():
def viewprof ():
"""
Print profiling data and exit.
"""
"""Print profiling data and exit."""
if not has_pstats:
linkcheck.log.error(linkcheck.LOG_CMDLINE,
_("The `pstats' Python module is not installed,"
@ -232,10 +229,8 @@ def viewprof ():
def try_compile_re (arg):
"""
Try to compile the regular expression. On error print an error message
and exit.
"""
"""Try to compile the regular expression. On error print an error
message and exit."""
try:
return re.compile(arg)
except re.error, msg:
@ -245,6 +240,7 @@ def try_compile_re (arg):
def has_encoding (encoding):
"""Detect if Python can encode in a certain encoding."""
try:
codecs.lookup(encoding)
return True
@ -253,9 +249,7 @@ def has_encoding (encoding):
class LCHelpFormatter (optparse.IndentedHelpFormatter):
"""
Help formatter indenting paragraph-wise.
"""
"""Help formatter indenting paragraph-wise."""
def format_option (self, option):
# The help for each option consists of two parts:
@ -294,32 +288,22 @@ class LCHelpFormatter (optparse.IndentedHelpFormatter):
class LCOptionParser (optparse.OptionParser, object):
"""
Option parser with custom help text layout.
"""
"""Option parser with custom help text layout."""
def __init__ (self):
"""
Initializing using our own help formatter class.
"""
"""Initializing using our own help formatter class."""
super(LCOptionParser, self).__init__(formatter=LCHelpFormatter())
def error (self, msg):
"""
Print usage info and given message.
"""
"""Print usage info and given message."""
print_usage(msg)
def get_usage (self):
"""
Return translated usage text.
"""
"""Return translated usage text."""
return Usage
def print_help (self, file=None):
"""
Print translated help text.
"""
"""Print translated help text."""
s = u"%s\n%s\n%s\n%s\n%s\n%s\n%s\n%s\n%s" % (self.format_help(),
Examples, LoggerTypes, RegularExpressions, CookieFormat,
ProxySupport, Notes, Retval, Warnings)
@ -543,12 +527,12 @@ except linkcheck.LinkCheckerError, msg:
print_usage(str(msg))
# test if running with root privileges
if not options.allowroot:
check_user()
ensure_notroot()
# test if running with -O
if options.debug and not __debug__:
linkcheck.log.warn(linkcheck.LOG_CMDLINE,
_("Running with python -O disables debugging."))
# apply commandline options and arguments
# apply commandline options and arguments to configuration
constructauth = False
do_profile = False
if not options.priority:
@ -748,7 +732,7 @@ if do_profile:
" therefore the --profile option is disabled."))
do_profile = False
# start checking
# finally, start checking
if do_profile:
import profile
profile.run("linkcheck.director.check_urls(aggregate)", _profile)
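
For reference, the profile data written by profile.run() above can be inspected with the pstats module, roughly as the viewprof() helper does (a sketch; the sort order is an assumption):

    import pstats

    stats = pstats.Stats("linkchecker.prof")
    stats.sort_stats("cumulative").print_stats(20)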