Use relative imports from Python 2.5

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@3750 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2008-05-09 06:16:03 +00:00
parent 4bd0d755c2
commit bacb59597e
64 changed files with 432 additions and 499 deletions

View file

@ -89,36 +89,6 @@ def get_link_pat (arg, strict=False):
}
# note: don't confuse URL loggers with application logs above
from .logger.text import TextLogger
from .logger.html import HtmlLogger
from .logger.gml import GMLLogger
from .logger.dot import DOTLogger
from .logger.sql import SQLLogger
from .logger.csvlog import CSVLogger
from .logger.blacklist import BlacklistLogger
from .logger.gxml import GraphXMLLogger
from .logger.customxml import CustomXMLLogger
from .logger.none import NoneLogger
# default link logger classes
Loggers = {
"text": TextLogger,
"html": HtmlLogger,
"gml": GMLLogger,
"dot": DOTLogger,
"sql": SQLLogger,
"csv": CSVLogger,
"blacklist": BlacklistLogger,
"gxml": GraphXMLLogger,
"xml": CustomXMLLogger,
"none": NoneLogger,
}
# for easy printing: a comma separated logger list
LoggerKeys = ", ".join(repr(name) for name in Loggers)
def init_i18n ():
"""Initialize i18n with the configured locale dir. The environment
variable LOCPATH can also specify a locale dir.

View file

@ -19,10 +19,10 @@ Cache for DNS lookups.
"""
import socket
import sys
import linkcheck.lock
from linkcheck.decorators import synchronized
from ..lock import get_lock
from ..decorators import synchronized
_lock = linkcheck.lock.get_lock("addrinfo")
_lock = get_lock("addrinfo")
addrinfos = {}
@synchronized(_lock)

View file

@ -19,12 +19,12 @@ Store and retrieve open connections.
"""
import time
import linkcheck.lock
from .. import log, LOG_CACHE
from linkcheck.decorators import synchronized
from ..decorators import synchronized
from ..lock import get_lock
_lock = linkcheck.lock.get_lock("connection")
_wait_lock = linkcheck.lock.get_lock("connwait")
_lock = get_lock("connection")
_wait_lock = get_lock("connwait")
class ConnectionPool (object):
"""Thread-safe cache, storing a set of connections for URL retrieval."""

View file

@ -17,13 +17,12 @@
"""
Store and retrieve cookies.
"""
from linkcheck.decorators import synchronized
from .. import log, LOG_CACHE
import linkcheck.lock
import linkcheck.cookies
from .. import log, LOG_CACHE, cookies
from ..decorators import synchronized
from ..lock import get_lock
_lock = linkcheck.lock.get_lock("cookie")
_lock = get_lock("cookie")
class CookieJar (object):
"""
@ -43,17 +42,17 @@ class CookieJar (object):
for h in headers.getallmatchingheaders("Set-Cookie"):
# RFC 2109 (Netscape) cookie type
try:
c = linkcheck.cookies.NetscapeCookie(h, scheme, host, path)
c = cookies.NetscapeCookie(h, scheme, host, path)
jar.add(c)
except linkcheck.cookies.CookieError:
except cookies.CookieError:
log.debug(LOG_CACHE,
"Invalid cookie header for %s:%s%s: %r", scheme, host, path, h)
for h in headers.getallmatchingheaders("Set-Cookie2"):
# RFC 2965 cookie type
try:
c = linkcheck.cookies.Rfc2965Cookie(h, scheme, host, path)
c = cookies.Rfc2965Cookie(h, scheme, host, path)
jar.add(c)
except linkcheck.cookies.CookieError:
except cookies.CookieError:
log.debug(LOG_CACHE,
"Invalid cookie2 header for %s:%s%s: %r", scheme, host, path, h)
self.cache[host] = jar

View file

@ -18,12 +18,12 @@
Store and retrieve country names for IPs.
"""
import os
import linkcheck.lock
from linkcheck.decorators import synchronized
from ..lock import get_lock
from ..decorators import synchronized
# I don't know if the geoip library is already thread-safe, but
# we take no risks here.
_lock = linkcheck.lock.get_lock("geoip")
_lock = get_lock("geoip")
# initialize GeoIP database
geoip = None

View file

@ -17,15 +17,13 @@
"""
Cache robots.txt contents.
"""
from linkcheck.decorators import synchronized
import linkcheck.robotparser2
import linkcheck.configuration
import linkcheck.lock
import linkcheck.url
from .. import robotparser2, configuration, url as urlutil
from ..decorators import synchronized
from ..lock import get_lock
# lock for caching
_lock = linkcheck.lock.get_lock("robots.txt")
_lock = get_lock("robots.txt")
class RobotsTxt (object):
@ -43,17 +41,16 @@ class RobotsTxt (object):
Ask robots.txt allowance.
"""
if roboturl not in self.cache:
rp = linkcheck.robotparser2.RobotFileParser(
user=user, password=password)
rp = robotparser2.RobotFileParser(user=user, password=password)
rp.set_url(roboturl)
rp.read()
if callback is not None:
parts = linkcheck.url.url_split(rp.url)
parts = urlutil.url_split(rp.url)
host = "%s:%d" % (parts[1], parts[2])
useragent = linkcheck.configuration.UserAgent
useragent = configuration.UserAgent
wait = rp.get_crawldelay(useragent)
callback(host, wait)
self.cache[roboturl] = rp
else:
rp = self.cache[roboturl]
return rp.can_fetch(linkcheck.configuration.UserAgent, url)
return rp.can_fetch(configuration.UserAgent, url)

View file

@ -22,21 +22,7 @@ import os
import cgi
import logging
import urllib
import linkcheck.httplib2
import linkcheck.dns.exception
from linkcheck.strformat import unicode_safe
from linkcheck.url import url_is_absolute
# all the URL classes
import linkcheck.checker.fileurl
import linkcheck.checker.unknownurl
import linkcheck.checker.ftpurl
import linkcheck.checker.gopherurl
import linkcheck.checker.httpurl
import linkcheck.checker.httpsurl
import linkcheck.checker.mailtourl
import linkcheck.checker.telneturl
import linkcheck.checker.nntpurl
from .. import strformat, url as urlutil
def absolute_url (base_url, base_ref, parent_url):
@ -51,11 +37,11 @@ def absolute_url (base_url, base_ref, parent_url):
@param parent_url: url of parent document
@type parent_url: string or None
"""
if base_url and url_is_absolute(base_url):
if base_url and urlutil.url_is_absolute(base_url):
return base_url
elif base_ref and url_is_absolute(base_ref):
elif base_ref and urlutil.url_is_absolute(base_ref):
return base_ref
elif parent_url and url_is_absolute(parent_url):
elif parent_url and urlutil.url_is_absolute(parent_url):
return parent_url
return u""
@ -71,7 +57,7 @@ def get_url_from (base_url, recursion_level, aggregate,
@param recursion_level: current recursion level
@type recursion_level: number
@param aggregate: aggregate object
@type aggregate: linkcheck.checker.aggregate.Consumer
@type aggregate: aggregate.Consumer
@param parent_url: parent url
@type parent_url: string or None
@param base_ref: base url from <base> tag
@ -84,12 +70,12 @@ def get_url_from (base_url, recursion_level, aggregate,
@type name: string
"""
if base_url is not None:
base_url = unicode_safe(base_url)
base_url = strformat.unicode_safe(base_url)
if parent_url is not None:
parent_url = unicode_safe(parent_url)
parent_url = strformat.unicode_safe(parent_url)
if base_ref is not None:
base_ref = unicode_safe(base_ref)
name = unicode_safe(name)
base_ref = strformat.unicode_safe(base_ref)
name = strformat.unicode_safe(name)
url = absolute_url(base_url, base_ref, parent_url).lower()
klass = get_urlclass_from(url)
return klass(base_url, recursion_level, aggregate,
@ -100,27 +86,27 @@ def get_url_from (base_url, recursion_level, aggregate,
def get_urlclass_from (url):
"""Return checker class for given URL."""
if url.startswith("http:"):
klass = linkcheck.checker.httpurl.HttpUrl
klass = httpurl.HttpUrl
elif url.startswith("ftp:"):
klass = linkcheck.checker.ftpurl.FtpUrl
klass = ftpurl.FtpUrl
elif url.startswith("file:"):
klass = linkcheck.checker.fileurl.FileUrl
klass = fileurl.FileUrl
elif url.startswith("telnet:"):
klass = linkcheck.checker.telneturl.TelnetUrl
klass = telneturl.TelnetUrl
elif url.startswith("mailto:"):
klass = linkcheck.checker.mailtourl.MailtoUrl
klass = mailtourl.MailtoUrl
elif url.startswith("gopher:"):
klass = linkcheck.checker.gopherurl.GopherUrl
klass = gopherurl.GopherUrl
elif url.startswith("https:"):
klass = linkcheck.checker.httpsurl.HttpsUrl
klass = httpsurl.HttpsUrl
elif url.startswith(("nntp:", "news:", "snews:")):
klass = linkcheck.checker.nntpurl.NntpUrl
elif linkcheck.checker.unknownurl.is_unknown_url(url):
klass = nntpurl.NntpUrl
elif unknownurl.is_unknown_url(url):
# unknown url
klass = linkcheck.checker.unknownurl.UnknownUrl
klass = unknownurl.UnknownUrl
else:
# assume local file
klass = linkcheck.checker.fileurl.FileUrl
klass = fileurl.FileUrl
return klass
@ -154,3 +140,8 @@ class StoringHandler (logging.Handler):
if len(self.storage) >= self.maxrecords:
self.storage.pop()
self.storage.append(record)
# all the URL classes
from . import (fileurl, unknownurl, ftpurl, gopherurl, httpurl,
httpsurl, mailtourl, telneturl, nntpurl)

View file

@ -22,11 +22,12 @@ import select
import nntplib
import ftplib
import re
import linkcheck
from .. import LinkCheckerError, httplib2 as httplib
from ..dns.exception import DNSException
# Catch these exception on syntax checks.
ExcSyntaxList = [
linkcheck.LinkCheckerError,
LinkCheckerError,
]
# Catch these exceptions on content and connect checks. All other
@ -34,8 +35,8 @@ ExcSyntaxList = [
ExcCacheList = [
IOError,
OSError, # OSError is thrown on Windows when a file is not found
linkcheck.LinkCheckerError,
linkcheck.dns.exception.DNSException,
LinkCheckerError,
DNSException,
socket.error,
select.error,
# nntp errors (including EOFError)
@ -45,7 +46,7 @@ ExcCacheList = [
nntplib.error_proto,
EOFError,
# http error
linkcheck.httplib2.error,
httplib.error,
# ftp errors
ftplib.error_reply,
ftplib.error_temp,

View file

@ -25,11 +25,9 @@ import urlparse
import urllib
import urllib2
import urlbase
from .. import log, LOG_CHECK
import linkcheck.checker
import linkcheck.fileutil
from const import WARN_FILE_MISSING_SLASH, WARN_FILE_SYSTEM_PATH, \
from . import urlbase, get_index_html, absolute_url
from .. import log, LOG_CHECK, fileutil, strformat, url as urlutil
from .const import WARN_FILE_MISSING_SLASH, WARN_FILE_SYSTEM_PATH, \
PARSE_EXTENSIONS, PARSE_CONTENTS
@ -100,7 +98,7 @@ class FileUrl (urlbase.UrlBase):
base_url = re.sub("^file://(/?)([a-zA-Z]):", r"file:///\2|", base_url)
# norm base url again after changing
if self.base_url != base_url:
base_url, is_idn = linkcheck.checker.urlbase.url_norm(base_url)
base_url, is_idn = urlbase.url_norm(base_url)
if is_idn:
pass # XXX warn about idn use
self.base_url = unicode(base_url)
@ -126,7 +124,7 @@ class FileUrl (urlbase.UrlBase):
if self.is_directory():
self.set_result(_("directory"))
else:
url = linkcheck.fileutil.pathencode(self.url)
url = fileutil.pathencode(self.url)
self.url_connection = urllib2.urlopen(url)
self.check_case_sensitivity()
@ -170,7 +168,7 @@ class FileUrl (urlbase.UrlBase):
"""
t = time.time()
files = get_files(self.get_os_filename())
data = linkcheck.checker.get_index_html(files)
data = get_index_html(files)
self.data = data.encode("iso8859-1", "ignore")
self.dltime = time.time() - t
self.dlsize = len(self.data)
@ -211,7 +209,7 @@ class FileUrl (urlbase.UrlBase):
path = self.urlparts[2]
if os.name == 'nt':
path = prepare_urlpath_for_nt(path)
return linkcheck.fileutil.pathencode(urllib.url2pathname(path))
return fileutil.pathencode(urllib.url2pathname(path))
def is_directory (self):
"""
@ -268,12 +266,11 @@ class FileUrl (urlbase.UrlBase):
@return non-empty regex pattern or None
@rtype String or None
"""
absolute = linkcheck.checker.absolute_url
url = absolute(self.base_url, self.base_ref, self.parent_url)
url = absolute_url(self.base_url, self.base_ref, self.parent_url)
if not url:
return None
parts = linkcheck.strformat.url_unicode_split(url)
path, params = linkcheck.url.splitparams(parts[2])
parts = strformat.url_unicode_split(url)
path, params = urlutil.splitparams(parts[2])
segments = path.split('/')
if not self.is_directory():
# cut off filename to have a directory

View file

@ -21,15 +21,12 @@ Handle FTP links.
import ftplib
import time
import urllib
import cStringIO as StringIO
from cStringIO import StringIO
from .. import log, LOG_CHECK
import linkcheck
import proxysupport
import httpurl
import internpaturl
import linkcheck.ftpparse._ftpparse as ftpparse
from const import WARN_FTP_MISSING_SLASH, PARSE_EXTENSIONS
from .. import log, LOG_CHECK, LinkCheckerError
from ..ftpparse import _ftpparse as ftpparse
from . import proxysupport, httpurl, internpaturl, get_index_html
from .const import WARN_FTP_MISSING_SLASH, PARSE_EXTENSIONS
DEFAULT_TIMEOUT_SECS = 300
@ -109,13 +106,13 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.url_connection.login(_user, _password)
except EOFError, msg:
msg = str(msg)
raise linkcheck.LinkCheckerError(
raise LinkCheckerError(
_("Remote host has closed connection: %r") % msg)
if not self.url_connection.getwelcome():
raise linkcheck.LinkCheckerError(
raise LinkCheckerError(
_("Got no answer from FTP server"))
# don't set info anymore, this may change every time we log in
#self.add_info(linkcheck.strformat.unicode_safe(info))
#self.add_info(strformat.unicode_safe(info))
def cwd (self):
"""
@ -227,11 +224,11 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if self.is_directory():
self.url_connection.cwd(self.filename)
self.files = self.get_files()
self.data = linkcheck.checker.get_index_html(self.files)
self.data = get_index_html(self.files)
else:
# download file in BINARY mode
ftpcmd = "RETR %s" % self.filename
buf = StringIO.StringIO()
buf = StringIO()
def stor_data (s):
"""Helper method storing given data"""
buf.write(s)

View file

@ -18,7 +18,7 @@
Handle Gopher links.
"""
import urlbase
from . import urlbase
class GopherUrl (urlbase.UrlBase):
"""

View file

@ -18,7 +18,7 @@
Handle https links.
"""
import httpurl
from . import httpurl
class HttpsUrl (httpurl.HttpUrl):
"""

View file

@ -24,29 +24,24 @@ import time
import re
import zlib
import socket
import cStringIO as StringIO
from cStringIO import StringIO
import Cookie
from .. import log, LOG_CHECK
import linkcheck.url
import linkcheck.strformat
import linkcheck.robotparser2
import linkcheck.httplib2
import httpheaders as headers
import internpaturl
import proxysupport
from linkcheck import gzip2 as gzip
from .. import (log, LOG_CHECK, gzip2 as gzip, strformat, url as urlutil,
httplib2 as httplib, LinkCheckerError, configuration)
from . import (internpaturl, proxysupport, httpheaders as headers, urlbase,
get_url_from)
# import warnings
from const import WARN_HTTP_ROBOTS_DENIED, WARN_HTTP_NO_ANCHOR_SUPPORT, \
from .const import WARN_HTTP_ROBOTS_DENIED, WARN_HTTP_NO_ANCHOR_SUPPORT, \
WARN_HTTP_WRONG_REDIRECT, WARN_HTTP_MOVED_PERMANENT, \
WARN_HTTP_EMPTY_CONTENT, WARN_HTTP_COOKIE_STORE_ERROR, \
WARN_HTTP_DECOMPRESS_ERROR, WARN_HTTP_UNSUPPORTED_ENCODING, \
PARSE_MIMETYPES
# helper alias
unicode_safe = linkcheck.strformat.unicode_safe
unicode_safe = strformat.unicode_safe
supportHttps = hasattr(linkcheck.httplib2, "HTTPSConnection") and \
supportHttps = hasattr(httplib, "HTTPSConnection") and \
hasattr(socket, "ssl")
_supported_encodings = ('gzip', 'x-gzip', 'deflate')
@ -220,7 +215,7 @@ Use URL %s instead for checking."""), self.url, newurl)
response.close()
try:
response = self._get_http_response()
except linkcheck.httplib2.BadStatusLine:
except httplib.BadStatusLine:
# some servers send empty HEAD replies
if self.method == "HEAD":
self.method = "GET"
@ -250,7 +245,7 @@ Use URL %s instead for checking."""), self.url, newurl)
self.proxy, self.proxyauth = oldproxy
try:
tries, response = self.follow_redirections(response)
except linkcheck.httplib2.BadStatusLine:
except httplib.BadStatusLine:
# some servers send empty HEAD replies
if self.method == "HEAD":
self.method = "GET"
@ -323,11 +318,11 @@ Use URL %s instead for checking."""), self.url, newurl)
log.debug(LOG_CHECK, "Redirected to %r", newurl)
self.add_info(_("Redirected to %(url)s.") % {'url': newurl})
# norm base url - can raise UnicodeError from url.idna_encode()
redirected, is_idn = linkcheck.checker.urlbase.url_norm(newurl)
redirected, is_idn = urlbase.url_norm(newurl)
if is_idn:
pass # XXX warn about idn use
log.debug(LOG_CHECK, "Norm redirected to %r", redirected)
urlparts = linkcheck.strformat.url_unicode_split(redirected)
urlparts = strformat.url_unicode_split(redirected)
# check extern filter again
self.set_extern(redirected)
if self.extern[0] and self.extern[0]:
@ -382,7 +377,7 @@ Use URL %s instead for checking."""), self.url, newurl)
_("Redirection to different URL type encountered; "
"the original URL was %r.") % self.url,
tag=WARN_HTTP_WRONG_REDIRECT)
newobj = linkcheck.checker.get_url_from(
newobj = get_url_from(
redirected, self.recursion_level, self.aggregate,
parent_url=self.parent_url, base_ref=self.base_ref,
line=self.line, column=self.column, name=self.name)
@ -481,14 +476,13 @@ Use URL %s instead for checking."""), self.url, newurl)
if (self.parent_url and
self.parent_url.startswith(('http://', 'https://'))):
self.url_connection.putheader("Referer", self.parent_url)
self.url_connection.putheader("User-Agent",
linkcheck.configuration.UserAgent)
self.url_connection.putheader("User-Agent", configuration.UserAgent)
self.url_connection.putheader("Accept-Encoding",
"gzip;q=1.0, deflate;q=0.9, identity;q=0.5")
if self.aggregate.config['sendcookies']:
scheme = self.urlparts[0]
host = self.urlparts[1]
port = linkcheck.url.default_ports.get(scheme, 80)
port = urlutil.default_ports.get(scheme, 80)
host, port = urllib.splitnport(host, port)
path = self.urlparts[2]
self.cookies = self.aggregate.cookies.get(scheme, host, port, path)
@ -536,12 +530,12 @@ Use URL %s instead for checking."""), self.url, newurl)
return conn
self.aggregate.connections.wait_for_host(host)
if scheme == "http":
h = linkcheck.httplib2.HTTPConnection(host)
h = httplib.HTTPConnection(host)
elif scheme == "https" and supportHttps:
h = linkcheck.httplib2.HTTPSConnection(host)
h = httplib.HTTPSConnection(host)
else:
msg = _("Unsupported HTTP url scheme %r") % scheme
raise linkcheck.LinkCheckerError(msg)
raise LinkCheckerError(msg)
if log.is_debug(LOG_CHECK):
h.set_debuglevel(1)
h.connect()
@ -572,14 +566,14 @@ Use URL %s instead for checking."""), self.url, newurl)
if encoding in _supported_encodings:
try:
if encoding == 'deflate':
f = StringIO.StringIO(zlib.decompress(data))
f = StringIO(zlib.decompress(data))
else:
f = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(data))
f = gzip.GzipFile('', 'rb', 9, StringIO(data))
except zlib.error, msg:
self.add_warning(_("Decompress error %(err)s") %
{"err": str(msg)},
tag=WARN_HTTP_DECOMPRESS_ERROR)
f = StringIO.StringIO(data)
f = StringIO(data)
data = f.read()
if self.data is None and self.method == "GET" and \
response.status not in [301, 302]:

View file

@ -18,8 +18,8 @@
Intern URL pattern support.
"""
import re
import urlbase
import linkcheck.checker
from . import urlbase, absolute_url
from .. import strformat, url as urlutil
class InternPatternUrl (urlbase.UrlBase):
"""
@ -33,19 +33,18 @@ class InternPatternUrl (urlbase.UrlBase):
@return non-empty regex pattern or None
@rtype String or None
"""
absolute = linkcheck.checker.absolute_url
url = absolute(self.base_url, self.base_ref, self.parent_url)
url = absolute_url(self.base_url, self.base_ref, self.parent_url)
if not url:
return None
parts = linkcheck.strformat.url_unicode_split(url)
parts = strformat.url_unicode_split(url)
scheme = parts[0]
domain = parts[1]
domain, is_idn = linkcheck.url.idna_encode(domain)
domain, is_idn = urlutil.idna_encode(domain)
if is_idn:
pass # XXX warn about idn use
if not (domain and scheme):
return None
path = linkcheck.url.splitparams(parts[2])[0]
path = urlutil.splitparams(parts[2])[0]
segments = path.split('/')[:-1]
path = "/".join(segments)
if url.endswith('/'):

View file

@ -23,11 +23,10 @@ import urllib
import smtplib
import email.Utils
import urlbase
from .. import log, LOG_CHECK
import linkcheck.strformat
import linkcheck.dns.resolver
from const import WARN_MAIL_NO_ADDRESSES, WARN_MAIL_NO_MX_HOST, \
from . import urlbase
from .. import log, LOG_CHECK, strformat, LinkCheckerError, url as urlutil
from ..dns import resolver
from .const import WARN_MAIL_NO_ADDRESSES, WARN_MAIL_NO_MX_HOST, \
WARN_MAIL_UNVERIFIED_ADDRESS, WARN_MAIL_NO_CONNECTION
@ -49,7 +48,7 @@ def _split_address (address):
return tuple(split)
if len(split) == 1:
return (split[0], "localhost")
raise linkcheck.LinkCheckerError(_("Could not split the mail address"))
raise LinkCheckerError(_("Could not split the mail address"))
class MailtoUrl (urlbase.UrlBase):
@ -73,8 +72,8 @@ class MailtoUrl (urlbase.UrlBase):
# check syntax of emails
for name, addr in self.addresses:
username, domain = _split_address(addr)
if not linkcheck.url.is_safe_domain(domain):
raise linkcheck.LinkCheckerError(_("Invalid mail syntax"))
if not urlutil.is_safe_domain(domain):
raise LinkCheckerError(_("Invalid mail syntax"))
log.debug(LOG_CHECK, "addresses: %s", self.addresses)
def cutout_addresses (self):
@ -148,18 +147,18 @@ class MailtoUrl (urlbase.UrlBase):
Check a single mail address.
"""
log.debug(LOG_CHECK, "checking mail address %r", mail)
mail = linkcheck.strformat.ascii_safe(mail)
mail = strformat.ascii_safe(mail)
username, domain = _split_address(mail)
log.debug(LOG_CHECK, "looking up MX mailhost %r", domain)
try:
answers = linkcheck.dns.resolver.query(domain, 'MX')
except linkcheck.dns.resolver.NoAnswer:
answers = resolver.query(domain, 'MX')
except resolver.NoAnswer:
answers = []
if len(answers) == 0:
self.add_warning(_("No MX mail host for %(domain)s found.") %
{'domain': domain},
tag=WARN_MAIL_NO_MX_HOST)
answers = linkcheck.dns.resolver.query(domain, 'A')
answers = resolver.query(domain, 'A')
if len(answers) == 0:
self.set_result(_("No host for %(domain)s found.") %
{'domain': domain}, valid=False)

View file

@ -24,10 +24,9 @@ import sys
import nntplib
import random
import linkcheck
import urlbase
from const import WARN_NNTP_NO_SERVER, WARN_NNTP_NO_NEWSGROUP, \
WARN_NNTP_BUSY
from . import urlbase
from .. import LinkCheckerError
from .const import WARN_NNTP_NO_SERVER, WARN_NNTP_NO_NEWSGROUP, WARN_NNTP_BUSY
random.seed()
@ -86,7 +85,7 @@ class NntpUrl (urlbase.UrlBase):
else:
raise
if nntp is None:
raise linkcheck.LinkCheckerError(
raise LinkCheckerError(
_("NNTP server too busy; tried more than %d times.") % tries)
if value is not None:
self.add_warning(_("NNTP busy: %s.") % str(value),

View file

@ -17,9 +17,9 @@
"""
Mixin class for URLs that can be fetched over a proxy.
"""
import urllib
import linkcheck
from .. import LinkCheckerError
class ProxySupport (object):
"""
@ -40,7 +40,7 @@ class ProxySupport (object):
# Note that invalid proxies might raise TypeError in urllib2,
# so make sure to stop checking at this point, not later.
msg = _("Proxy value %r must start with 'http://'.") % self.proxy
raise linkcheck.LinkCheckerError(msg)
raise LinkCheckerError(msg)
self.proxy = urllib.splittype(self.proxy)[1]
self.proxy = urllib.splithost(self.proxy)[0]
self.proxyauth, self.proxy = urllib.splituser(self.proxy)

View file

@ -20,8 +20,7 @@ Handle telnet: links.
import telnetlib
import urllib
import urlbase
from . import urlbase
from .. import log, LOG_CHECK

View file

@ -118,7 +118,7 @@ def add_fileoutput_config (config):
devnull = 'NUL'
else:
return
for ftype in linkcheck.Loggers.keys():
for ftype in linkcheck.logger.Loggers.keys():
if ftype in ('test', 'blacklist'):
continue
logger = config.logger_new(ftype, fileoutput=1, filename=devnull)

View file

@ -19,8 +19,8 @@ Handle uncheckable URLs.
"""
import re
import urlbase
from const import WARN_IGNORE_URL
from . import urlbase
from .const import WARN_IGNORE_URL
ignored_schemes = r"""^(
acap # application configuration access protocol

View file

@ -17,7 +17,6 @@
"""
Base URL handler.
"""
import sys
import os
import logging
@ -30,21 +29,19 @@ import socket
import select
import traceback
import linkcheck.linkparse
import linkcheck.checker
import linkcheck.cache.geoip
import linkcheck.strformat
import linkcheck.containers
from .. import log, LOG_CHECK, LOG_CACHE, httputil
from .. import httplib2 as httplib
import linkcheck.HtmlParser.htmlsax
from const import WARN_URL_EFFECTIVE_URL, WARN_URL_UNICODE_DOMAIN, \
WARN_URL_UNNORMED, WARN_URL_ERROR_GETTING_CONTENT, \
WARN_URL_ANCHOR_NOT_FOUND, WARN_URL_WARNREGEX_FOUND, \
WARN_URL_CONTENT_TOO_LARGE
from . import absolute_url, StoringHandler, get_url_from
from ..cache import geoip
from .. import (log, LOG_CHECK, LOG_CACHE, httputil, httplib2 as httplib,
strformat, linkparse, containers, LinkCheckerError, url as urlutil,
trace)
from ..HtmlParser import htmlsax
from .const import (WARN_URL_EFFECTIVE_URL, WARN_URL_UNICODE_DOMAIN,
WARN_URL_UNNORMED, WARN_URL_ERROR_GETTING_CONTENT,
WARN_URL_ANCHOR_NOT_FOUND, WARN_URL_WARNREGEX_FOUND,
WARN_URL_CONTENT_TOO_LARGE, ExcList, ExcSyntaxList, ExcNoCacheList)
# helper alias
unicode_safe = linkcheck.strformat.unicode_safe
unicode_safe = strformat.unicode_safe
def urljoin (parent, url, scheme):
"""
@ -62,10 +59,10 @@ def url_norm (url):
Wrapper for url.url_norm() to convert UnicodeError in LinkCheckerError.
"""
try:
return linkcheck.url.url_norm(url)
return urlutil.url_norm(url)
except UnicodeError:
msg = _("URL has unparsable domain name: %s") % sys.exc_info()[1]
raise linkcheck.LinkCheckerError(msg)
raise LinkCheckerError(msg)
class UrlBase (object):
@ -107,12 +104,12 @@ class UrlBase (object):
self.column = column
self.name = name
if self.base_ref:
assert not linkcheck.url.url_needs_quoting(self.base_ref), \
assert not urlutil.url_needs_quoting(self.base_ref), \
"unquoted base reference URL %r" % self.base_ref
if self.parent_url:
assert not linkcheck.url.url_needs_quoting(self.parent_url), \
assert not urlutil.url_needs_quoting(self.parent_url), \
"unquoted parent URL %r" % self.parent_url
url = linkcheck.checker.absolute_url(base_url, base_ref, parent_url)
url = absolute_url(base_url, base_ref, parent_url)
# assume file link if no scheme is found
self.scheme = url.split(":", 1)[0] or "file"
# warn if URL is redirected (for commandline client)
@ -139,9 +136,9 @@ class UrlBase (object):
# valid or not
self.valid = True
# list of warnings (without duplicates)
self.warnings = linkcheck.containers.SetList()
self.warnings = containers.SetList()
# list of infos (without duplicates)
self.info = linkcheck.containers.SetList()
self.info = containers.SetList()
# download time
self.dltime = -1
# download size
@ -295,7 +292,7 @@ class UrlBase (object):
self.add_warning(_("Effective URL %r.") % effectiveurl,
tag=WARN_URL_EFFECTIVE_URL)
self.url = effectiveurl
except tuple(linkcheck.checker.const.ExcSyntaxList), msg:
except tuple(ExcSyntaxList), msg:
self.set_result(unicode_safe(msg), valid=False)
return
self.set_cache_keys()
@ -336,10 +333,10 @@ class UrlBase (object):
# note: urljoin can unnorm the url path, so norm it again
urlparts = list(urlparse.urlsplit(self.url))
if urlparts[2]:
urlparts[2] = linkcheck.url.collapse_segments(urlparts[2])
urlparts[2] = urlutil.collapse_segments(urlparts[2])
self.url = urlparse.urlunsplit(urlparts)
# split into (modifiable) list
self.urlparts = linkcheck.strformat.url_unicode_split(self.url)
self.urlparts = strformat.url_unicode_split(self.url)
# and unsplit again
self.url = urlparse.urlunsplit(self.urlparts)
# check userinfo@host:port syntax
@ -353,15 +350,15 @@ class UrlBase (object):
self.anchor = self.urlparts[4]
self.host, self.port = urllib.splitport(host)
if self.port is not None:
if not linkcheck.url.is_numeric_port(self.port):
raise linkcheck.LinkCheckerError(
if not urlutil.is_numeric_port(self.port):
raise LinkCheckerError(
_("URL has invalid port %r") % str(self.port))
self.port = int(self.port)
def check (self):
"""Main check function for checking this URL."""
if self.aggregate.config["trace"]:
linkcheck.trace.trace_on()
trace.trace_on()
try:
self.local_check()
except (socket.error, select.error):
@ -380,7 +377,7 @@ class UrlBase (object):
"""
Try to ask GeoIP database for country info.
"""
country = linkcheck.cache.geoip.get_country(self.host)
country = geoip.get_country(self.host)
if country is not None:
self.add_info(_("URL is located in %s.") % _(country))
@ -401,7 +398,7 @@ class UrlBase (object):
self.add_country_info()
if self.aggregate.config["anchors"]:
self.check_anchors()
except tuple(linkcheck.checker.const.ExcList):
except tuple(ExcList):
value = self.handle_exception()
# make nicer error msg for unknown hosts
if isinstance(value, socket.error) and value[0] == -2:
@ -417,7 +414,7 @@ class UrlBase (object):
log.debug(LOG_CHECK, "checking content")
try:
self.check_content(warningregex)
except tuple(linkcheck.checker.const.ExcList):
except tuple(ExcList):
value = self.handle_exception()
self.set_result(unicode_safe(value), valid=False)
# check HTML/CSS syntax
@ -437,7 +434,7 @@ class UrlBase (object):
self.parse_url()
# check content size
self.check_size()
except tuple(linkcheck.checker.const.ExcList):
except tuple(ExcList):
value = self.handle_exception()
self.add_warning(_("could not get content: %r") % str(value),
tag=WARN_URL_ERROR_GETTING_CONTENT)
@ -463,7 +460,7 @@ class UrlBase (object):
etype, value, tb = sys.exc_info()
log.debug(LOG_CHECK, "exception %s", traceback.format_tb(tb))
# note: etype must be the exact class, not a subclass
if (etype in linkcheck.checker.const.ExcNoCacheList) or \
if (etype in ExcNoCacheList) or \
(etype == socket.error and value[0]==errno.EBADF) or \
not value:
# EBADF occurs when operating on an already closed socket
@ -473,7 +470,7 @@ class UrlBase (object):
# use Exception class name
errmsg += ": %s" % str(value)
# limit length to 240
return linkcheck.strformat.limit(errmsg, length=240)
return strformat.limit(errmsg, length=240)
def check_connection (self):
"""
@ -520,8 +517,8 @@ class UrlBase (object):
if not (self.is_http() or self.is_file()):
return True
# construct parser object
handler = linkcheck.linkparse.MetaRobotsFinder()
parser = linkcheck.HtmlParser.htmlsax.parser(handler)
handler = linkparse.MetaRobotsFinder()
parser = htmlsax.parser(handler)
handler.parser = parser
# parse
parser.feed(self.get_content())
@ -541,9 +538,9 @@ class UrlBase (object):
# do not bother
return
log.debug(LOG_CHECK, "checking anchor %r", self.anchor)
handler = linkcheck.linkparse.LinkFinder(self.get_content(),
handler = linkparse.LinkFinder(self.get_content(),
tags={'a': [u'name'], None: [u'id']})
parser = linkcheck.HtmlParser.htmlsax.parser(handler)
parser = htmlsax.parser(handler)
handler.parser = parser
# parse
parser.feed(self.get_content())
@ -620,8 +617,8 @@ class UrlBase (object):
if maxbytes is not None and self.dlsize >= maxbytes:
self.add_warning(
_("Content size %(dlsize)s is larger than %(maxbytes)s.") %
{"dlsize": linkcheck.strformat.strsize(self.dlsize),
"maxbytes": linkcheck.strformat.strsize(maxbytes)},
{"dlsize": strformat.strsize(self.dlsize),
"maxbytes": strformat.strsize(maxbytes)},
tag=WARN_URL_CONTENT_TOO_LARGE)
def check_html (self):
@ -664,7 +661,7 @@ class UrlBase (object):
csslog = logging.getLogger('cssutils')
csslog.propagate = 0
del csslog.handlers[:]
handler = linkcheck.checker.StoringHandler()
handler = StoringHandler()
csslog.addHandler(handler)
csslog.setLevel(logging.WARN)
cssparser = cssutils.CSSParser(log=csslog)
@ -770,8 +767,8 @@ class UrlBase (object):
"""
log.debug(LOG_CHECK, "Parsing HTML %s", self)
# construct parser object
handler = linkcheck.linkparse.LinkFinder(self.get_content())
parser = linkcheck.HtmlParser.htmlsax.parser(handler)
handler = linkparse.LinkFinder(self.get_content())
parser = htmlsax.parser(handler)
handler.parser = parser
# parse
parser.feed(self.get_content())
@ -784,8 +781,8 @@ class UrlBase (object):
base_ref = codebase
else:
base_ref = handler.base_ref
base_ref = linkcheck.url.url_norm(base_ref)[0]
url_data = linkcheck.checker.get_url_from(url,
base_ref = urlutil.url_norm(base_ref)[0]
url_data = get_url_from(url,
self.recursion_level+1, self.aggregate, parent_url=self.url,
base_ref=base_ref, line=line, column=column, name=name)
self.aggregate.urlqueue.put(url_data)
@ -805,9 +802,9 @@ class UrlBase (object):
elif line.startswith("URL="):
url = line[4:]
if url:
url_data = linkcheck.checker.get_url_from(url,
self.recursion_level+1, self.aggregate,
parent_url=self.url, line=lineno, name=name)
url_data = get_url_from(url, self.recursion_level+1,
self.aggregate, parent_url=self.url,
line=lineno, name=name)
self.aggregate.urlqueue.put(url_data)
name = ""
@ -823,7 +820,7 @@ class UrlBase (object):
line = line.strip()
if not line or line.startswith('#'):
continue
url_data = linkcheck.checker.get_url_from(line,
url_data = get_url_from(line,
self.recursion_level+1, self.aggregate,
parent_url=self.url, line=lineno)
self.aggregate.urlqueue.put(url_data)
@ -834,24 +831,24 @@ class UrlBase (object):
"""
log.debug(LOG_CHECK, "Parsing CSS %s", self)
lineno = 0
linkfinder = linkcheck.linkparse.css_url_re.finditer
strip_comments = linkcheck.linkparse.strip_c_comments
linkfinder = linkparse.css_url_re.finditer
strip_comments = linkparse.strip_c_comments
for line in strip_comments(self.get_content()).splitlines():
lineno += 1
for mo in linkfinder(line):
column = mo.start("url")
url = linkcheck.strformat.unquote(mo.group("url").strip())
url_data = linkcheck.checker.get_url_from(url,
url = strformat.unquote(mo.group("url").strip())
url_data = get_url_from(url,
self.recursion_level+1, self.aggregate,
parent_url=self.url, line=lineno, column=column)
self.aggregate.urlqueue.put(url_data)
def parse_swf (self):
"""Parse a SWF file for URLs."""
linkfinder = linkcheck.linkparse.swf_url_re.finditer
linkfinder = linkparse.swf_url_re.finditer
for mo in linkfinder(self.get_content()):
url = mo.group()
url_data = linkcheck.checker.get_url_from(url,
url_data = get_url_from(url,
self.recursion_level+1, self.aggregate,
parent_url=self.url)
self.aggregate.urlqueue.put(url_data)

View file

@ -23,9 +23,8 @@ import os
import logging.config
import urllib
import _linkchecker_configdata
from .. import log, LOG_CHECK, LOG
import linkcheck.containers
import confparse
from .. import log, LOG_CHECK, LOG, ansicolor, lognames
from . import confparse
Version = _linkchecker_configdata.version
AppName = u"LinkChecker"
@ -162,7 +161,7 @@ class Configuration (dict):
config_dir = _linkchecker_configdata.config_dir
filename = normpath(os.path.join(config_dir, "logging.conf"))
logging.config.fileConfig(filename)
handler = linkcheck.ansicolor.ColoredStreamHandler(strm=sys.stderr)
handler = ansicolor.ColoredStreamHandler(strm=sys.stderr)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logging.getLogger(LOG).addHandler(handler)
self.set_debug(debug)
@ -174,12 +173,12 @@ class Configuration (dict):
return
# set debugging on given logger names
if 'all' in debug:
debug = linkcheck.lognames.keys()
debug = lognames.keys()
# disable threading if no thread debugging
if "thread" not in debug:
self['threads'] = 0
for name in debug:
logname = linkcheck.lognames[name]
logname = lognames[name]
logging.getLogger(logname).setLevel(logging.DEBUG)
def logger_new (self, loggertype, **kwargs):
@ -189,7 +188,8 @@ class Configuration (dict):
args = {}
args.update(self[loggertype])
args.update(kwargs)
return linkcheck.Loggers[loggertype](**args)
from ..logger import Loggers
return Loggers[loggertype](**args)
def logger_add (self, loggertype, loggerclass, loggerargs=None):
"""
@ -197,7 +197,8 @@ class Configuration (dict):
"""
if loggerargs is None:
loggerargs = {}
linkcheck.Loggers[loggertype] = loggerclass
from ..logger import Loggers
Loggers[loggertype] = loggerclass
self[loggertype] = loggerargs
def read (self, files=None):
@ -229,8 +230,8 @@ class Configuration (dict):
if self["anchors"]:
if not self["warnings"]:
self["warnings"] = True
self["ignorewarnings"] = linkcheck.checker.Warnings.keys()
from ..checker import Warnings
self["ignorewarnings"] = Warnings.keys()
if 'url-anchor-not-found' in self["ignorewarnings"]:
self["ignorewarnings"].remove('url-anchor-not-found')
self['logger'] = self.logger_new(self['output'])

View file

@ -18,8 +18,7 @@
import ConfigParser
import re
import linkcheck
from .. import log, LOG_CHECK
from .. import log, LOG_CHECK, LinkCheckerError, get_link_pat
def read_multiline (value):
@ -53,13 +52,14 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
self.read_authentication_config()
self.read_filtering_config()
except Exception, msg:
raise linkcheck.LinkCheckerError(
raise LinkCheckerError(
"Error parsing configuration: %s", str(msg))
def read_output_config (self):
"""Read configuration options in section "output"."""
section = "output"
for key in linkcheck.Loggers.iterkeys():
from ..logger import Loggers
for key in Loggers.iterkeys():
if self.has_section(key):
for opt in self.options(key):
self.config[key][opt] = self.get(key, opt)
@ -91,8 +91,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
for val in filelist:
val = val.strip()
# no file output for the blacklist and none Logger
if val in linkcheck.Loggers and \
val not in ("blacklist", "none"):
if val in Loggers and val not in ("blacklist", "none"):
output = self.config.logger_new(val, fileoutput=1)
self.config['fileoutput'].append(output)
if self.has_option(section, "interactive"):
@ -107,7 +106,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
if self.has_option(section, "timeout"):
num = self.getint(section, "timeout")
if num < 0:
raise linkcheck.LinkCheckerError(
raise LinkCheckerError(
_("invalid negative value for timeout: %d\n"), num)
self.config['timeout'] = num
if self.has_option(section, "anchors"):
@ -167,7 +166,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
for val in read_multiline(self.get(section, "entry")):
auth = val.split()
if len(auth) != 3:
raise linkcheck.LinkCheckerError(LOG_CHECK,
raise LinkCheckerError(LOG_CHECK,
_("missing auth part in entry %(val)r") % \
{"val": val})
self.config["authentication"].insert(0,
@ -186,7 +185,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
_("the entry%(num)d syntax is deprecated; use " \
"the new multiline configuration syntax") % {"num": i})
if len(auth) != 3:
raise linkcheck.LinkCheckerError(LOG_CHECK,
raise LinkCheckerError(LOG_CHECK,
_("missing auth part in entry %(val)r") % \
{"val": val})
self.config["authentication"].insert(0,
@ -202,7 +201,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
section = "filtering"
if self.has_option(section, "nofollow"):
for line in read_multiline(self.get(section, "nofollow")):
pat = linkcheck.get_link_pat(line, strict=0)
pat = get_link_pat(line, strict=0)
self.config["externlinks"].append(pat)
# backward compatibility
i = 1
@ -214,7 +213,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
log.warn(LOG_CHECK,
_("the nofollow%(num)d syntax is deprecated; use " \
"the new multiline configuration syntax") % {"num": i})
pat = linkcheck.get_link_pat(val, strict=0)
pat = get_link_pat(val, strict=0)
self.config["externlinks"].append(pat)
i += 1
if self.has_option(section, "noproxyfor"):
@ -237,7 +236,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
self.get(section, 'ignorewarnings').split(',')]
if self.has_option(section, "ignore"):
for line in read_multiline(self.get(section, "ignore")):
pat = linkcheck.get_link_pat(line, strict=1)
pat = get_link_pat(line, strict=1)
self.config["externlinks"].append(pat)
# backward compatibility
i = 1
@ -250,9 +249,9 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
log.warn(LOG_CHECK,
_("the ignore%(num)d syntax is deprecated; use " \
"the new multiline configuration syntax") % {"num": i})
pat = linkcheck.get_link_pat(val, strict=1)
pat = get_link_pat(val, strict=1)
self.config["externlinks"].append(pat)
i += 1
if self.has_option(section, "internlinks"):
pat = linkcheck.get_link_pat(self.get(section, "internlinks"))
pat = get_link_pat(self.get(section, "internlinks"))
self.config["internlinks"].append(pat)

View file

@ -73,7 +73,7 @@ class TestConfig (unittest.TestCase):
self.assertTrue(config["interactive"])
self.assertTrue(linkcheck.log.is_debug(linkcheck.LOG_THREAD))
self.assertFalse(config["status"])
self.assertTrue(isinstance(config["logger"], linkcheck.Loggers["xml"]))
self.assertTrue(isinstance(config["logger"], linkcheck.logger.Loggers["xml"]))
self.assertTrue(config["verbose"])
self.assertTrue(config["warnings"])
self.assertFalse(config["quiet"])

View file

@ -32,7 +32,7 @@ import time
import re
import Cookie
import cookielib
import cStringIO as StringIO
from cStringIO import StringIO
import rfc822
from . import strformat
@ -345,7 +345,7 @@ def from_headers (strheader):
@return: tuple (headers, scheme, host, path)
@raises: ValueError for incomplete or invalid data
"""
fp = StringIO.StringIO(strheader)
fp = StringIO(strheader)
headers = rfc822.Message(fp, seekable=True)
if "Host" not in headers:
raise ValueError("Required header 'Host:' missing")

View file

@ -21,12 +21,8 @@ import time
import os
import thread
from .. import log, LOG_CHECK
import linkcheck.cache.urlqueue
import linkcheck.cache.robots_txt
import linkcheck.cache.cookie
import linkcheck.cache.connection
import aggregator
import console
from ..cache import urlqueue, robots_txt, cookie, connection
from . import aggregator, console
def check_urls (aggregate):
@ -59,7 +55,7 @@ def check_url (aggregate):
try:
aggregate.urlqueue.join(timeout=1)
break
except linkcheck.cache.urlqueue.Timeout:
except urlqueue.Timeout:
# Since urlqueue.join() is not interruptable, add a timeout
# and a one-second slumber.
time.sleep(1)
@ -110,9 +106,9 @@ def abort (aggregate):
def get_aggregate (config):
"""Get an aggregator instance with given configuration."""
urlqueue = linkcheck.cache.urlqueue.UrlQueue()
connections = linkcheck.cache.connection.ConnectionPool(wait=config["wait"])
cookies = linkcheck.cache.cookie.CookieJar()
robots_txt = linkcheck.cache.robots_txt.RobotsTxt()
return aggregator.Aggregate(config, urlqueue, connections,
cookies, robots_txt)
_urlqueue = urlqueue.UrlQueue()
connections = connection.ConnectionPool(wait=config["wait"])
cookies = cookie.CookieJar()
_robots_txt = robots_txt.RobotsTxt()
return aggregator.Aggregate(config, _urlqueue, connections,
cookies, _robots_txt)

View file

@ -20,12 +20,9 @@ Aggregate needed object instances for checker threads.
import time
import threading
from .. import log, LOG_CHECK
from linkcheck.decorators import synchronized
import linkcheck.director
import logger
import status
import checker
import cleanup
from ..decorators import synchronized
from ..cache import urlqueue
from . import logger, status, checker, cleanup
_lock = threading.Lock()
@ -66,7 +63,7 @@ class Aggregate (object):
self.urlqueue.do_shutdown()
try:
self.urlqueue.join(timeout=self.config["timeout"])
except linkcheck.cache.urlqueue.Timeout:
except urlqueue.Timeout:
log.warn(LOG_CHECK, "Abort timed out")
def remove_stopped_threads (self):

View file

@ -18,8 +18,8 @@
URL checking functions.
"""
import time
import task
import linkcheck.cache.urlqueue
from . import task
from ..cache import urlqueue
def check_url (urlqueue, logger):
@ -61,7 +61,7 @@ class Checker (task.CheckedTask):
finally:
self.urlqueue.task_done(url_data)
self.setName(self.origname)
except linkcheck.cache.urlqueue.Empty:
except urlqueue.Empty:
time.sleep(0.1)
def check_url_data (self, url_data):

View file

@ -16,7 +16,7 @@
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""Cleanup task."""
import time
import task
from . import task
class Cleanup (task.CheckedTask):

View file

@ -21,12 +21,11 @@ import sys
import os
import codecs
import traceback
import linkcheck.i18n
import linkcheck.configuration
from .. import i18n, configuration
# All output goes to stderr here, making sure the console gets correct
# encoded messages.
_encoding = linkcheck.i18n.default_encoding
_encoding = i18n.default_encoding
stderr = codecs.getwriter(_encoding)(sys.stderr, errors="ignore")
def internal_error ():
@ -47,7 +46,7 @@ or send mail to %s and include the following information:
Not disclosing some of the information above due to privacy reasons is ok.
I will try to help you nonetheless, but you have to give me something
I can work with ;) .
""") % linkcheck.configuration.Email
""") % configuration.Email
etype, value = sys.exc_info()[:2]
print >> stderr, etype, value
traceback.print_exc()
@ -61,7 +60,7 @@ def print_app_info ():
Print system and application info to stderr.
"""
print >> stderr, _("System info:")
print >> stderr, linkcheck.configuration.App
print >> stderr, configuration.App
print >> stderr, _("Python %(version)s on %(platform)s") % \
{"version": sys.version, "platform": sys.platform}
for key in ("LC_ALL", "LC_MESSAGES", "http_proxy", "ftp_proxy"):

View file

@ -16,7 +16,7 @@
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""Logger for aggregator instances"""
import threading
from linkcheck.decorators import synchronized
from ..decorators import synchronized
_lock = threading.Lock()

View file

@ -16,11 +16,9 @@
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
"""Status message handling"""
import time
import linkcheck.i18n
import linkcheck.strformat
import linkcheck.configuration
import task
from console import stderr
from .. import strformat
from . import task
from .console import stderr
class Status (task.CheckedTask):
@ -53,5 +51,5 @@ class Status (task.CheckedTask):
print >> stderr, msg,
msg = _n("%4d URL checked,", "%4d URLs checked,", checked) % checked
print >> stderr, msg,
msg = _("runtime %s") % linkcheck.strformat.strduration_long(duration)
msg = _("runtime %s") % strformat.strduration_long(duration)
print >> stderr, msg

View file

@ -15,13 +15,12 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import thread
import linkcheck.decorators
from .. import log, LOG_CHECK
import linkcheck.threader
import console
from ..decorators import notimplemented
from .. import log, LOG_CHECK, threader
from . import console
class CheckedTask (linkcheck.threader.StoppableThread):
class CheckedTask (threader.StoppableThread):
"""Stoppable URL check task, handling error conditions while running."""
def run (self):
@ -34,7 +33,7 @@ class CheckedTask (linkcheck.threader.StoppableThread):
except Exception:
console.internal_error()
@linkcheck.decorators.notimplemented
@notimplemented
def run_checked (self):
"""Overload in subclass."""
pass

View file

@ -14,7 +14,7 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import cStringIO as StringIO
from cStringIO import StringIO
import random
import struct
import sys
@ -165,7 +165,7 @@ class Message(object):
@rtype: string
"""
s = StringIO.StringIO()
s = StringIO()
print >> s, 'id %d' % self.id
print >> s, 'opcode %s' % \
linkcheck.dns.opcode.to_text(linkcheck.dns.opcode.from_flags(self.flags))

View file

@ -16,7 +16,7 @@
"""DNS nodes. A node is a set of rdatasets."""
import StringIO
from cStringIO import StringIO
import linkcheck.dns.rdataset
import linkcheck.dns.rdatatype
@ -48,7 +48,7 @@ class Node(object):
@rtype: string
"""
s = StringIO.StringIO()
s = StringIO()
for rds in self.rdatasets:
print >> s, rds.to_text(name, **kw)
return s.getvalue()[:-1]

View file

@ -17,7 +17,7 @@
"""DNS rdatasets (an rdataset is a set of rdatas of a given type and class)"""
import random
import StringIO
from cStringIO import StringIO
import struct
import linkcheck.dns.exception
@ -188,7 +188,7 @@ class Rdataset(linkcheck.dns.set.Set):
else:
ntext = ''
pad = ''
s = StringIO.StringIO()
s = StringIO()
if not override_rdclass is None:
rdclass = override_rdclass
else:

View file

@ -14,7 +14,7 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import cStringIO
from cStringIO import StringIO
import struct
import linkcheck.dns.exception
@ -120,7 +120,7 @@ class CERT(linkcheck.dns.rdata.Rdata):
from_wire = classmethod(from_wire)
def _cmp(self, other):
f = cStringIO.StringIO()
f = StringIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)

View file

@ -14,7 +14,7 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import cStringIO
from cStringIO import StringIO
import struct
import linkcheck.dns.exception
@ -312,7 +312,7 @@ class LOC(linkcheck.dns.rdata.Rdata):
from_wire = classmethod(from_wire)
def _cmp(self, other):
f = cStringIO.StringIO()
f = StringIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)

View file

@ -14,7 +14,7 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import cStringIO
from cStringIO import StringIO
import linkcheck.dns.exception
import linkcheck.dns.rdata
@ -127,12 +127,12 @@ class NSEC(linkcheck.dns.rdata.Rdata):
def _cmp(self, other):
v = cmp(self.next, other.next)
if v == 0:
b1 = cStringIO.StringIO()
b1 = StringIO()
for (window, bitmap) in self.windows:
b1.write(chr(window))
b1.write(chr(len(bitmap)))
b1.write(bitmap)
b2 = cStringIO.StringIO()
b2 = StringIO()
for (window, bitmap) in other.windows:
b2.write(chr(window))
b2.write(chr(len(bitmap)))

View file

@ -14,7 +14,7 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import cStringIO
from cStringIO import StringIO
import struct
import linkcheck.dns.exception
@ -158,7 +158,7 @@ class APL(linkcheck.dns.rdata.Rdata):
from_wire = classmethod(from_wire)
def _cmp(self, other):
f = cStringIO.StringIO()
f = StringIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)

View file

@ -16,7 +16,7 @@
"""Help for building DNS wire format messages"""
import cStringIO as StringIO
from cStringIO import StringIO
import random
import struct
import time
@ -52,7 +52,7 @@ class Renderer(object):
wire = r.get_wire()
@ivar output: where rendering is written
@type output: StringIO.StringIO object
@type output: StringIO object
@ivar id: the message id
@type id: int
@ivar flags: the message flags
@ -87,7 +87,7 @@ class Renderer(object):
@type origin: linkcheck.dns.name.Namem or None.
"""
self.output = StringIO.StringIO()
self.output = StringIO()
if id is None:
self.id = random.randint(0, 65535)
else:

View file

@ -15,7 +15,7 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import unittest
import cStringIO as StringIO
from cStringIO import StringIO
import linkcheck.dns.name
@ -358,14 +358,14 @@ class TestName (unittest.TestCase):
def testToWire1(self):
n = linkcheck.dns.name.from_text('FOO.bar')
f = StringIO.StringIO()
f = StringIO()
compress = {}
n.to_wire(f, compress)
self.assertEqual(f.getvalue(), '\x03FOO\x03bar\x00')
def testToWire2(self):
n = linkcheck.dns.name.from_text('FOO.bar')
f = StringIO.StringIO()
f = StringIO()
compress = {}
n.to_wire(f, compress)
n.to_wire(f, compress)
@ -374,7 +374,7 @@ class TestName (unittest.TestCase):
def testToWire3(self):
n1 = linkcheck.dns.name.from_text('FOO.bar')
n2 = linkcheck.dns.name.from_text('foo.bar')
f = StringIO.StringIO()
f = StringIO()
compress = {}
n1.to_wire(f, compress)
n2.to_wire(f, compress)
@ -383,7 +383,7 @@ class TestName (unittest.TestCase):
def testToWire4(self):
n1 = linkcheck.dns.name.from_text('FOO.bar')
n2 = linkcheck.dns.name.from_text('a.foo.bar')
f = StringIO.StringIO()
f = StringIO()
compress = {}
n1.to_wire(f, compress)
n2.to_wire(f, compress)
@ -392,7 +392,7 @@ class TestName (unittest.TestCase):
def testToWire5(self):
n1 = linkcheck.dns.name.from_text('FOO.bar')
n2 = linkcheck.dns.name.from_text('a.foo.bar')
f = StringIO.StringIO()
f = StringIO()
compress = {}
n1.to_wire(f, compress)
n2.to_wire(f, None)
@ -402,7 +402,7 @@ class TestName (unittest.TestCase):
def testBadToWire(self):
def bad():
n = linkcheck.dns.name.from_text('FOO.bar', None)
f = StringIO.StringIO()
f = StringIO()
compress = {}
n.to_wire(f, compress)
self.assertRaises(linkcheck.dns.name.NeedAbsoluteNameOrOrigin, bad)

View file

@ -14,7 +14,7 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import cStringIO as StringIO
from cStringIO import StringIO
import sys
import time
import unittest
@ -51,7 +51,7 @@ class TestResolver (unittest.TestCase):
if sys.platform != 'win32':
def testRead(self):
f = StringIO.StringIO(resolv_conf)
f = StringIO(resolv_conf)
r = linkcheck.dns.resolver.Resolver(f)
self.assertEqual(r.nameservers, ['10.0.0.1', '10.0.0.2'])
self.assertEqual(r.domain, linkcheck.dns.name.from_text('foo'))

View file

@ -15,7 +15,7 @@
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import with_statement
import cStringIO as StringIO
from cStringIO import StringIO
import os
import unittest
import difflib
@ -155,7 +155,7 @@ class TestZone (unittest.TestCase):
def testFromText(self):
z = linkcheck.dns.zone.from_text(example_text, 'example.', relativize=True)
f = StringIO.StringIO()
f = StringIO()
names = z.nodes.keys()
names.sort()
for n in names:
@ -168,7 +168,7 @@ class TestZone (unittest.TestCase):
# for each RR in the zone, convert the rdata into wire format
# and then back out, and see if we get equal rdatas.
#
f = StringIO.StringIO()
f = StringIO()
o = linkcheck.dns.name.from_text('example.')
z = linkcheck.dns.zone.from_file(fname('example'), o)
for (name, node) in z.iteritems():

View file

@ -16,7 +16,7 @@
"""Tokenize DNS master file format"""
import cStringIO
from cStringIO import StringIO
import sys
import linkcheck.dns.exception
@ -89,7 +89,7 @@ class Tokenizer(object):
"""
if isinstance(f, str):
f = cStringIO.StringIO(f)
f = StringIO(f)
if filename is None:
filename = '<string>'
else:

View file

@ -34,7 +34,7 @@ import sys
import socket
import errno
import cgi
import cStringIO as StringIO
from cStringIO import StringIO
# Set various FastCGI constants
# Maximum number of requests that can be handled
@ -429,8 +429,8 @@ class FCGI (object):
data += r.content
# end of while remaining:
self.stdin = sys.stdin = StringIO.StringIO(stdin)
self.data = StringIO.StringIO(data)
self.stdin = sys.stdin = StringIO(stdin)
self.data = StringIO(data)
r = Record()
r.rec_type = FCGI_STDERR
r.req_id = self.request_id

View file

@ -26,12 +26,9 @@ import time
import urlparse
import types
import linkcheck.configuration
import linkcheck.url
import linkcheck.i18n
import linkcheck.strformat
import linkcheck.checker
import linkcheck.director
from . import configuration, strformat, checker, director
from . import add_intern_pattern, get_link_pat, init_i18n
from . import url as urlutil
_logfile = None
_supported_langs = ('de', 'C')
@ -79,7 +76,7 @@ def checklink (out=sys.stdout, form=None, env=os.environ):
logit(form, env)
print_error(out, why)
return
config = linkcheck.configuration.Configuration()
config = configuration.Configuration()
config["recursionlevel"] = int(form["level"].value)
config["logger"] = config.logger_new('html', fd=out)
config["threads"] = 0
@ -88,22 +85,22 @@ def checklink (out=sys.stdout, form=None, env=os.environ):
if "errors" not in form:
config["verbose"] = True
# avoid checking of local files or other nasty stuff
pat = "!^%s$" % linkcheck.url.safe_url_pattern
config["externlinks"].append(linkcheck.get_link_pat(pat, strict=True))
pat = "!^%s$" % urlutil.safe_url_pattern
config["externlinks"].append(get_link_pat(pat, strict=True))
# start checking
aggregate = linkcheck.director.get_aggregate(config)
get_url_from = linkcheck.checker.get_url_from
aggregate = director.get_aggregate(config)
get_url_from = checker.get_url_from
url = form["url"].value
url_data = get_url_from(url, 0, aggregate)
try:
linkcheck.add_intern_pattern(url_data, config)
add_intern_pattern(url_data, config)
except UnicodeError:
logit({}, env)
print_error(out,
u"URL has unparsable domain name: %s" % sys.exc_info()[1])
return
aggregate.urlqueue.put(url_data)
linkcheck.director.check_urls(aggregate)
director.check_urls(aggregate)
def get_host_name (form):
@ -120,7 +117,7 @@ def checkform (form):
lang = form['language'].value
if lang in _supported_langs:
locale.setlocale(locale.LC_ALL, lang_locale[lang])
linkcheck.init_i18n()
init_i18n()
else:
raise FormError(_("unsupported language"))
# check url syntax
@ -128,7 +125,7 @@ def checkform (form):
url = form["url"].value
if not url or url == "http://":
raise FormError(_("empty url was given"))
if not linkcheck.url.is_safe_url(url):
if not urlutil.is_safe_url(url):
raise FormError(_("disallowed url was given"))
else:
raise FormError(_("no url was given"))
@ -150,7 +147,7 @@ def logit (form, env):
return
elif type(_logfile) == types.StringType:
_logfile = file(_logfile, "a")
_logfile.write("\n"+linkcheck.strformat.strtime(time.time())+"\n")
_logfile.write("\n" + strformat.strtime(time.time())+"\n")
for var in ("HTTP_USER_AGENT", "REMOTE_ADDR",
"REMOTE_HOST", "REMOTE_PORT"):
if var in env:

View file

@ -19,9 +19,7 @@ Find link tags in HTML text.
"""
import re
from . import strformat, log, LOG_CHECK
import linkcheck.linkname
import linkcheck.url
from . import strformat, log, LOG_CHECK, linkname, url as urlutil
MAX_NAMELEN = 256
unquote = strformat.unquote
@ -63,7 +61,7 @@ LinkTags = {
refresh_re = re.compile(ur"(?i)^\d+;\s*url=(?P<url>.+)$")
_quoted_pat = ur"('[^']+'|\"[^\"]+\"|[^\)\s]+)"
css_url_re = re.compile(ur"url\(\s*(?P<url>%s)\s*\)" % _quoted_pat)
swf_url_re = re.compile("(?i)%s" % linkcheck.url.safe_url_pattern)
swf_url_re = re.compile("(?i)%s" % urlutil.safe_url_pattern)
c_comment_re = re.compile(ur"/\*.*?\*/", re.DOTALL)
def strip_c_comments (text):
@ -182,7 +180,7 @@ class LinkFinder (TagFinder):
# position, to limit the amount of data to encode.
data = self.content[pos:pos+MAX_NAMELEN]
data = data.decode(self.parser.encoding, "ignore")
name = linkcheck.linkname.href_name(data)
name = linkname.href_name(data)
elif tag == 'img':
name = unquote(attrs.get_true('alt', u''))
if not name:

View file

@ -21,7 +21,7 @@ Logging and debug functions.
import logging
import os
import inspect
import cStringIO as StringIO
from cStringIO import StringIO
# memory leak debugging
#import gc
@ -35,7 +35,7 @@ def _stack_format (stack):
@return: formatted stack message
@rtype: string
"""
s = StringIO.StringIO()
s = StringIO()
s.write('Traceback:')
s.write(os.linesep)
for frame, fname, lineno, method, lines, dummy in reversed(stack):

View file

@ -21,12 +21,8 @@ Output logging support for different formats.
import sys
import os
import datetime
import linkcheck.strformat
import linkcheck.i18n
import linkcheck.decorators
import linkcheck.dummy
from .. import log, LOG_CHECK
from ..decorators import notimplemented
from .. import log, LOG_CHECK, strformat, i18n, dummy
_ = lambda x: x
Fields = dict(
@ -76,8 +72,7 @@ class Logger (object):
# number of warnings that were printed
self.warnings_printed = 0
# encoding of output
default = linkcheck.i18n.default_encoding
self.output_encoding = args.get("encoding", default)
self.output_encoding = args.get("encoding", i18n.default_encoding)
def init_fileoutput (self, args):
"""
@ -105,7 +100,7 @@ class Logger (object):
log.warn(LOG_CHECK,
"Could not open file %r for writing: %s\n"
"Disabling log output of %s", self.filename, msg, self)
self.fd = linkcheck.dummy.Dummy()
self.fd = dummy.Dummy()
self.filename = None
def close_fileoutput (self):
@ -168,7 +163,7 @@ class Logger (object):
"""
sep = os.linesep+os.linesep
text = sep.join(lines)
return linkcheck.strformat.wrap(text, width,
return strformat.wrap(text, width,
subsequent_indent=" "*self.max_indent,
initial_indent=" "*self.max_indent).lstrip()
@ -245,14 +240,14 @@ class Logger (object):
self.warnings_printed += num_warnings
self.log_url(url_data)
@linkcheck.decorators.notimplemented
@notimplemented
def log_url (self, url_data):
"""
Log a new url with this logger.
"""
pass
@linkcheck.decorators.notimplemented
@notimplemented
def end_output (self):
"""
End of output, used for cleanup (eg output buffer flushing).
@ -282,3 +277,34 @@ class Logger (object):
self.fd.flush()
except IOError:
pass
# note: don't confuse URL loggers with application logs above
from .text import TextLogger
from .html import HtmlLogger
from .gml import GMLLogger
from .dot import DOTLogger
from .sql import SQLLogger
from .csvlog import CSVLogger
from .blacklist import BlacklistLogger
from .gxml import GraphXMLLogger
from .customxml import CustomXMLLogger
from .none import NoneLogger
# default link logger classes
Loggers = {
"text": TextLogger,
"html": HtmlLogger,
"gml": GMLLogger,
"dot": DOTLogger,
"sql": SQLLogger,
"csv": CSVLogger,
"blacklist": BlacklistLogger,
"gxml": GraphXMLLogger,
"xml": CustomXMLLogger,
"none": NoneLogger,
}
# for easy printing: a comma separated logger list
LoggerKeys = ", ".join(repr(name) for name in Loggers)

View file

@ -20,10 +20,10 @@ A blacklist logger.
from __future__ import with_statement
import os
import linkcheck.logger
from . import Logger
class BlacklistLogger (linkcheck.logger.Logger):
class BlacklistLogger (Logger):
"""
Updates a blacklist of wrong links. If a link on the blacklist
is working (again), it is removed from the list. So after n days

View file

@ -17,16 +17,14 @@
"""
A CSV logger.
"""
import time
import csv
import os
import linkcheck.logger
import linkcheck.configuration
from . import Logger
from .. import strformat, configuration
class CSVLogger (linkcheck.logger.Logger):
class CSVLogger (Logger):
"""
CSV output, consisting of one line per entry. Entries are
separated by a semicolon.
@ -57,12 +55,12 @@ class CSVLogger (linkcheck.logger.Logger):
row = []
if self.has_part("intro"):
self.comment(_("created by %(app)s at %(time)s") %
{"app": linkcheck.configuration.AppName,
"time": linkcheck.strformat.strtime(self.starttime)})
{"app": configuration.AppName,
"time": strformat.strtime(self.starttime)})
self.comment(_("Get the newest version at %(url)s") %
{'url': linkcheck.configuration.Url})
{'url': configuration.Url})
self.comment(_("Write comments and bugs to %(email)s") %
{'email': linkcheck.configuration.Email})
{'email': configuration.Email})
self.check_date()
self.comment(_("Format of the entries:"))
for s in (u"urlname",
@ -120,6 +118,6 @@ class CSVLogger (linkcheck.logger.Logger):
if self.has_part("outro"):
duration = self.stoptime - self.starttime
self.comment(_("Stopped checking at %(time)s (%(duration)s)") %
{"time": linkcheck.strformat.strtime(self.stoptime),
"duration": linkcheck.strformat.strduration_long(duration)})
{"time": strformat.strtime(self.stoptime),
"duration": strformat.strduration_long(duration)})
self.close_fileoutput()

View file

@ -17,9 +17,8 @@
"""
An XML logger.
"""
import xmllog
import linkcheck.strformat
from . import xmllog
from .. import strformat
class CustomXMLLogger (xmllog.XMLLogger):
@ -33,7 +32,7 @@ class CustomXMLLogger (xmllog.XMLLogger):
"""
super(CustomXMLLogger, self).start_output()
self.xml_start_output()
attrs = {"created": linkcheck.strformat.strtime(self.starttime)}
attrs = {"created": strformat.strtime(self.starttime)}
self.xml_starttag(u'linkchecker', attrs)
self.flush()

View file

@ -18,13 +18,12 @@
A DOT graph format logger. The specification has been taken from
http://www.graphviz.org/doc/info/lang.html
"""
import time
import linkcheck.configuration
from . import Logger
from .. import configuration, strformat
class DOTLogger (linkcheck.logger.Logger):
class DOTLogger (Logger):
"""
Generates .dot sitemap graphs. Use graphviz to see the sitemap graph.
"""
@ -46,12 +45,12 @@ class DOTLogger (linkcheck.logger.Logger):
self.starttime = time.time()
if self.has_part("intro"):
self.comment(_("created by %(app)s at %(time)s") %
{"app": linkcheck.configuration.AppName,
"time": linkcheck.strformat.strtime(self.starttime)})
{"app": configuration.AppName,
"time": strformat.strtime(self.starttime)})
self.comment(_("Get the newest version at %(url)s") %
{'url': linkcheck.configuration.Url})
{'url': configuration.Url})
self.comment(_("Write comments and bugs to %(email)s") %
{'email': linkcheck.configuration.Email})
{'email': configuration.Email})
self.check_date()
self.writeln()
self.writeln(u"digraph G {")
@ -112,8 +111,8 @@ class DOTLogger (linkcheck.logger.Logger):
self.stoptime = time.time()
duration = self.stoptime - self.starttime
self.comment(_("Stopped checking at %(time)s (%(duration)s)") %
{"time": linkcheck.strformat.strtime(self.stoptime),
"duration": linkcheck.strformat.strduration_long(duration)})
{"time": strformat.strtime(self.stoptime),
"duration": strformat.strduration_long(duration)})
self.close_fileoutput()

View file

@ -17,13 +17,12 @@
"""
A gml logger.
"""
import time
import linkcheck.configuration
from . import Logger
from .. import configuration, strformat
class GMLLogger (linkcheck.logger.Logger):
class GMLLogger (Logger):
"""
GML means Graph Modeling Language. Use a GML tool to see
the sitemap graph.
@ -46,12 +45,12 @@ class GMLLogger (linkcheck.logger.Logger):
self.starttime = time.time()
if self.has_part("intro"):
self.comment(_("created by %(app)s at %(time)s") %
{"app": linkcheck.configuration.AppName,
"time": linkcheck.strformat.strtime(self.starttime)})
{"app": configuration.AppName,
"time": strformat.strtime(self.starttime)})
self.comment(_("Get the newest version at %(url)s") %
{'url': linkcheck.configuration.Url})
{'url': configuration.Url})
self.comment(_("Write comments and bugs to %(email)s") %
{'email': linkcheck.configuration.Email})
{'email': configuration.Email})
self.check_date()
self.writeln()
self.writeln(u"graph [")
@ -116,6 +115,6 @@ class GMLLogger (linkcheck.logger.Logger):
self.stoptime = time.time()
duration = self.stoptime - self.starttime
self.comment(_("Stopped checking at %(time)s (%(duration)s)") %
{"time": linkcheck.strformat.strtime(self.stoptime),
"duration": linkcheck.strformat.strduration_long(duration)})
{"time": strformat.strtime(self.stoptime),
"duration": strformat.strduration_long(duration)})
self.close_fileoutput()

View file

@ -18,7 +18,7 @@
A GraphXML logger.
"""
import xmllog
from . import xmllog
class GraphXMLLogger (xmllog.XMLLogger):

View file

@ -17,14 +17,11 @@
"""
A HTML logger.
"""
import time
import cgi
import os
import linkcheck.logger
import linkcheck.strformat
import linkcheck.configuration
from . import Logger
from .. import strformat, configuration
# ss=1 enables show source
@ -52,7 +49,7 @@ HTML_HEADER = """<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
<body bgcolor="%(body)s" link="%(link)s" vlink="%(vlink)s" alink="%(alink)s">
"""
class HtmlLogger (linkcheck.logger.Logger):
class HtmlLogger (Logger):
"""
Logger with HTML output.
"""
@ -91,20 +88,20 @@ class HtmlLogger (linkcheck.logger.Logger):
self.starttime = time.time()
header = {
"encoding": self.output_encoding,
"title": linkcheck.configuration.App,
"title": configuration.App,
"body": self.colorbackground,
"link": self.colorlink,
"vlink": self.colorlink,
"alink": self.colorlink,
}
self.write(HTML_HEADER % header)
self.comment("Generated by %s" % linkcheck.configuration.App)
self.comment("Generated by %s" % configuration.App)
if self.has_part('intro'):
self.write(u"<center><h2>"+linkcheck.configuration.App+
self.write(u"<center><h2>"+configuration.App+
"</h2></center><br><blockquote>"+
linkcheck.configuration.Freeware+"<br><br>"+
configuration.Freeware+"<br><br>"+
(_("Start checking at %s") %
linkcheck.strformat.strtime(self.starttime))+
strformat.strtime(self.starttime))+
os.linesep+"<br>")
self.check_date()
self.flush()
@ -227,7 +224,7 @@ class HtmlLogger (linkcheck.logger.Logger):
Write url_data.dlsize.
"""
self.writeln(u"<tr><td>"+self.part("dlsize")+u"</td><td>"+
linkcheck.strformat.strsize(url_data.dlsize)+
strformat.strsize(url_data.dlsize)+
u"</td></tr>")
def write_checktime (self, url_data):
@ -300,15 +297,15 @@ class HtmlLogger (linkcheck.logger.Logger):
self.stoptime = time.time()
duration = self.stoptime - self.starttime
self.writeln(_("Stopped checking at %(time)s (%(duration)s)") %
{"time": linkcheck.strformat.strtime(self.stoptime),
"duration": linkcheck.strformat.strduration_long(duration)})
{"time": strformat.strtime(self.stoptime),
"duration": strformat.strduration_long(duration)})
self.writeln(u'</blockquote><br><hr noshade size="1"><small>'+
linkcheck.configuration.HtmlAppInfo+u"<br>")
configuration.HtmlAppInfo+u"<br>")
self.writeln(_("Get the newest version at %s") %
(u'<a href="'+linkcheck.configuration.Url+u'" target="_top">'+
linkcheck.configuration.Url+u"</a>.<br>"))
(u'<a href="'+configuration.Url+u'" target="_top">'+
configuration.Url+u"</a>.<br>"))
self.writeln(_("Write comments and bugs to %s") %
(u'<a href="mailto:'+linkcheck.configuration.Email+u'">'+
linkcheck.configuration.Email+u"</a>."))
(u'<a href="mailto:'+configuration.Email+u'">'+
configuration.Email+u"</a>."))
self.writeln(u"</small></body></html>")
self.close_fileoutput()

View file

@ -17,11 +17,10 @@
"""
A dummy logger.
"""
import linkcheck.logger
from . import Logger
class NoneLogger (linkcheck.logger.Logger):
class NoneLogger (Logger):
"""
Dummy logger printing nothing.
"""

View file

@ -20,9 +20,8 @@ A SQL logger.
import time
import os
import linkcheck.logger
import linkcheck.configuration
from . import Logger
from .. import configuration, strformat, url as urlutil
def sqlify (s):
@ -48,7 +47,7 @@ def intify (s):
return 0
class SQLLogger (linkcheck.logger.Logger):
class SQLLogger (Logger):
"""
SQL output, should work with any SQL database (not tested).
"""
@ -73,16 +72,16 @@ class SQLLogger (linkcheck.logger.Logger):
"""
Write start of checking info as sql comment.
"""
linkcheck.logger.Logger.start_output(self)
Logger.start_output(self)
self.starttime = time.time()
if self.has_part("intro"):
self.comment(_("created by %(app)s at %(time)s") %
{"app": linkcheck.configuration.AppName,
"time": linkcheck.strformat.strtime(self.starttime)})
{"app": configuration.AppName,
"time": strformat.strtime(self.starttime)})
self.comment(_("Get the newest version at %s") %
linkcheck.configuration.Url)
configuration.Url)
self.comment(_("Write comments and bugs to %s") %
linkcheck.configuration.Email)
configuration.Email)
self.check_date()
self.writeln()
self.flush()
@ -122,7 +121,7 @@ class SQLLogger (linkcheck.logger.Logger):
'result': sqlify(url_data.result),
'warning': sqlify(os.linesep.join(log_warnings)),
'info': sqlify(os.linesep.join(log_infos)),
'url': sqlify(linkcheck.url.url_quote(url_data.url or u"")),
'url': sqlify(urlutil.url_quote(url_data.url or u"")),
'line': url_data.line,
'column': url_data.column,
'name': sqlify(url_data.name),
@ -142,6 +141,6 @@ class SQLLogger (linkcheck.logger.Logger):
self.stoptime = time.time()
duration = self.stoptime - self.starttime
self.comment(_("Stopped checking at %(time)s (%(duration)s)") %
{"time": linkcheck.strformat.strtime(self.stoptime),
"duration": linkcheck.strformat.strduration_long(duration)})
{"time": strformat.strtime(self.stoptime),
"duration": strformat.strduration_long(duration)})
self.close_fileoutput()

View file

@ -17,16 +17,12 @@
"""
The default text logger.
"""
import time
import linkcheck.ansicolor
import linkcheck.logger
import linkcheck.strformat
import linkcheck.configuration
from . import Logger
from .. import ansicolor, strformat, configuration
class TextLogger (linkcheck.logger.Logger):
class TextLogger (Logger):
"""
A text logger, colorizing the output if possible.
@ -70,7 +66,7 @@ class TextLogger (linkcheck.logger.Logger):
super(TextLogger, self).__init__(**args)
self.init_fileoutput(args)
if self.fd is not None:
self.fd = linkcheck.ansicolor.Colorizer(self.fd)
self.fd = ansicolor.Colorizer(self.fd)
self.colorparent = args['colorparent']
self.colorurl = args['colorurl']
self.colorname = args['colorname']
@ -86,7 +82,7 @@ class TextLogger (linkcheck.logger.Logger):
def start_fileoutput (self):
super(TextLogger, self).start_fileoutput()
self.fd = linkcheck.ansicolor.Colorizer(self.fd)
self.fd = ansicolor.Colorizer(self.fd)
def start_output (self):
"""
@ -95,16 +91,16 @@ class TextLogger (linkcheck.logger.Logger):
super(TextLogger, self).start_output()
self.starttime = time.time()
if self.has_part('intro'):
self.writeln(linkcheck.configuration.AppInfo)
self.writeln(linkcheck.configuration.Freeware)
self.writeln(configuration.AppInfo)
self.writeln(configuration.Freeware)
self.writeln(_("Get the newest version at %(url)s") %
{'url': linkcheck.configuration.Url})
{'url': configuration.Url})
self.writeln(_("Write comments and bugs to %(email)s") %
{'email': linkcheck.configuration.Email})
{'email': configuration.Email})
self.check_date()
self.writeln()
self.writeln(_("Start checking at %s") %
linkcheck.strformat.strtime(self.starttime))
strformat.strtime(self.starttime))
self.flush()
def log_url (self, url_data):
@ -191,7 +187,7 @@ class TextLogger (linkcheck.logger.Logger):
Write url_data.dlsize.
"""
self.write(self.part("dlsize") + self.spaces("dlsize"))
self.writeln(linkcheck.strformat.strsize(url_data.dlsize),
self.writeln(strformat.strsize(url_data.dlsize),
color=self.colordlsize)
def write_checktime (self, url_data):
@ -257,6 +253,6 @@ class TextLogger (linkcheck.logger.Logger):
self.stoptime = time.time()
duration = self.stoptime - self.starttime
self.writeln(_("Stopped checking at %(time)s (%(duration)s)") %
{"time": linkcheck.strformat.strtime(self.stoptime),
"duration": linkcheck.strformat.strduration_long(duration)})
{"time": strformat.strtime(self.stoptime),
"duration": strformat.strduration_long(duration)})
self.close_fileoutput()

View file

@ -20,9 +20,8 @@ Base class for XML loggers.
import time
import xml.sax.saxutils
import linkcheck.logger
import linkcheck.configuration
from . import Logger
from .. import configuration, strformat
xmlattr_entities = {
@ -47,7 +46,7 @@ def xmlquoteattr (s):
return xml.sax.saxutils.escape(s, xmlattr_entities)
class XMLLogger (linkcheck.logger.Logger):
class XMLLogger (Logger):
"""
XML output mirroring the GML structure. Easy to parse with any XML
tool.
@ -80,12 +79,12 @@ class XMLLogger (linkcheck.logger.Logger):
(xmlquoteattr(version), xmlquoteattr(encoding)))
if self.has_part("intro"):
self.comment(_("created by %(app)s at %(time)s") %
{"app": linkcheck.configuration.AppName,
"time": linkcheck.strformat.strtime(self.starttime)})
{"app": configuration.AppName,
"time": strformat.strtime(self.starttime)})
self.comment(_("Get the newest version at %(url)s") %
{'url': linkcheck.configuration.Url})
{'url': configuration.Url})
self.comment(_("Write comments and bugs to %(email)s") %
{'email': linkcheck.configuration.Email})
{'email': configuration.Email})
self.check_date()
self.writeln()
@ -97,8 +96,8 @@ class XMLLogger (linkcheck.logger.Logger):
self.stoptime = time.time()
duration = self.stoptime - self.starttime
self.comment(_("Stopped checking at %(time)s (%(duration)s)") %
{"time": linkcheck.strformat.strtime(self.stoptime),
"duration": linkcheck.strformat.strduration_long(duration)})
{"time": strformat.strtime(self.stoptime),
"duration": strformat.strduration_long(duration)})
def xml_starttag (self, name, attrs=None):
"""

View file

@ -20,7 +20,7 @@ Test decorators.
import unittest
import time
import cStringIO as StringIO
from cStringIO import StringIO
import linkcheck.decorators
@ -36,7 +36,7 @@ class TestDecorators (unittest.TestCase):
self.assertEquals(f(), 42)
def test_timeit2 (self):
log = StringIO.StringIO()
log = StringIO()
@linkcheck.decorators.timed(log=log, limit=0)
def f ():
time.sleep(1)

View file

@ -20,7 +20,7 @@ Test html parsing.
import linkcheck.HtmlParser.htmlsax
import linkcheck.HtmlParser.htmllib
import cStringIO as StringIO
from cStringIO import StringIO
import unittest
@ -195,7 +195,7 @@ class TestParser (unittest.TestCase):
Parse all test patterns in one go.
"""
for _in, _out in parsetests:
out = StringIO.StringIO()
out = StringIO()
handler = linkcheck.HtmlParser.htmllib.HtmlPrettyPrinter(out)
self.htmlparser.handler = handler
self.htmlparser.feed(_in)
@ -217,7 +217,7 @@ class TestParser (unittest.TestCase):
Parse all test patterns sequentially.
"""
for _in, _out in parsetests:
out = StringIO.StringIO()
out = StringIO()
handler = linkcheck.HtmlParser.htmllib.HtmlPrettyPrinter(out)
self.htmlparser.handler = handler
for c in _in:
@ -229,8 +229,8 @@ class TestParser (unittest.TestCase):
Parse all test patterns on two parsers interwoven.
"""
for _in, _out in parsetests:
out = StringIO.StringIO()
out2 = StringIO.StringIO()
out = StringIO()
out2 = StringIO()
handler = linkcheck.HtmlParser.htmllib.HtmlPrettyPrinter(out)
self.htmlparser.handler = handler
handler2 = linkcheck.HtmlParser.htmllib.HtmlPrettyPrinter(out2)
@ -243,8 +243,8 @@ class TestParser (unittest.TestCase):
def test_handler (self):
for _in, _out in parsetests:
out = StringIO.StringIO()
out2 = StringIO.StringIO()
out = StringIO()
out2 = StringIO()
handler = linkcheck.HtmlParser.htmllib.HtmlPrinter(out)
self.htmlparser.handler = handler
handler2 = linkcheck.HtmlParser.htmllib.HtmlPrinter(out2)
@ -259,7 +259,7 @@ class TestParser (unittest.TestCase):
Test parser flushing.
"""
for _in, _out in flushtests:
out = StringIO.StringIO()
out = StringIO()
handler = linkcheck.HtmlParser.htmllib.HtmlPrettyPrinter(out)
self.htmlparser.handler = handler
self.htmlparser.feed(_in)

View file

@ -32,13 +32,12 @@ import linkcheck
# override optparse gettext method with the one from linkcheck.init_i18n()
optparse._ = _
# now import the rest of the linkchecker gang
from linkcheck import log, LOG_CMDLINE
import linkcheck.i18n
from linkcheck import log, LOG_CMDLINE, i18n, strformat
import linkcheck.checker
import linkcheck.director
import linkcheck.configuration
import linkcheck.fileutil
import linkcheck.strformat
import linkcheck.logger
# optional modules
has_optcomplete = linkcheck.fileutil.has_module("optcomplete")
has_profile = linkcheck.fileutil.has_module("profile")
@ -276,7 +275,7 @@ class LCHelpFormatter (optparse.IndentedHelpFormatter):
indent_first = 0
result.append(opts)
if option.help:
text = linkcheck.strformat.wrap(option.help, self.help_width)
text = strformat.wrap(option.help, self.help_width)
help_lines = text.splitlines()
result.append("%*s%s\n" % (indent_first, "", help_lines[0]))
result.extend(["%*s%s\n" % (self.help_position, "", line)
@ -306,9 +305,9 @@ class LCOptionParser (optparse.OptionParser, object):
s = u"%s\n%s\n%s\n%s\n%s\n%s\n%s\n%s\n%s" % (self.format_help(),
Examples, LoggerTypes, RegularExpressions, CookieFormat,
ProxySupport, Notes, Retval, Warnings)
s = s.encode(linkcheck.i18n.default_encoding, "replace")
s = s.encode(i18n.default_encoding, "replace")
if os.name != 'posix':
linkcheck.strformat.paginate(s)
strformat.paginate(s)
else:
print s
sys.exit(0)
@ -392,7 +391,7 @@ The ENCODING specifies the output encoding, the default is that of your
locale.
Valid encodings are listed at """ \
"""http://docs.python.org/lib/standard-encodings.html.""") % \
{'loggertypes': linkcheck.LoggerKeys})
{'loggertypes': linkcheck.logger.LoggerKeys})
group.add_option("-F", "--file-output", type="string", action="append",
dest="fileoutput", metavar="TYPE[/ENCODING][/FILENAME]",
help=_(
@ -407,7 +406,7 @@ You can specify this option more than once. Valid file output types
are %(loggertypes)s. You can specify this option multiple times to output
to more than one file. Default is no file output. Note that you can
suppress all console output with the option '-o none'.""") % \
{'loggertypes': linkcheck.LoggerKeys})
{'loggertypes': linkcheck.logger.LoggerKeys})
group.add_option("--no-status", action="store_false", dest="status",
default=True, help=_(
"""Do not print check status messages."""))
@ -576,8 +575,8 @@ if options.output:
if "/" in options.output:
logtype, encoding = options.output.split("/", 1)
else:
logtype, encoding = options.output, linkcheck.i18n.default_encoding
if not linkcheck.Loggers.has_key(logtype.lower()):
logtype, encoding = options.output, i18n.default_encoding
if not linkcheck.logger.Loggers.has_key(logtype.lower()):
print_usage(
_("Unknown logger type %(type)r in %(output)r for option %(option)s") % \
{"type": logtype, "output": options.output, "option": "'-o, --output'"})
@ -608,7 +607,7 @@ if options.fileoutput:
ns['filename'] = suffix
else:
ns['filename'] = suffix
if not linkcheck.Loggers.has_key(ftype):
if not linkcheck.logger.Loggers.has_key(ftype):
print_usage(
_("Unknown logger type %(type)r in %(output)r for option %(option)s") % \
{"type": ftype, "output": options.output,