Require and use Python 2.5

Use Python 2.5 features and get rid of old compat code. Also some
code cleanups have been made.


git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@3737 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2008-04-27 11:39:21 +00:00
parent b9a5215b6d
commit 3eac1be9ab
61 changed files with 572 additions and 1167 deletions

View file

@ -34,7 +34,7 @@ try:
form=req.getFieldStorage(),
env=req.env)
req.Finish()
except:
except Exception:
import traceback
traceback.print_exc(file = open('traceback', 'a'))

View file

@ -11,7 +11,7 @@ Produces custom HTML and machine-parseable navigation info
try:
import locale
locale.setlocale(locale.LC_ALL, '')
except:
except Exception:
pass
from docutils.core import publish_cmdline, default_description

View file

@ -15,15 +15,16 @@ import sys
if not sys.platform.startswith('win'):
# not for us
sys.exit()
if not hasattr(sys, "version_info"):
raise SystemExit, "This program requires Python 2.4 or later."
if sys.version_info < (2, 4, 0, 'final', 0):
raise SystemExit, "This program requires Python 2.4 or later."
if not (hasattr(sys, 'version_info') or
sys.version_info < (2, 5, 0, 'final', 0)):
raise SystemExit("This program requires Python 2.5 or later.")
from __future__ import with_statement
import os
import re
import platform
# releases supporting our special .bat files
# XXX what is platform.release() on Vista?
win_bat_releases = ['NT', 'XP', '2000', '2003Server']
# path retrieving functions
@ -92,20 +93,17 @@ def create_shortcuts ():
def fix_configdata ():
"""
Fix install and config paths in the config file.
"""
"""Fix install and config paths in the config file."""
name = "_linkchecker_configdata.py"
conffile = os.path.join(sys.prefix, "Lib", "site-packages", name)
lines = []
for line in file(conffile):
if line.startswith("install_") or line.startswith("config_"):
if line.startswith(("install_", "config_")):
lines.append(fix_install_path(line))
else:
lines.append(line)
f = file(conffile, "w")
f.write("".join(lines))
f.close()
with file(conffile, "w") as f:
f.write("".join(lines))
# Windows install path scheme for python >= 2.3.
# Snatched from PC/bdist_wininst/install.c.
@ -122,10 +120,8 @@ win_path_scheme = {
}
def fix_install_path (line):
"""
Replace placeholders written by bdist_wininst with those specified
in windows install path scheme.
"""
"""Replace placeholders written by bdist_wininst with those specified
in windows install path scheme."""
key, eq, val = line.split()
# unescape string (do not use eval())
val = val[1:-1].replace("\\\\", "\\")

View file

@ -125,7 +125,7 @@ class HtmlPrettyPrinter (object):
"""
tag = tag.encode(self.encoding, "ignore")
self.fd.write("<%s" % tag.replace("/", ""))
for key, val in attrs.iteritems():
for key, val in attrs.items():
key = key.encode(self.encoding, "ignore")
if val is None:
self.fd.write(" %s" % key)

View file

@ -22,20 +22,13 @@
#include "Python.h"
/* require Python >= 2.4 */
/* require Python >= 2.5 */
#ifndef PY_VERSION_HEX
#error please install Python >= 2.4
#error please install Python >= 2.5
#endif
#if PY_VERSION_HEX < 0x02040000
#error please install Python >= 2.4
#endif
/* See http://www.python.org/dev/peps/pep-0353/#conversion-guidelines */
#if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN)
typedef int Py_ssize_t;
#define PY_SSIZE_T_MAX INT_MAX
#define PY_SSIZE_T_MIN INT_MIN
#if PY_VERSION_HEX < 0x02050000
#error please install Python >= 2.5
#endif
/* user_data type for SAX calls */

View file

@ -5,29 +5,6 @@
*/
#include "Python.h"
#ifndef Py_MEMCPY
/* Py_MEMCPY can be used instead of memcpy in cases where the copied blocks
* are often very short. While most platforms have highly optimized code for
* large transfers, the setup costs for memcpy are often quite high. MEMCPY
* solves this by doing short copies "in line".
*/
#if defined(_MSC_VER)
#define Py_MEMCPY(target, source, length) do { \
size_t i_, n_ = (length); \
char *t_ = (void*) (target); \
const char *s_ = (void*) (source); \
if (n_ >= 16) \
memcpy(t_, s_, n_); \
else \
for (i_ = 0; i_ < n_; i_++) \
t_[i_] = s_[i_]; \
} while (0)
#else
#define Py_MEMCPY memcpy
#endif
#endif
#if !defined(HAVE_STRLCPY)
/**
* strlcpy - Copy a %NUL terminated string into a sized buffer

View file

@ -20,13 +20,13 @@ Main function module for link checking.
# imports and checks
import sys
if not hasattr(sys, 'version_info') or \
sys.version_info < (2, 4, 0, 'final', 0):
raise SystemExit("This program requires Python 2.4 or later.")
if not (hasattr(sys, 'version_info') or
sys.version_info < (2, 5, 0, 'final', 0)):
raise SystemExit("This program requires Python 2.5 or later.")
import os
import re
import i18n
from . import i18n
import _linkchecker_configdata as configdata
# application log areas
@ -45,33 +45,27 @@ lognames = {
"dns": LOG_DNS,
"thread": LOG_THREAD,
"all": LOG,
}
lognamelist = ", ".join(["%r"%name for name in lognames.iterkeys()])
}
lognamelist = ", ".join(repr(name) for name in lognames)
import log
from . import log
class LinkCheckerError (StandardError):
"""
Exception to be raised on linkchecker-specific check errors.
"""
"""Exception to be raised on linkchecker-specific check errors."""
pass
def add_intern_pattern (url_data, config):
"""
Add intern URL regex to config.
"""
"""Add intern URL regex to config."""
pat = url_data.get_intern_pattern()
if pat:
assert None == log.debug(LOG_CHECK,
"Add intern pattern %r", pat)
log.debug(LOG_CHECK, "Add intern pattern %r", pat)
config['internlinks'].append(get_link_pat(pat))
def get_link_pat (arg, strict=False):
"""
Get a link pattern matcher for intern/extern links.
"""Get a link pattern matcher for intern/extern links.
Returns a compiled pattern and a negate and strict option.
@param arg: pattern from config
@ -81,7 +75,7 @@ def get_link_pat (arg, strict=False):
@return: dictionary with keys 'pattern', 'negate' and 'strict'
@rtype: dict
"""
assert None == log.debug(LOG_CHECK, "Link pattern %r", arg)
log.debug(LOG_CHECK, "Link pattern %r", arg)
if arg.startswith('!'):
pattern = arg[1:]
negate = True
@ -96,38 +90,37 @@ def get_link_pat (arg, strict=False):
# note: don't confuse URL loggers with application logs above
import logger.text
import logger.html
import logger.gml
import logger.dot
import logger.sql
import logger.csvlog
import logger.blacklist
import logger.gxml
import logger.customxml
import logger.none
from .logger.text import TextLogger
from .logger.html import HtmlLogger
from .logger.gml import GMLLogger
from .logger.dot import DOTLogger
from .logger.sql import SQLLogger
from .logger.csvlog import CSVLogger
from .logger.blacklist import BlacklistLogger
from .logger.gxml import GraphXMLLogger
from .logger.customxml import CustomXMLLogger
from .logger.none import NoneLogger
# default link logger classes
Loggers = {
"text": logger.text.TextLogger,
"html": logger.html.HtmlLogger,
"gml": logger.gml.GMLLogger,
"dot": logger.dot.DOTLogger,
"sql": logger.sql.SQLLogger,
"csv": logger.csvlog.CSVLogger,
"blacklist": logger.blacklist.BlacklistLogger,
"gxml": logger.gxml.GraphXMLLogger,
"xml": logger.customxml.CustomXMLLogger,
"none": logger.none.NoneLogger,
"text": TextLogger,
"html": HtmlLogger,
"gml": GMLLogger,
"dot": DOTLogger,
"sql": SQLLogger,
"csv": CSVLogger,
"blacklist": BlacklistLogger,
"gxml": GraphXMLLogger,
"xml": CustomXMLLogger,
"none": NoneLogger,
}
# for easy printing: a comma separated logger list
LoggerKeys = ", ".join(["%r" % name for name in Loggers.iterkeys()])
LoggerKeys = ", ".join(repr(name) for name in Loggers)
def init_i18n ():
"""
Initialize i18n with the configured locale dir. The environment
"""Initialize i18n with the configured locale dir. The environment
variable LOCPATH can also specify a locale dir.
@return: None

View file

@ -162,9 +162,7 @@ AnsiReset = esc_ansicolor(default)
def has_colors (fp):
"""
Test if given file is an ANSI color enabled tty.
"""
"""Test if given file is an ANSI color enabled tty."""
# The isatty() function ensures that we do not colorize
# redirected streams, as this is almost never what we want
if not (hasattr(fp, "isatty") and fp.isatty()):
@ -188,9 +186,7 @@ def has_colors (fp):
def _write_color_nt (fp, text, color):
"""
Assumes WConio has been imported at module level.
"""
"""Assumes WConio has been imported at module level."""
oldcolor = WConio.gettextinfo()[4]
oldtextcolor = oldcolor & 0x000F
if ";" in color:
@ -201,9 +197,7 @@ def _write_color_nt (fp, text, color):
def _write_color_ansi (fp, text, color):
"""
Colorize text with given color.
"""
"""Colorize text with given color."""
fp.write('%s%s%s' % (esc_ansicolor(color), text, AnsiReset))
@ -234,14 +228,10 @@ else:
class Colorizer (object):
"""
Prints colored messages to streams.
"""
"""Prints colored messages to streams."""
def __init__ (self, fp):
"""
Initialize with given stream (file-like object).
"""
"""Initialize with given stream (file-like object)."""
super(Colorizer, self).__init__()
self.fp = fp
if has_colors(fp):
@ -250,24 +240,18 @@ class Colorizer (object):
self.write = self._write
def _write (self, text, color=None):
"""
Print text as-is.
"""
"""Print text as-is."""
self.fp.write(text)
def _write_color (self, text, color=None):
"""
Print text with given color. If color is None, print text as-is.
"""
"""Print text with given color. If color is None, print text as-is."""
if color is None:
self.fp.write(text)
else:
write_color(self.fp, text, color)
def __getattr__ (self, name):
"""
Delegate attribute access to the stored stream object.
"""
"""Delegate attribute access to the stored stream object."""
return getattr(self.fp, name)
@ -275,8 +259,7 @@ class ColoredStreamHandler (logging.StreamHandler, object):
"""Send colored log messages to streams (file-like objects)."""
def __init__ (self, strm=None):
"""
Log to given stream (a file-like object) or to stderr if
"""Log to given stream (a file-like object) or to stderr if
strm is None.
"""
super(ColoredStreamHandler, self).__init__(strm=strm)
@ -290,14 +273,12 @@ class ColoredStreamHandler (logging.StreamHandler, object):
}
def get_color (self, record):
"""
Get appropriate color according to log level.
"""Get appropriate color according to log level.
"""
return self.colors.get(record.levelno, 'default')
def emit (self, record):
"""
Emit a record.
"""Emit a record.
If a formatter is specified, it is used to format the record.
The record is then written to the stream with a trailing newline

View file

@ -20,7 +20,7 @@ Store and retrieve open connections.
import time
import linkcheck.lock
import linkcheck.log
from .. import log, LOG_CACHE
from linkcheck.decorators import synchronized
_lock = linkcheck.lock.get_lock("connection")
@ -74,7 +74,7 @@ class ConnectionPool (object):
due_time = self.times[host]
if due_time > t:
wait = due_time - t
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
log.debug(LOG_CACHE,
"waiting for %.01f seconds on connection to %s", wait, host)
time.sleep(wait)
t = time.time()
@ -119,7 +119,7 @@ class ConnectionPool (object):
"""Remove expired connections from this pool."""
t = time.time()
to_delete = []
for key, conn_data in self.connections.iteritems():
for key, conn_data in self.connections.items():
if conn_data[1] == 'available' and t > conn_data[2]:
to_delete.append(key)
for key in to_delete:
@ -132,7 +132,7 @@ class ConnectionPool (object):
del self.connections[key]
try:
conn_data[1].close()
except:
except Exception:
# ignore close errors
pass

View file

@ -18,7 +18,7 @@
Store and retrieve cookies.
"""
from linkcheck.decorators import synchronized
import linkcheck.log
from .. import log, LOG_CACHE
import linkcheck.lock
import linkcheck.cookies
@ -46,7 +46,7 @@ class CookieJar (object):
c = linkcheck.cookies.NetscapeCookie(h, scheme, host, path)
jar.add(c)
except linkcheck.cookies.CookieError:
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
log.debug(LOG_CACHE,
"Invalid cookie header for %s:%s%s: %r", scheme, host, path, h)
for h in headers.getallmatchingheaders("Set-Cookie2"):
# RFC 2965 cookie type
@ -54,7 +54,7 @@ class CookieJar (object):
c = linkcheck.cookies.Rfc2965Cookie(h, scheme, host, path)
jar.add(c)
except linkcheck.cookies.CookieError:
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
log.debug(LOG_CACHE,
"Invalid cookie2 header for %s:%s%s: %r", scheme, host, path, h)
self.cache[host] = jar
return jar
@ -64,8 +64,7 @@ class CookieJar (object):
"""
Cookie cache getter function.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"Get cookies for host %r path %r", host, path)
log.debug(LOG_CACHE, "Get cookies for host %r path %r", host, path)
jar = self.cache.setdefault(host, set())
return [x for x in jar if x.check_expired() and \
x.is_valid_for(scheme, host, port, path)]

View file

@ -17,10 +17,11 @@
"""
Handle a queue of URLs to check.
"""
from __future__ import with_statement
import threading
import collections
from time import time as _time
import linkcheck.log
from .. import log, LOG_CACHE
class Timeout (StandardError):
@ -79,11 +80,8 @@ class UrlQueue (object):
return it. If no such url is available return None. The
url might be already cached.
"""
self.not_empty.acquire()
try:
with self.not_empty:
return self._get(timeout)
finally:
self.not_empty.release()
def _get (self, timeout):
if timeout is None:
@ -126,20 +124,16 @@ class UrlQueue (object):
is immediately available, else raise the Full exception ('timeout'
is ignored in that case).
"""
self.mutex.acquire()
try:
with self.mutex:
self._put(item)
self.not_empty.notify()
finally:
self.mutex.release()
def _put (self, url_data):
"""Put URL in queue, increase number of unfished tasks."""
if self.shutdown:
# don't accept more URLs
return
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"queueing %s", url_data)
log.debug(LOG_CACHE, "queueing %s", url_data)
key = url_data.cache_url_key
if key in self.checked:
# Put at beginning of queue to get consumed quickly.
@ -171,10 +165,8 @@ class UrlQueue (object):
Raises a ValueError if called more times than there were items
placed in the queue.
"""
self.all_tasks_done.acquire()
try:
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"task_done %s", url_data)
with self.all_tasks_done:
log.debug(LOG_CACHE, "task_done %s", url_data)
if url_data is not None:
key = url_data.cache_url_key
if key is not None and key not in self.checked:
@ -188,13 +180,10 @@ class UrlQueue (object):
raise ValueError('task_done() called too many times')
self.all_tasks_done.notifyAll()
self.unfinished_tasks = unfinished
finally:
self.all_tasks_done.release()
def _cache_url (self, key, url_data):
"""Put URL result data into cache."""
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"Caching %r", key)
log.debug(LOG_CACHE, "Caching %r", key)
assert key in self.in_progress, \
"%r not in %s" % (key, self.in_progress)
del self.in_progress[key]
@ -206,8 +195,7 @@ class UrlQueue (object):
for key in url_data.aliases:
if key in self.checked or key in self.in_progress:
continue
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"Caching alias %r", key)
log.debug(LOG_CACHE, "Caching alias %r", key)
self.checked[key] = data
def _sort (self):
@ -237,8 +225,7 @@ class UrlQueue (object):
When the count of unfinished tasks drops to zero, join() unblocks.
"""
self.all_tasks_done.acquire()
try:
with self.all_tasks_done:
if timeout is None:
while self.unfinished_tasks:
self.all_tasks_done.wait()
@ -251,13 +238,10 @@ class UrlQueue (object):
if remaining <= 0.0:
raise Timeout()
self.all_tasks_done.wait(remaining)
finally:
self.all_tasks_done.release()
def do_shutdown (self):
"""Shutdown the queue by not accepting any more URLs."""
self.mutex.acquire()
try:
with self.mutex:
unfinished = self.unfinished_tasks - len(self.queue)
self.queue.clear()
if unfinished <= 0:
@ -266,19 +250,14 @@ class UrlQueue (object):
self.all_tasks_done.notifyAll()
self.unfinished_tasks = unfinished
self.shutdown = True
finally:
self.mutex.release()
def status (self):
"""
Get tuple (finished tasks, in progress, queue size).
"""
self.mutex.acquire()
try:
with self.mutex:
return (self.finished_tasks,
len(self.in_progress), len(self.queue))
finally:
self.mutex.release()
def checked_redirect (self, redirect, url_data):
"""
@ -287,11 +266,8 @@ class UrlQueue (object):
If the redirect URL is found in the cache, the result data is
already copied.
"""
self.mutex.acquire()
try:
with self.mutex:
if redirect in self.checked:
url_data.copy_from_cache(self.checked[redirect])
return True
return False
finally:
self.mutex.release()

View file

@ -113,9 +113,7 @@ def get_urlclass_from (url):
klass = linkcheck.checker.gopherurl.GopherUrl
elif url.startswith("https:"):
klass = linkcheck.checker.httpsurl.HttpsUrl
elif url.startswith("nntp:") or \
url.startswith("news:") or \
url.startswith("snews:"):
elif url.startswith(("nntp:", "news:", "snews:")):
klass = linkcheck.checker.nntpurl.NntpUrl
elif linkcheck.checker.unknownurl.is_unknown_url(url):
# unknown url

View file

@ -26,7 +26,7 @@ import urllib
import urllib2
import urlbase
import linkcheck.log
from .. import log, LOG_CHECK
import linkcheck.checker
import linkcheck.fileutil
from const import WARN_FILE_MISSING_SLASH, WARN_FILE_SYSTEM_PATH, \
@ -68,8 +68,7 @@ def get_nt_filename (path):
for fname in os.listdir(head):
if fname.lower() == tail.lower():
return os.path.join(get_nt_filename(head), fname)
linkcheck.log.error(linkcheck.LOG_CHECK, "could not find %r in %r",
tail, head)
log.error(LOG_CHECK, "could not find %r in %r", tail, head)
return path
@ -234,12 +233,12 @@ class FileUrl (urlbase.UrlBase):
if self.is_directory():
return True
# guess by extension
for ro in PARSE_EXTENSIONS.itervalues():
for ro in PARSE_EXTENSIONS.values():
if ro.search(self.url):
return True
# try to read content (can fail, so catch error)
try:
for ro in PARSE_CONTENTS.itervalues():
for ro in PARSE_CONTENTS.values():
if ro.search(self.get_content()[:30]):
return True
except IOError:
@ -253,11 +252,11 @@ class FileUrl (urlbase.UrlBase):
if self.is_directory():
self.parse_html()
return
for key, ro in PARSE_EXTENSIONS.iteritems():
for key, ro in PARSE_EXTENSIONS.items():
if ro.search(self.url):
getattr(self, "parse_"+key)()
return
for key, ro in PARSE_CONTENTS.iteritems():
for key, ro in PARSE_CONTENTS.items():
if ro.search(self.get_content()[:30]):
getattr(self, "parse_"+key)()
return

View file

@ -23,6 +23,7 @@ import time
import urllib
import cStringIO as StringIO
from .. import log, LOG_CHECK
import linkcheck
import proxysupport
import httpurl
@ -97,7 +98,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.aggregate.connections.wait_for_host(host)
try:
self.url_connection = ftplib.FTP()
if linkcheck.log.is_debug(linkcheck.LOG_CHECK):
if log.is_debug(LOG_CHECK):
self.url_connection.set_debuglevel(1)
self.url_connection.connect(host)
if _user is None:
@ -136,8 +137,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if not self.filename:
return
files = self.get_files()
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"FTP files %s", str(files))
log.debug(LOG_CHECK, "FTP files %s", str(files))
if self.filename in files:
# file found
return
@ -161,8 +161,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
"""
Parse list line and add the entry it points to to the file list.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Directory entry %r", line)
log.debug(LOG_CHECK, "Directory entry %r", line)
try:
fpo = ftpparse.parse(line)
name = fpo.name
@ -171,8 +170,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if fpo.trycwd or fpo.tryretr:
files.append(name)
except (ValueError, AttributeError), msg:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"%s (%s)", str(msg), line)
log.debug(LOG_CHECK, "%s (%s)", str(msg), line)
self.url_connection.dir(add_entry)
return files
@ -194,7 +192,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
"""
if self.is_directory():
return True
for ro in PARSE_EXTENSIONS.itervalues():
for ro in PARSE_EXTENSIONS.values():
if ro.search(self.url):
return True
return False
@ -212,7 +210,7 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
if self.is_directory():
self.parse_html()
return
for key, ro in PARSE_EXTENSIONS.iteritems():
for key, ro in PARSE_EXTENSIONS.items():
if ro.search(self.url):
getattr(self, "parse_"+key)()

View file

@ -27,6 +27,7 @@ import socket
import cStringIO as StringIO
import Cookie
from .. import log, LOG_CHECK
import linkcheck.url
import linkcheck.strformat
import linkcheck.robotparser2
@ -197,8 +198,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
newurl = urlparse.urlunsplit(self.urlparts)
if self.url != newurl:
if self.warn_redirect:
linkcheck.log.warn(linkcheck.LOG_CHECK,
_("""URL %s has been redirected.
log.warn(LOG_CHECK, _("""URL %s has been redirected.
Use URL %s instead for checking."""), self.url, newurl)
self.url = newurl
# check response
@ -230,10 +230,9 @@ Use URL %s instead for checking."""), self.url, newurl)
raise
if response.reason:
response.reason = unicode_safe(response.reason)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"Response: %s %s", response.status, response.reason)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Headers: %s", self.headers)
log.debug(LOG_CHECK, "Headers: %s", self.headers)
# proxy enforcement (overrides standard proxy)
if response.status == 305 and self.headers:
oldproxy = (self.proxy, self.proxyauth)
@ -260,8 +259,7 @@ Use URL %s instead for checking."""), self.url, newurl)
continue
raise
if tries == -1:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"already handled")
log.debug(LOG_CHECK, "already handled")
response.close()
return None
if tries >= self.max_redirects:
@ -281,7 +279,7 @@ Use URL %s instead for checking."""), self.url, newurl)
_user, _password = self.get_user_password()
self.auth = "Basic " + \
base64.encodestring("%s:%s" % (_user, _password))
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"Authentication %s/%s", _user, _password)
continue
elif response.status >= 400:
@ -312,8 +310,7 @@ Use URL %s instead for checking."""), self.url, newurl)
"""
Follow all redirections of http response.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"follow all redirections")
log.debug(LOG_CHECK, "follow all redirections")
redirected = self.url
tries = 0
while response.status in [301, 302] and self.headers and \
@ -323,15 +320,13 @@ Use URL %s instead for checking."""), self.url, newurl)
# make new url absolute and unicode
newurl = urlparse.urljoin(redirected, newurl)
newurl = unicode_safe(newurl)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Redirected to %r", newurl)
log.debug(LOG_CHECK, "Redirected to %r", newurl)
self.add_info(_("Redirected to %(url)s.") % {'url': newurl})
# norm base url - can raise UnicodeError from url.idna_encode()
redirected, is_idn = linkcheck.checker.urlbase.url_norm(newurl)
if is_idn:
pass # XXX warn about idn use
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Norm redirected to %r", redirected)
log.debug(LOG_CHECK, "Norm redirected to %r", redirected)
urlparts = linkcheck.strformat.url_unicode_split(redirected)
# check extern filter again
self.set_extern(redirected)
@ -458,8 +453,7 @@ Use URL %s instead for checking."""), self.url, newurl)
else:
host = self.urlparts[1]
scheme = self.urlparts[0]
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Connecting to %r", host)
log.debug(LOG_CHECK, "Connecting to %r", host)
# close/release a previous connection
self.close_connection()
self.url_connection = self.get_http_object(host, scheme)
@ -538,8 +532,7 @@ Use URL %s instead for checking."""), self.url, newurl)
key = (scheme, self.urlparts[1], _user, _password)
conn = self.aggregate.connections.get(key)
if conn is not None:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"reuse cached HTTP(S) connection %s", conn)
log.debug(LOG_CHECK, "reuse cached HTTP(S) connection %s", conn)
return conn
self.aggregate.connections.wait_for_host(host)
if scheme == "http":
@ -549,7 +542,7 @@ Use URL %s instead for checking."""), self.url, newurl)
else:
msg = _("Unsupported HTTP url scheme %r") % scheme
raise linkcheck.LinkCheckerError(msg)
if linkcheck.log.is_debug(linkcheck.LOG_CHECK):
if log.is_debug(LOG_CHECK):
h.set_debuglevel(1)
h.connect()
return h
@ -691,7 +684,7 @@ Use URL %s instead for checking."""), self.url, newurl)
else:
try:
self.url_connection.close()
except:
except Exception:
# ignore close errors
pass
self.url_connection = None

View file

@ -24,7 +24,7 @@ import smtplib
import email.Utils
import urlbase
import linkcheck.log
from .. import log, LOG_CHECK
import linkcheck.strformat
import linkcheck.dns.resolver
from const import WARN_MAIL_NO_ADDRESSES, WARN_MAIL_NO_MX_HOST, \
@ -75,8 +75,7 @@ class MailtoUrl (urlbase.UrlBase):
username, domain = _split_address(addr)
if not linkcheck.url.is_safe_domain(domain):
raise linkcheck.LinkCheckerError(_("Invalid mail syntax"))
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"addresses: %s", self.addresses)
log.debug(LOG_CHECK, "addresses: %s", self.addresses)
def cutout_addresses (self):
"""
@ -111,7 +110,7 @@ class MailtoUrl (urlbase.UrlBase):
if i < (len(url) - 1):
try:
headers = cgi.parse_qs(url[(i+1):], strict_parsing=True)
for key, val in headers.iteritems():
for key, val in headers.items():
self.headers.setdefault(key.lower(), []).extend(val)
except ValueError, err:
self.add_warning(_("Error parsing CGI values: %s") % str(err))
@ -148,14 +147,10 @@ class MailtoUrl (urlbase.UrlBase):
"""
Check a single mail address.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"checking mail address %r", mail)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"splitting address")
log.debug(LOG_CHECK, "checking mail address %r", mail)
mail = linkcheck.strformat.ascii_safe(mail)
username, domain = _split_address(mail)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"looking up MX mailhost %r", domain)
log.debug(LOG_CHECK, "looking up MX mailhost %r", domain)
try:
answers = linkcheck.dns.resolver.query(domain, 'MX')
except linkcheck.dns.resolver.NoAnswer:
@ -180,10 +175,9 @@ class MailtoUrl (urlbase.UrlBase):
# host should be preferred)
mxdata.sort()
# debug output
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"found %d MX mailhosts:", len(answers))
log.debug(LOG_CHECK, "found %d MX mailhosts:", len(answers))
for preference, host in mxdata:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"MX host %r, preference %d", host, preference)
# connect
self.check_smtp_connect(mxdata, username, domain)
@ -200,19 +194,17 @@ class MailtoUrl (urlbase.UrlBase):
smtpconnect = 0
for preference, host in mxdata:
try:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"SMTP check for %r (preference %d)", host, preference)
self.url_connection = smtplib.SMTP()
if linkcheck.log.is_debug(linkcheck.LOG_CHECK):
if log.is_debug(LOG_CHECK):
self.url_connection.set_debuglevel(1)
self.url_connection.connect(host)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"SMTP connected!")
log.debug(LOG_CHECK, "SMTP connected!")
smtpconnect = 1
self.url_connection.helo()
info = self.url_connection.verify("%s@%s" % (username, domain))
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"SMTP user info %r", info)
log.debug(LOG_CHECK, "SMTP user info %r", info)
d = {'info': str(info[1])}
if info[0] == 250:
self.add_info(_("Verified address: %(info)s.") % d)

View file

@ -26,7 +26,6 @@ import random
import linkcheck
import urlbase
import linkcheck.log
from const import WARN_NNTP_NO_SERVER, WARN_NNTP_NO_NEWSGROUP, \
WARN_NNTP_BUSY

View file

@ -22,7 +22,7 @@ import telnetlib
import urllib
import urlbase
import linkcheck.log
from .. import log, LOG_CHECK
class TelnetUrl (urlbase.UrlBase):
@ -60,7 +60,7 @@ class TelnetUrl (urlbase.UrlBase):
label is "login: ", expected password label is "Password: ".
"""
self.url_connection = telnetlib.Telnet()
if linkcheck.log.is_debug(linkcheck.LOG_CHECK):
if log.is_debug(LOG_CHECK):
self.url_connection.set_debuglevel(1)
self.url_connection.open(self.host, self.port)
if self.user:

View file

@ -83,7 +83,7 @@ class TestLogger (linkcheck.logger.Logger):
for warning in url_data.warnings:
self.result.append(u"warning %s" % warning[1])
if self.has_part('result'):
self.result.append(url_data.valid and u"valid" or u"error")
self.result.append(u"valid" if url_data.valid else u"error")
# note: do not append url_data.result since this is
# platform dependent

View file

@ -37,7 +37,7 @@ import linkcheck.director
import linkcheck.director.status
import linkcheck.strformat
import linkcheck.containers
import linkcheck.log
from .. import log, LOG_CHECK, LOG_CACHE
import linkcheck.httplib2
import linkcheck.HtmlParser.htmlsax
from const import WARN_URL_EFFECTIVE_URL, WARN_URL_UNICODE_DOMAIN, \
@ -167,16 +167,14 @@ class UrlBase (object):
Set result string and validity.
"""
if self.has_result:
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
"Double result %r (previous %r) for %s", msg, self.result, self)
else:
self.has_result = True
if not isinstance(msg, unicode):
linkcheck.log.warn(linkcheck.LOG_CHECK,
"Non-unicode result for %s: %r", self, msg)
log.warn(LOG_CHECK, "Non-unicode result for %s: %r", self, msg)
elif not msg:
linkcheck.log.warn(linkcheck.LOG_CHECK,
"Empty result for %s", self)
log.warn(LOG_CHECK, "Empty result for %s", self)
self.result = msg
self.valid = valid
@ -260,8 +258,7 @@ class UrlBase (object):
# URLs with different anchors to have the same content
self.cache_content_key = urlparse.urlunsplit(self.urlparts[:4]+[u''])
assert isinstance(self.cache_content_key, unicode), self
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"Content cache key %r", self.cache_content_key)
log.debug(LOG_CACHE, "Content cache key %r", self.cache_content_key)
# construct cache key
if self.aggregate.config["anchorcaching"] and \
self.aggregate.config["anchors"]:
@ -273,8 +270,7 @@ class UrlBase (object):
# no anchor caching
self.cache_url_key = self.cache_content_key
assert isinstance(self.cache_url_key, unicode), self
assert None == linkcheck.log.debug(linkcheck.LOG_CACHE,
"URL cache key %r", self.cache_url_key)
log.debug(LOG_CACHE, "URL cache key %r", self.cache_url_key)
def check_syntax (self):
"""
@ -286,8 +282,7 @@ class UrlBase (object):
@return: True if syntax is correct, else False.
@rtype: bool
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"checking syntax")
log.debug(LOG_CHECK, "checking syntax")
if self.base_url is None:
self.set_result(_("URL is missing"), valid=False)
return
@ -370,16 +365,15 @@ class UrlBase (object):
if self.aggregate.config["trace"]:
linkcheck.trace.trace_on()
try:
try:
self.local_check()
except (socket.error, select.error):
# on Unix, ctrl-c can raise
# error: (4, 'Interrupted system call')
etype, value = sys.exc_info()[:2]
if etype == errno.EINTR:
raise KeyboardInterrupt(value)
else:
raise
self.local_check()
except (socket.error, select.error):
# on Unix, ctrl-c can raise
# error: (4, 'Interrupted system call')
etype, value = sys.exc_info()[:2]
if etype == errno.EINTR:
raise KeyboardInterrupt(value)
else:
raise
finally:
# close/release possible open connection
self.close_connection()
@ -394,8 +388,7 @@ class UrlBase (object):
def local_check (self):
"""Local check function can be overridden in subclasses."""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Checking %s", self)
log.debug(LOG_CHECK, "Checking %s", self)
# start time for check
check_start = time.time()
self.set_extern(self.url)
@ -404,8 +397,7 @@ class UrlBase (object):
return
# check connection
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"checking connection")
log.debug(LOG_CHECK, "checking connection")
try:
self.check_connection()
self.add_country_info()
@ -424,8 +416,7 @@ class UrlBase (object):
# check content
warningregex = self.aggregate.config["warningregex"]
if warningregex and self.valid:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"checking content")
log.debug(LOG_CHECK, "checking content")
try:
self.check_content(warningregex)
except tuple(linkcheck.checker.const.ExcList):
@ -458,7 +449,7 @@ class UrlBase (object):
return
try:
self.url_connection.close()
except:
except Exception:
# ignore close errors
pass
self.url_connection = None
@ -468,8 +459,7 @@ class UrlBase (object):
An exception occurred. Log it and set the cache flag.
"""
etype, value, tb = sys.exc_info()
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"exception %s", traceback.format_tb(tb))
log.debug(LOG_CHECK, "exception %s", traceback.format_tb(tb))
# note: etype must be the exact class, not a subclass
if (etype in linkcheck.checker.const.ExcNoCacheList) or \
(etype == socket.error and value[0]==errno.EBADF) or \
@ -494,36 +484,28 @@ class UrlBase (object):
"""
Return True iff we can recurse into the url's content.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"checking recursion of %r ...", self.url)
log.debug(LOG_CHECK, "checking recursion of %r ...", self.url)
# Test self.valid before self.is_parseable().
if not self.valid:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"... no, invalid.")
log.debug(LOG_CHECK, "... no, invalid.")
return False
if not self.is_parseable():
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
".. no, not parseable.")
log.debug(LOG_CHECK, "... no, not parseable.")
return False
if not self.can_get_content():
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"... no, cannot get content.")
log.debug(LOG_CHECK, "... no, cannot get content.")
return False
rec_level = self.aggregate.config["recursionlevel"]
if rec_level >= 0 and self.recursion_level >= rec_level:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"... no, maximum recursion level reached.")
log.debug(LOG_CHECK, "... no, maximum recursion level reached.")
return False
if self.extern[0]:
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"... no, extern.")
log.debug(LOG_CHECK, "... no, extern.")
return False
if not self.content_allows_robots():
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"... no, robots.")
log.debug(LOG_CHECK, "... no, robots.")
return False
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"... yes, recursion.")
log.debug(LOG_CHECK, "... yes, recursion.")
return True
def content_allows_robots (self):
@ -556,8 +538,7 @@ class UrlBase (object):
self.can_get_content()):
# do not bother
return
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"checking anchor %r", self.anchor)
log.debug(LOG_CHECK, "checking anchor %r", self.anchor)
handler = linkcheck.linkparse.LinkFinder(self.get_content(),
tags={'a': [u'name'], None: [u'id']})
parser = linkcheck.HtmlParser.htmlsax.parser(handler)
@ -585,20 +566,17 @@ class UrlBase (object):
match = entry['pattern'].search(url)
if (entry['negate'] and not match) or \
(match and not entry['negate']):
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Extern URL %r", url)
log.debug(LOG_CHECK, "Extern URL %r", url)
self.extern = (1, entry['strict'])
return
for entry in self.aggregate.config["internlinks"]:
match = entry['pattern'].search(url)
if (entry['negate'] and not match) or \
(match and not entry['negate']):
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Intern URL %r", url)
log.debug(LOG_CHECK, "Intern URL %r", url)
self.extern = (0, 0)
return
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Explicit extern URL %r", url)
log.debug(LOG_CHECK, "Explicit extern URL %r", url)
self.extern = (1, 0)
return
@ -649,8 +627,7 @@ class UrlBase (object):
try:
import tidy
except ImportError:
linkcheck.log.warn(linkcheck.LOG_CHECK,
_("warning: tidy module is not available; " \
log.warn(LOG_CHECK, _("warning: tidy module is not available; " \
"download from http://utidylib.berlios.de/"))
return
options = dict(output_html=0, show_warnings=1, quiet=True,
@ -660,11 +637,11 @@ class UrlBase (object):
errors = filter_tidy_errors(doc.errors)
for err in errors:
self.add_warning("HTMLTidy: %s" % err)
except:
except Exception:
# catch _all_ exceptions since we don't want third party module
# errors to propagate into this library
err = str(sys.exc_info()[1])
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("warning: tidy HTML parsing caused error: %s ") % err)
def check_css (self):
@ -672,7 +649,7 @@ class UrlBase (object):
try:
import cssutils
except ImportError:
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("warning: cssutils module is not available; " \
"download from http://cthedot.de/cssutils/"))
return
@ -687,12 +664,11 @@ class UrlBase (object):
cssparser.parseString(self.get_content(), href=self.url)
for record in handler.storage:
self.add_warning("cssutils: %s" % record.getMessage())
except:
raise
except Exception:
# catch _all_ exceptions since we don't want third party module
# errors to propagate into this library
err = str(sys.exc_info()[1])
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("warning: cssutils parsing caused error: %s ") % err)
def parse_url (self):
@ -717,8 +693,7 @@ class UrlBase (object):
Parse into HTML content and search for URLs to check.
Found URLs are added to the URL queue.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Parsing HTML %s", self)
log.debug(LOG_CHECK, "Parsing HTML %s", self)
# construct parser object
handler = linkcheck.linkparse.LinkFinder(self.get_content())
parser = linkcheck.HtmlParser.htmlsax.parser(handler)
@ -744,8 +719,7 @@ class UrlBase (object):
"""
Parse an opera bookmark file.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Parsing Opera bookmarks %s", self)
log.debug(LOG_CHECK, "Parsing Opera bookmarks %s", self)
name = ""
lineno = 0
for line in self.get_content().splitlines():
@ -767,8 +741,7 @@ class UrlBase (object):
Parse a text file with on url per line; comment and blank
lines are ignored.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Parsing text %s", self)
log.debug(LOG_CHECK, "Parsing text %s", self)
lineno = 0
for line in self.get_content().splitlines():
lineno += 1
@ -784,8 +757,7 @@ class UrlBase (object):
"""
Parse a CSS file for url() patterns.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"Parsing CSS %s", self)
log.debug(LOG_CHECK, "Parsing CSS %s", self)
lineno = 0
linkfinder = linkcheck.linkparse.css_url_re.finditer
strip_comments = linkcheck.linkparse.strip_c_comments

View file

@ -23,7 +23,7 @@ import os
import logging.config
import urllib
import _linkchecker_configdata
import linkcheck.log
from .. import log, LOG_CHECK, LOG
import linkcheck.containers
import confparse
@ -162,7 +162,7 @@ class Configuration (dict):
logging.config.fileConfig(filename)
handler = linkcheck.ansicolor.ColoredStreamHandler(strm=sys.stderr)
handler.setFormatter(logging.Formatter("%(levelname)s %(message)s"))
logging.getLogger(linkcheck.LOG).addHandler(handler)
logging.getLogger(LOG).addHandler(handler)
self.set_debug(debug)
def set_debug (self, debug):
@ -218,8 +218,7 @@ class Configuration (dict):
cfiles.append(path)
# weed out invalid files
cfiles = [f for f in cfiles if os.path.isfile(f)]
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"reading configuration from %s", cfiles)
log.debug(LOG_CHECK, "reading configuration from %s", cfiles)
confparse.LCConfigParser(self).read(cfiles)
self.sanitize()

View file

@ -18,7 +18,8 @@
import ConfigParser
import re
import linkcheck.log
import linkcheck
from .. import log, LOG_CHECK
def read_multiline (value):
@ -52,7 +53,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
self.read_authentication_config()
self.read_filtering_config()
except Exception, msg:
raise linkcheck.LinkCheckerError(linkcheck.LOG_CHECK,
raise linkcheck.LinkCheckerError(
"Error parsing configuration: %s", str(msg))
def read_output_config (self):
@ -106,7 +107,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
if self.has_option(section, "timeout"):
num = self.getint(section, "timeout")
if num < 0:
raise linkcheck.LinkCheckerError(linkcheck.LOG_CHECK,
raise linkcheck.LinkCheckerError(
_("invalid negative value for timeout: %d\n"), num)
self.config['timeout'] = num
if self.has_option(section, "anchors"):
@ -136,7 +137,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
try:
import tidy
except ImportError:
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("warning: tidy module is not available; " \
"download from http://utidylib.berlios.de/"))
val = False
@ -147,7 +148,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
try:
import cssutils
except ImportError:
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("warning: cssutils module is not available; " \
"download from http://cthedot.de/cssutils/"))
val = False
@ -160,7 +161,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
for val in read_multiline(self.get(section, "entry")):
auth = val.split()
if len(auth) != 3:
raise linkcheck.LinkCheckerError(linkcheck.LOG_CHECK,
raise linkcheck.LinkCheckerError(LOG_CHECK,
_("missing auth part in entry %(val)r") % \
{"val": val})
self.config["authentication"].insert(0,
@ -175,11 +176,11 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
break
val = self.get(section, key)
auth = val.split()
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("the entry%(num)d syntax is deprecated; use " \
"the new multiline configuration syntax") % {"num": i})
if len(auth) != 3:
raise linkcheck.LinkCheckerError(linkcheck.LOG_CHECK,
raise linkcheck.LinkCheckerError(LOG_CHECK,
_("missing auth part in entry %(val)r") % \
{"val": val})
self.config["authentication"].insert(0,
@ -204,7 +205,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
if not self.has_option(section, key):
break
val = self.get(section, key)
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("the nofollow%(num)d syntax is deprecated; use " \
"the new multiline configuration syntax") % {"num": i})
pat = linkcheck.get_link_pat(val, strict=0)
@ -219,7 +220,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
key = "noproxyfor%d" % i
if not self.has_option(section, key):
break
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("the noproxyfor%(num)d syntax is deprecated; use " \
"the new multiline configuration syntax") % {"num": i})
val = self.get(section, key)
@ -240,7 +241,7 @@ class LCConfigParser (ConfigParser.RawConfigParser, object):
break
# backwards compatibility: split and ignore second part
val = self.get(section, key).split()[0]
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("the ignore%(num)d syntax is deprecated; use " \
"the new multiline configuration syntax") % {"num": i})
pat = linkcheck.get_link_pat(val, strict=1)

View file

@ -56,8 +56,7 @@ class SetList (list):
class ListDict (dict):
"""
A dictionary whose iterators reflect the order in which elements
"""A dictionary whose iterators reflect the order in which elements
were added.
"""
@ -128,8 +127,7 @@ class ListDict (dict):
super(ListDict, self).clear()
def get_true (self, key, default):
"""
Return default element if key is not in the dict, or if self[key]
"""Return default element if key is not in the dict, or if self[key]
evaluates to False. Useful for example if value is None, but
default value should be an empty string.
"""

View file

@ -27,19 +27,18 @@ And a cookie storage class is provided.
[2] http://www.faqs.org/rfcs/rfc2109.html
"""
from __future__ import with_statement
import time
import re
import Cookie
import cookielib
import cStringIO as StringIO
import rfc822
import strformat
from . import strformat
class CookieError (StandardError):
"""
Thrown for invalid cookie syntax or conflicting/impossible values.
"""
"""Thrown for invalid cookie syntax or conflicting/impossible values."""
pass
@ -66,11 +65,9 @@ CookiePattern = re.compile(r"""
class HttpCookie (object):
"""
A cookie consists of one name-value pair with attributes.
"""A cookie consists of one name-value pair with attributes.
Each attribute consists of a predefined name (see attribute_names)
and a value (which is optional for some attributes).
"""
and a value (which is optional for some attributes)."""
# A mapping from the lowercase variant on the left to the
# appropriate traditional formatting on the right.
@ -132,10 +129,8 @@ class HttpCookie (object):
self.name, self.value, attrs)
def is_valid_for (self, scheme, host, port, path):
"""
Check validity of this cookie against the desired scheme,
host and path.
"""
"""Check validity of this cookie against the desired scheme,
host and path."""
if self.check_expired() and \
self.check_domain(host) and \
self.check_port(port) and \
@ -274,8 +269,8 @@ class HttpCookie (object):
def server_header_value (self):
parts = ["%s=%s" % (self.name, quote(self.value))]
parts += ["%s=%s"% (self.attribute_names[k], self.quote(k, v)) \
for k, v in self.attributes.iteritems()]
parts.extend(["%s=%s"% (self.attribute_names[k], self.quote(k, v)) \
for k, v in self.attributes.items()])
return "; ".join(parts)
def client_header_value (self):
@ -283,15 +278,13 @@ class HttpCookie (object):
if "version" in self.attributes:
parts.append("$Version=%s" % quote(self.attributes["version"]))
parts.append("%s=%s" % (self.name, quote(self.value)))
parts += ["$%s=%s"% (self.attribute_names[k], self.quote(k, v)) \
for k, v in self.attributes.iteritems() if k != "version"]
parts.extend(["$%s=%s"% (self.attribute_names[k], self.quote(k, v)) \
for k, v in self.attributes.items() if k != "version"])
return "; ".join(parts)
class NetscapeCookie (HttpCookie):
"""
Parses RFC 2109 (Netscape) cookies.
"""
"""Parses RFC 2109 (Netscape) cookies."""
def __init__ (self, text, scheme, host, path):
self.parse(text)
@ -327,14 +320,12 @@ class Rfc2965Cookie (HttpCookie):
def from_file (filename):
"""
Parse cookie data from a text file in HTTP header format.
"""Parse cookie data from a text file in HTTP header format.
@return: list of tuples (headers, scheme, host, path)
"""
entries = []
fd = open(filename)
try:
with open(filename) as fd:
lines = []
for line in fd.readlines():
line = line.rstrip()
@ -347,13 +338,10 @@ def from_file (filename):
if lines:
entries.append(from_headers("\r\n".join(lines)))
return entries
finally:
fd.close()
def from_headers (strheader):
"""
Parse cookie data from a string in HTTP header (RFC 822) format.
"""Parse cookie data from a string in HTTP header (RFC 822) format.
@return: tuple (headers, scheme, host, path)
@raises: ValueError for incomplete or invalid data

View file

@ -35,6 +35,7 @@ def h ():
pass
"""
from __future__ import with_statement
import warnings
import signal
import os
@ -43,8 +44,7 @@ import time
def update_func_meta (fake_func, real_func):
"""
Set meta information (eg. __doc__) of fake function to that
"""Set meta information (eg. __doc__) of fake function to that
of the real function.
@return fake_func
"""
@ -56,14 +56,10 @@ def update_func_meta (fake_func, real_func):
def deprecated (func):
"""
A decorator which can be used to mark functions as deprecated.
It emits a warning when the function is called.
"""
"""A decorator which can be used to mark functions as deprecated.
It emits a warning when the function is called."""
def newfunc (*args, **kwargs):
"""
Print deprecated warning and execute original function.
"""
"""Print deprecated warning and execute original function."""
warnings.warn("Call to deprecated function %s." % func.__name__,
category=DeprecationWarning)
return func(*args, **kwargs)
@ -71,8 +67,7 @@ def deprecated (func):
def signal_handler (signal_number):
"""
From http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/410666
"""From http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/410666
A decorator to set the specified function as handler for a signal.
This function is the 'outer' decorator, called with only the
@ -82,9 +77,7 @@ def signal_handler (signal_number):
"""
# create the 'real' decorator which takes only a function as an argument
def newfunc (function):
"""
Register function as signal handler.
"""
"""Register function as signal handler."""
# note: actually the kill(2) function uses the signal number of 0
# for a special case, but for signal(2) only positive integers
# are allowed
@ -96,49 +89,32 @@ def signal_handler (signal_number):
def synchronize (lock, func):
"""
Return synchronized function acquiring the given lock.
"""
"""Return synchronized function acquiring the given lock."""
def newfunc (*args, **kwargs):
"""
Execute function synchronized.
"""
lock.acquire()
try:
"""Execute function synchronized."""
with lock:
return func(*args, **kwargs)
finally:
lock.release()
return update_func_meta(newfunc, func)
def synchronized (lock):
"""
A decorator calling a function with acquired lock.
"""
"""A decorator calling a function with acquired lock."""
return lambda func: synchronize(lock, func)
def notimplemented (func):
"""
Raises a NotImplementedError if the function is called.
"""Raises a NotImplementedError if the function is called."""
def newfunc (*args, **kwargs):
"""
def newfunc (*args, **kwargs):
"""
Raise NotImplementedError
"""
"""Raise NotImplementedError"""
raise NotImplementedError("%s not implemented" % func.__name__)
return update_func_meta(newfunc, func)
def timeit (func, log, limit):
"""
Print execution time of the function. For quick'n'dirty profiling.
"""
"""Print execution time of the function. For quick'n'dirty profiling."""
def newfunc (*args, **kwargs):
"""
Execute function and print execution time.
"""
"""Execute function and print execution time."""
t = time.time()
res = func(*args, **kwargs)
duration = time.time() - t
@ -155,11 +131,10 @@ def timed (log=sys.stderr, limit=2.0):
class memoized (object):
"""
Decorator that caches a function's return value each time it is called.
"""Decorator that caches a function's return value each time it is called.
If called later with the same arguments, the cached value is returned, and
not re-evaluated.
"""
not re-evaluated."""
def __init__(self, func):
self.func = func
self.cache = {}
@ -181,11 +156,9 @@ class memoized (object):
class curried (object):
"""
Decorator that returns a function that keeps returning functions
"""Decorator that returns a function that keeps returning functions
until all arguments are supplied; then the original function is
evaluated.
"""
evaluated."""
def __init__(self, func, *a):
self.func = func
self.args = a

View file

@ -18,8 +18,9 @@
Management of checking a queue of links with several threads.
"""
import time
import os
import thread
import linkcheck.log
from .. import log, LOG_CHECK
import linkcheck.cache.urlqueue
import linkcheck.cache.robots_txt
import linkcheck.cache.cookie
@ -43,11 +44,11 @@ def check_urls (aggregate):
except KeyboardInterrupt:
interrupt(aggregate)
except thread.error:
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("Could not start a new thread. Check that the current user" \
" is allowed to start new threads."))
abort(aggregate)
except:
except Exception:
console.internal_error()
abort(aggregate)
@ -72,9 +73,9 @@ def interrupt (aggregate):
interrupts."""
while True:
try:
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("keyboard interrupt; waiting for active threads to finish"))
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
_("another keyboard interrupt will exit immediately"))
print_active_threads(aggregate)
abort(aggregate)
@ -86,11 +87,11 @@ def interrupt (aggregate):
def print_active_threads (aggregate):
if not aggregate.threads:
return
linkcheck.log.info(linkcheck.LOG_CHECK, _("These URLs are still active:"))
log.info(LOG_CHECK, _("These URLs are still active:"))
for t in aggregate.threads:
name = t.getName()
if name.startswith("Check-"):
linkcheck.log.info(linkcheck.LOG_CHECK, name[6:])
log.info(LOG_CHECK, name[6:])
def abort (aggregate):
@ -102,20 +103,9 @@ def abort (aggregate):
aggregate.logger.end_log_output()
break
except KeyboardInterrupt:
linkcheck.log.warn(linkcheck.LOG_CHECK, _("keyboard interrupt; force shutdown"))
force_shutdown()
def force_shutdown ():
"""Force shutdown, not finishing anything."""
import os
if os.name == "posix":
# POSIX systems seem to do fine with sys.exit()
import sys
sys.exit(1)
else:
# forced exit without cleanup
os._exit(1)
log.warn(LOG_CHECK, _("keyboard interrupt; force shutdown"))
# forced exit without cleanup
os._exit(1)
def get_aggregate (config):

View file

@ -17,7 +17,7 @@
"""
Aggregate needed object instances for checker threads.
"""
import linkcheck.log
from .. import log, LOG_CHECK
import linkcheck.director
import logger
import status
@ -61,7 +61,7 @@ class Aggregate (object):
try:
self.urlqueue.join(timeout=self.config["timeout"])
except linkcheck.cache.urlqueue.Timeout:
linkcheck.log.warn(linkcheck.LOG_CHECK, "Abort timed out")
log.warn(LOG_CHECK, "Abort timed out")
def remove_stopped_threads (self):
"""Remove the stopped threads from the internal thread list."""
@ -74,5 +74,5 @@ class Aggregate (object):
t.stop()
t.join(2)
if t.isAlive():
linkcheck.log.warn(linkcheck.LOG_CHECK, "Thread %s still active", t)
log.warn(LOG_CHECK, "Thread %s still active", t)
self.connections.clear()

View file

@ -16,7 +16,7 @@
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import thread
import linkcheck.decorators
import linkcheck.log
from .. import log, LOG_CHECK
import linkcheck.threader
import console
@ -29,10 +29,9 @@ class CheckedTask (linkcheck.threader.StoppableThread):
try:
self.run_checked()
except KeyboardInterrupt:
linkcheck.log.warn(linkcheck.LOG_CHECK,
"interrupt did not reach the main thread")
log.warn(LOG_CHECK, "interrupt did not reach the main thread")
thread.interrupt_main()
except:
except Exception:
console.internal_error()
@linkcheck.decorators.notimplemented

View file

@ -7,7 +7,7 @@ import errno
import array
import fcntl
import struct
import linkcheck.log
from .. import log, LOG_DNS
class IfConfig (object):
@ -49,7 +49,7 @@ class IfConfig (object):
try:
result = self._ioctl(func, ifreq)
except IOError, msg:
linkcheck.log.warn(linkcheck.LOG_DNS,
log.warn(LOG_DNS,
"error getting addr for interface %r: %s", ifname, msg)
return None
return socket.inet_ntoa(result[20:24])
@ -97,7 +97,7 @@ class IfConfig (object):
try:
result = self._ioctl(self.SIOCGIFFLAGS, ifreq)
except IOError, msg:
linkcheck.log.warn(linkcheck.LOG_DNS,
log.warn(LOG_DNS,
"error getting flags for interface %r: %s", ifname, msg)
return 0
# extract the interface's flags from the return value

View file

@ -25,7 +25,7 @@ import sys
import time
import encodings.idna
import linkcheck.log
from .. import log, LOG_DNS
import linkcheck.dns.exception
import linkcheck.dns.message
import linkcheck.dns.name
@ -727,8 +727,7 @@ def query(qname, rdtype=linkcheck.dns.rdatatype.A, rdclass=linkcheck.dns.rdatacl
object to make the query.
@see: L{linkcheck.dns.resolver.Resolver.query} for more information on the
parameters."""
assert None == linkcheck.log.debug(linkcheck.LOG_DNS,
"Query %s %s %s", qname, rdtype, rdclass)
log.debug(LOG_DNS, "Query %s %s %s", qname, rdtype, rdclass)
if resolver is None:
resolver = get_default_resolver()
return resolver.query(qname, rdtype, rdclass, tcp)

View file

@ -14,6 +14,7 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from __future__ import with_statement
import cStringIO as StringIO
import os
import unittest
@ -108,12 +109,9 @@ ns2 1w1D1h1m1S a 10.0.0.2
def get_file_lines (fn):
res = []
fd = open(fn)
try:
with open(fn) as fd:
for line in fd:
res.append(line)
finally:
fd.close()
return res

View file

@ -127,22 +127,22 @@ class Zone(object):
del self.nodes[key]
def __iter__(self):
return self.nodes.iterkeys()
return self.nodes.keys()
def iterkeys(self):
return self.nodes.iterkeys()
return self.nodes.keys()
def keys(self):
return self.nodes.keys()
def itervalues(self):
return self.nodes.itervalues()
return self.nodes.values()
def values(self):
return self.nodes.values()
def iteritems(self):
return self.nodes.iteritems()
return self.nodes.items()
def items(self):
return self.nodes.items()
@ -419,7 +419,7 @@ class Zone(object):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = linkcheck.dns.rdatatype.from_text(covers)
for (name, node) in self.iteritems():
for (name, node) in self.items():
for rds in node:
if rdtype == linkcheck.dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
@ -442,7 +442,7 @@ class Zone(object):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = linkcheck.dns.rdatatype.from_text(covers)
for (name, node) in self.iteritems():
for (name, node) in self.items():
for rds in node:
if rdtype == linkcheck.dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
@ -486,7 +486,7 @@ class Zone(object):
names = self.keys()
names.sort()
else:
names = self.iterkeys()
names = self.keys()
for n in names:
l = self[n].to_text(n, origin=self.origin,
relativize=relativize)

View file

@ -19,9 +19,7 @@ Dummy objects.
"""
class Dummy (object):
"""
A dummy object ignores all access to it. Useful for testing.
"""
"""A dummy object ignores all access to it. Useful for testing."""
def __init__ (self, *args, **kwargs):
pass
@ -64,7 +62,5 @@ class Dummy (object):
def dummy (*args, **kwargs):
"""
Ignore any positional or keyword arguments, return None.
"""
"""Ignore any positional or keyword arguments, return None."""
pass

View file

@ -88,32 +88,24 @@ error = 'fcgi.error'
# anywhere at the moment
def _error (msg):
"""
Append a string to /tmp/err.
"""
"""Append a string to /tmp/err."""
errf = file('/tmp/err', 'a+')
errf.write(msg+'\n')
errf.close()
class Record (object):
"""
Class representing FastCGI records.
"""
"""Class representing FastCGI records."""
def __init__ (self):
"""
Initialize record data.
"""
"""Initialize record data."""
self.version = FCGI_VERSION_1
self.rec_type = FCGI_UNKNOWN_TYPE
self.req_id = FCGI_NULL_REQUEST_ID
self.content = ""
def read_record (self, sock):
"""
Read a FastCGI record from socket.
"""
"""Read a FastCGI record from socket."""
s = [ord(x) for x in sock.recv(8)]
self.version, self.rec_type, padding_length = s[0], s[1], s[6]
self.req_id, content_length = (s[2]<<8)+s[3], (s[4]<<8)+s[5]
@ -145,9 +137,7 @@ class Record (object):
self.protocolStatus = ord(c[4])
def write_record (self, sock):
"""
Write a FastCGI request to socket.
"""
"""Write a FastCGI request to socket."""
content = self.content
if self.rec_type == FCGI_BEGIN_REQUEST:
content = chr(self.role>>8) + chr(self.role & 255) + \
@ -178,7 +168,7 @@ class Record (object):
c_len & 255,
pad_len,
0]
hdr = ''.join([chr(x) for x in hdr])
hdr = ''.join(chr(x) for x in hdr)
sock.send(hdr + content + pad_len*'\000')
@ -231,79 +221,59 @@ def HandleManTypes (r, conn):
class FastCGIWriter (object):
"""
File-like object writing FastCGI requests. All read operations
return empty data.
"""
"""File-like object writing FastCGI requests. All read operations
return empty data."""
def __init__ (self, rec, conn):
"""
Initialize with given record and connection.
"""
"""Initialize with given record and connection."""
self.record = rec
self.conn = conn
self.closed = False
def close (self):
"""
Close this writer.
"""
"""Close this writer."""
if not self.closed:
self.closed = True
self.record.content = ""
self.record.write_record(self.conn)
def isatty (self):
"""
Returns False.
"""
"""Returns False."""
if self.closed:
raise ValueError, "I/O operation on closed file"
return False
def seek (self, pos, mode=0):
"""
Does nothing.
"""
"""Does nothing."""
if self.closed:
raise ValueError, "I/O operation on closed file"
def tell (self):
"""
Return zero.
"""
"""Return zero."""
if self.closed:
raise ValueError, "I/O operation on closed file"
return 0
def read (self, n=-1):
"""
Return empty string.
"""
"""Return empty string."""
if self.closed:
raise ValueError, "I/O operation on closed file"
return ""
def readline (self, length=None):
"""
Return empty string.
"""
"""Return empty string."""
if self.closed:
raise ValueError, "I/O operation on closed file"
return ""
def readlines (self):
"""
Return empty list.
"""
"""Return empty list."""
if self.closed:
raise ValueError, "I/O operation on closed file"
return []
def write (self, s):
"""
Write data in record for record to connection.
"""
"""Write data in record for record to connection."""
if self.closed:
raise ValueError, "I/O operation on closed file"
while s:
@ -312,23 +282,17 @@ class FastCGIWriter (object):
self.record.write_record(self.conn)
def get_next_chunk (self, data):
"""
Return tuple (chunk of data, newdata).
"""
"""Return tuple (chunk of data, newdata)."""
chunk = data[:8192]
data = data[8192:]
return chunk, data
def writelines (self, lines):
"""
Write given lines to the connection.
"""
"""Write given lines to the connection."""
self.write(''.join(lines))
def flush (self):
"""
Does nothing.
"""
"""Raises ValueError if called with closed file."""
if self.closed:
raise ValueError, "I/O operation on closed file"

View file

@ -25,8 +25,7 @@ import fnmatch
def write_file (filename, content, backup=False, callback=None):
"""
Overwrite a possibly existing file with new content. Do this
"""Overwrite a possibly existing file with new content. Do this
in a manner that does not leave truncated or broken files behind.
@param filename: name of file to write
@type filename: string
@ -55,8 +54,7 @@ def write_file (filename, content, backup=False, callback=None):
def has_module (name):
"""
Test if given module can be imported.
"""Test if given module can be imported.
@return: flag if import is successful
@rtype: bool
"""
@ -68,23 +66,17 @@ def has_module (name):
class GlobDirectoryWalker (object):
"""
A forward iterator that traverses a directory tree.
"""
"""A forward iterator that traverses a directory tree."""
def __init__ (self, directory, pattern="*"):
"""
Set start directory and pattern matcher.
"""
"""Set start directory and pattern matcher."""
self.stack = [directory]
self.pattern = pattern
self.files = []
self.index = 0
def __getitem__ (self, index):
"""
Search for next filename.
"""
"""Search for next filename."""
while True:
try:
filename = self.files[self.index]
@ -108,35 +100,25 @@ rglob = GlobDirectoryWalker
class Buffer (object):
"""
Holds buffered data
"""
"""Holds buffered data"""
def __init__ (self, empty=''):
"""
Initialize buffer.
"""
"""Initialize buffer."""
self.empty = self.buf = empty
self.tmpbuf = []
self.pos = 0
def __len__ (self):
"""
Buffer length.
"""
"""Buffer length."""
return self.pos
def write (self, data):
"""
Write data to buffer.
"""
"""Write data to buffer."""
self.tmpbuf.append(data)
self.pos += len(data)
def flush (self, overlap=0):
"""
Flush buffered data and return it.
"""
"""Flush buffered data and return it."""
self.buf += self.empty.join(self.tmpbuf)
self.tmpbuf = []
if overlap and overlap < self.pos:
@ -149,9 +131,7 @@ class Buffer (object):
def get_mtime (filename):
"""
Return modification time of filename or zero on errors.
"""
"""Return modification time of filename or zero on errors."""
try:
return os.stat(filename)[stat.ST_MTIME]
except os.error:
@ -177,10 +157,8 @@ def pathencode (path):
# cache for modified check {absolute filename -> mtime}
_mtime_cache = {}
def has_changed (filename):
"""
Check if filename has changed since the last check. If this
is the first check, assume the file is changed.
"""
"""Check if filename has changed since the last check. If this
is the first check, assume the file is changed."""
key = os.path.abspath(filename)
mtime = get_mtime(key)
if key not in _mtime_cache:

View file

@ -863,7 +863,7 @@ class HTTPConnection:
print "Cannot stat!!"
if thelen is not None:
self.putheader('Content-Length',thelen)
for hdr, value in headers.iteritems():
for hdr, value in headers.items():
self.putheader(hdr, value)
self.endheaders()

View file

@ -33,9 +33,7 @@ if default_encoding is None:
default_encoding = "ascii"
def install_builtin (translator, do_unicode):
"""
Install _() and _n() gettext methods into default namespace.
"""
"""Install _() and _n() gettext methods into default namespace."""
import __builtin__
if do_unicode:
__builtin__.__dict__['_'] = translator.ugettext
@ -47,36 +45,26 @@ def install_builtin (translator, do_unicode):
__builtin__.__dict__['_n'] = translator.ngettext
class Translator (gettext.GNUTranslations):
"""
A translation class always installing its gettext methods into the
default namespace.
"""
"""A translation class always installing its gettext methods into the
default namespace."""
def install (self, do_unicode):
"""
Install gettext methods into the default namespace.
"""
"""Install gettext methods into the default namespace."""
install_builtin(self, do_unicode)
class NullTranslator (gettext.NullTranslations):
"""
A dummy translation class always installing its gettext methods into
the default namespace.
"""
"""A dummy translation class always installing its gettext methods into
the default namespace."""
def install (self, do_unicode):
"""
Install gettext methods into the default namespace.
"""
"""Install gettext methods into the default namespace."""
install_builtin(self, do_unicode)
def init (domain, directory):
"""
Initialize this gettext i18n module. Searches for supported languages
and installs the gettext translator class.
"""
"""Initialize this gettext i18n module. Searches for supported languages
and installs the gettext translator class."""
global default_language, default_encoding
if os.path.isdir(directory):
# get supported languages
@ -98,9 +86,7 @@ def init (domain, directory):
def get_translator (domain, directory, languages=None,
translatorklass=Translator, fallback=False,
fallbackklass=NullTranslator):
"""
Search the appropriate GNUTranslations class.
"""
"""Search the appropriate GNUTranslations class."""
translator = gettext.translation(domain, localedir=directory,
languages=languages, class_=translatorklass, fallback=fallback)
if not isinstance(translator, gettext.GNUTranslations) and fallbackklass:
@ -109,18 +95,14 @@ def get_translator (domain, directory, languages=None,
def get_lang (lang):
"""
Return lang if it is supported, or the default language.
"""
"""Return lang if it is supported, or the default language."""
if lang in supported_languages:
return lang
return default_language
def get_headers_lang (headers):
"""
Return preferred supported language in given HTTP headers.
"""
"""Return preferred supported language in given HTTP headers."""
if 'Accept-Language' not in headers:
return default_language
languages = headers['Accept-Language'].split(",")
@ -145,17 +127,10 @@ def get_headers_lang (headers):
def get_locale ():
"""
Return current configured locale.
"""
"""Return current configured locale."""
loc = None
encoding = 'ascii'
try:
loc, encoding = locale.getlocale(category=locale.LC_ALL)
except ValueError:
# XXX ignore Python bug
# http://bugs.python.org/issue1158909
pass
loc, encoding = locale.getlocale(category=locale.LC_ALL)
if loc is None:
return ('C', 'ascii')
loc = locale.normalize(loc)
@ -182,14 +157,10 @@ lang_transis = {
}
def lang_name (lang):
"""
Return full name of given language.
"""
"""Return full name of given language."""
return lang_names[lang]
def lang_trans (lang, curlang):
"""
Return translated full name of given language.
"""
"""Return translated full name of given language."""
return lang_transis[lang][curlang]

View file

@ -44,25 +44,19 @@ lang_locale = {
_is_level = re.compile(r'^(0|1|2|3|-1)$').match
class FormError (StandardError):
    """Raised for errors in submitted form data."""
    pass
def startoutput (out=sys.stdout):
    """Emit the CGI response headers that precede any HTML output.

    Writes a text/html content type plus no-cache directives and the
    blank line separating headers from the body.
    """
    header_block = ("Content-type: text/html\r\n"
                    "Cache-Control: no-cache\r\n"
                    "Pragma: no-cache\r\n"
                    "\r\n")
    out.write(header_block)
def checkaccess (out=sys.stdout, hosts=None, servers=None, env=os.environ):
"""
See if remote addr is allowed to access the CGI interface.
"""
"""See if remote addr is allowed to access the CGI interface."""
if hosts is None:
hosts = []
if servers is None:
@ -76,9 +70,7 @@ def checkaccess (out=sys.stdout, hosts=None, servers=None, env=os.environ):
def checklink (out=sys.stdout, form=None, env=os.environ):
"""
Main cgi function, check the given links and print out the result.
"""
"""Main cgi function, check the given links and print out the result."""
if form is None:
form = {}
try:
@ -115,18 +107,14 @@ def checklink (out=sys.stdout, form=None, env=os.environ):
def get_host_name (form):
"""
Return host name of given URL.
"""
"""Return host name of given URL."""
return urlparse.urlparse(form["url"].value)[1]
def checkform (form):
"""
Check form data. throw exception on error
"""Check form data. throw exception on error
Be sure to NOT print out any user-given data as HTML code, so use
only plain strings as exception text.
"""
only plain strings as exception text."""
# check lang support
if "language" in form:
lang = form['language'].value
@ -156,9 +144,7 @@ def checkform (form):
raise FormError(_("invalid %s option syntax") % option)
def logit (form, env):
"""
Log form errors.
"""
"""Log form errors."""
global _logfile
if not _logfile:
return
@ -175,9 +161,7 @@ def logit (form, env):
def print_error (out, why):
"""
Print standard error page.
"""
"""Print standard error page."""
out.write(_("""<html><head>
<title>LinkChecker Online Error</title></head>
<body text=#192c83 bgcolor=#fff7e5 link=#191c83 vlink=#191c83 alink=#191c83>

View file

@ -19,8 +19,8 @@ Parse name of common link types.
"""
import re
import linkcheck.HtmlParser
import linkcheck.strformat
from . import HtmlParser
from . import strformat
imgtag_re = re.compile(r"(?i)\s+alt\s*=\s*"+\
@ -29,27 +29,20 @@ img_re = re.compile(r"""(?i)<\s*img\s+("[^"\n]*"|'[^'\n]*'|[^>])+>""")
endtag_re = re.compile(r"""(?i)</a\s*>""")
def _unquote (txt):
"""
Resolve entities and markup from txt.
"""
return linkcheck.HtmlParser.resolve_entities(
linkcheck.strformat.remove_markup(txt))
"""Resolve entities and markup from txt."""
return HtmlParser.resolve_entities(strformat.remove_markup(txt))
def image_name (txt):
"""
Return the alt part of the first <img alt=""> tag in txt.
"""
"""Return the alt part of the first <img alt=""> tag in txt."""
mo = imgtag_re.search(txt)
if mo:
name = linkcheck.strformat.unquote(mo.group('name').strip())
name = strformat.unquote(mo.group('name').strip())
return _unquote(name)
return u''
def href_name (txt):
"""
Return the name part of the first <a href="">name</a> link in txt.
"""
"""Return the name part of the first <a href="">name</a> link in txt."""
name = u""
endtag = endtag_re.search(txt)
if not endtag:

View file

@ -19,13 +19,12 @@ Find link tags in HTML text.
"""
import re
import linkcheck.strformat
from . import strformat, log, LOG_CHECK
import linkcheck.linkname
import linkcheck.log
import linkcheck.url
MAX_NAMELEN = 256
unquote = linkcheck.strformat.unquote
unquote = strformat.unquote
# ripped mainly from HTML::Tagset.pm
LinkTags = {
@ -74,53 +73,38 @@ def strip_c_comments (text):
class TagFinder (object):
"""
Base class storing HTML parse messages in a list.
TagFinder instances are to be used as HtmlParser handlers.
"""
"""Base class storing HTML parse messages in a list.
TagFinder instances are to be used as HtmlParser handlers."""
def __init__ (self):
"""
Initialize local variables.
"""
"""Initialize local variables."""
super(TagFinder, self).__init__()
# parser object will be initialized when it is used as
# a handler object
self.parser = None
def start_element (self, tag, attrs):
"""
Does nothing, override in a subclass.
"""
"""Does nothing, override in a subclass."""
pass
def start_end_element (self, tag, attrs):
"""
Delegate a combined start/end element (eg. <br/>) to
the start_element method. Ignore the end element part.
"""
"""Delegate a combined start/end element (eg. <br/>) to
the start_element method. Ignore the end element part."""
self.start_element(tag, attrs)
class MetaRobotsFinder (TagFinder):
"""
Class for finding robots.txt meta values in HTML.
"""
"""Class for finding robots.txt meta values in HTML."""
def __init__ (self):
"""
Initialize flags.
"""
"""Initialize flags."""
super(MetaRobotsFinder, self).__init__()
self.follow = True
self.index = True
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"meta robots finder")
log.debug(LOG_CHECK, "meta robots finder")
def start_element (self, tag, attrs):
"""
Search for meta robots.txt "nofollow" and "noindex" flags.
"""
"""Search for meta robots.txt "nofollow" and "noindex" flags."""
if tag == 'meta':
if attrs.get('name') == 'robots':
val = attrs.get_true('content', u'').lower().split(u',')
@ -129,9 +113,7 @@ class MetaRobotsFinder (TagFinder):
def is_meta_url (attr, attrs):
"""
Check if the meta attributes contain a URL.
"""
"""Check if the meta attributes contain a URL."""
res = False
if attr == "content":
equiv = attrs.get_true('http-equiv', u'').lower()
@ -144,16 +126,12 @@ def is_meta_url (attr, attrs):
class LinkFinder (TagFinder):
"""
Find a list of links. After parsing, self.urls
"""Find a list of links. After parsing, self.urls
will be a list of parsed links entries with the format
(url, lineno, column, name, codebase).
"""
(url, lineno, column, name, codebase)."""
def __init__ (self, content, tags=None):
"""
Store content in buffer and initialize URL list.
"""
"""Store content in buffer and initialize URL list."""
super(LinkFinder, self).__init__()
self.content = content
if tags is None:
@ -162,16 +140,12 @@ class LinkFinder (TagFinder):
self.tags = tags
self.urls = []
self.base_ref = u''
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK, "link finder")
log.debug(LOG_CHECK, "link finder")
def start_element (self, tag, attrs):
"""
Search for links and store found URLs in a list.
"""
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"LinkFinder tag %s attrs %s", tag, attrs)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"line %d col %d old line %d old col %d",
"""Search for links and store found URLs in a list."""
log.debug(LOG_CHECK, "LinkFinder tag %s attrs %s", tag, attrs)
log.debug(LOG_CHECK, "line %d col %d old line %d old col %d",
self.parser.lineno(), self.parser.column(),
self.parser.last_lineno(), self.parser.last_column())
if tag == "base" and not self.base_ref:
@ -196,13 +170,10 @@ class LinkFinder (TagFinder):
value = unquote(attrs.get(attr))
# add link to url list
self.add_link(tag, attr, value, name, codebase)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
"LinkFinder finished tag %s", tag)
log.debug(LOG_CHECK, "LinkFinder finished tag %s", tag)
def get_link_name (self, tag, attrs, attr):
"""
Parse attrs for link name. Return name of link.
"""
"""Parse attrs for link name. Return name of link."""
if tag == 'a' and attr == 'href':
name = unquote(attrs.get_true('title', u''))
if not name:
@ -221,9 +192,7 @@ class LinkFinder (TagFinder):
return name
def add_link (self, tag, attr, url, name, base):
"""
Add given url data to url list.
"""
"""Add given url data to url list."""
assert isinstance(tag, unicode), repr(tag)
assert isinstance(attr, unicode), repr(attr)
assert isinstance(name, unicode), repr(name)
@ -248,7 +217,7 @@ class LinkFinder (TagFinder):
return
for u in urls:
assert isinstance(u, unicode) or u is None, repr(u)
assert None == linkcheck.log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
u"LinkParser add link %r %r %r %r %r", tag, attr, u, name, base)
self.urls.append((u, self.parser.last_lineno(),
self.parser.last_column(), name, base))

View file

@ -18,8 +18,7 @@
Locking utility class.
"""
import threading
import linkcheck
import log
from . import log, LOG_THREAD
def get_lock (name):
    """Return a new threading.Lock.

    The name parameter is currently unused; it exists so callers can
    pass a descriptive lock name (DebugLock below uses such names for
    its debug output).
    """
    return threading.Lock()
@ -28,29 +27,20 @@ def get_lock (name):
class DebugLock (object):
"""
Debugging lock class.
"""
"""Debugging lock class."""
def __init__ (self, lock, name):
self.lock = lock
self.name = name
def acquire (self, blocking=1):
"""
Acquire lock.
"""
"""Acquire lock."""
threadname = threading.currentThread().getName()
assert None == log.debug(linkcheck.LOG_THREAD,
"Acquire %s for %s", self.name, threadname)
log.debug(LOG_THREAD, "Acquire %s for %s", self.name, threadname)
self.lock.acquire(blocking)
assert None == log.debug(linkcheck.LOG_THREAD,
"...acquired %s for %s", self.name, threadname)
log.debug(LOG_THREAD, "...acquired %s for %s", self.name, threadname)
def release (self):
"""
Release lock.
"""
"""Release lock."""
threadname = threading.currentThread().getName()
assert None == log.debug(linkcheck.LOG_THREAD,
"Release %s for %s", self.name, threadname)
log.debug(LOG_THREAD, "Release %s for %s", self.name, threadname)
self.lock.release()

View file

@ -30,8 +30,7 @@ import cStringIO as StringIO
PRINT_LOCALVARS = False
def _stack_format (stack):
"""
Format a stack trace to a message.
"""Format a stack trace to a message.
@return: formatted stack message
@rtype: string
@ -44,7 +43,7 @@ def _stack_format (stack):
s.write(os.linesep)
s.write(' %s' % lines[0].lstrip())
if PRINT_LOCALVARS:
for key, value in frame.f_locals.iteritems():
for key, value in frame.f_locals.items():
s.write(" %s = " % key)
# be careful not to cause a new error in the error output
try:
@ -56,8 +55,7 @@ def _stack_format (stack):
def _log (fun, msg, args, tb=False):
"""
Log a message with given function and an optional traceback.
"""Log a message with given function and an optional traceback.
@return: None
"""
@ -68,8 +66,7 @@ def _log (fun, msg, args, tb=False):
def debug (logname, msg, *args, **kwargs):
"""
Log a debug message.
"""Log a debug message.
return: None
"""
@ -79,8 +76,7 @@ def debug (logname, msg, *args, **kwargs):
def info (logname, msg, *args, **kwargs):
"""
Log an informational message.
"""Log an informational message.
return: None
"""
@ -90,8 +86,7 @@ def info (logname, msg, *args, **kwargs):
def warn (logname, msg, *args, **kwargs):
"""
Log a warning.
"""Log a warning.
return: None
"""
@ -101,8 +96,7 @@ def warn (logname, msg, *args, **kwargs):
def error (logname, msg, *args, **kwargs):
"""
Log an error.
"""Log an error.
return: None
"""
@ -112,8 +106,7 @@ def error (logname, msg, *args, **kwargs):
def critical (logname, msg, *args, **kwargs):
"""
Log a critical error.
"""Log a critical error.
return: None
"""
@ -123,8 +116,7 @@ def critical (logname, msg, *args, **kwargs):
def exception (logname, msg, *args, **kwargs):
"""
Log an exception.
"""Log an exception.
return: None
"""
@ -134,14 +126,10 @@ def exception (logname, msg, *args, **kwargs):
def is_debug (logname):
    """Tell whether the named logger would emit DEBUG-level records."""
    logger = logging.getLogger(logname)
    return logger.isEnabledFor(logging.DEBUG)
def shutdown ():
    """Flush and close all handlers registered with the logging module.

    Thin wrapper around logging.shutdown().
    """
    logging.shutdown()

View file

@ -26,7 +26,7 @@ import linkcheck.strformat
import linkcheck.i18n
import linkcheck.decorators
import linkcheck.dummy
import linkcheck.log
from .. import log, LOG_CHECK
_ = lambda x: x
Fields = dict(
@ -102,7 +102,7 @@ class Logger (object):
self.close_fd = True
except IOError:
msg = sys.exc_info()[1]
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
"Could not open file %r for writing: %s\n"
"Disabling log output of %s", self.filename, msg, self)
self.fd = linkcheck.dummy.Dummy()
@ -180,7 +180,7 @@ class Logger (object):
self.start_fileoutput()
if self.fd is None:
# Happens when aborting threads times out
linkcheck.log.warn(linkcheck.LOG_CHECK,
log.warn(LOG_CHECK,
"writing to unitialized or closed file")
else:
self.fd.write(self.encode(s), **args)

View file

@ -18,8 +18,8 @@
A blacklist logger.
"""
from __future__ import with_statement
import os
import linkcheck.logger
@ -71,23 +71,20 @@ class BlacklistLogger (linkcheck.logger.Logger):
"""
Read a previously stored blacklist from file fd.
"""
fd = open(self.filename)
try:
with open(self.filename) as fd:
for line in fd:
line = self.decode(line.rstrip())
if line.startswith('#') or not line:
continue
value, key = line.split(None, 1)
self.blacklist[key] = int(value)
finally:
fd.close()
def write_blacklist (self):
"""
Write the blacklist.
"""
oldmask = os.umask(0077)
for key, value in self.blacklist.iteritems():
for key, value in self.blacklist.items():
self.write(u"%d %s%s" % (value, key, os.linesep))
self.close_fileoutput()
# restore umask

View file

@ -58,7 +58,7 @@ class CustomXMLLogger (xmllog.XMLLogger):
if self.has_part("realurl"):
self.xml_tag(u"realurl", unicode(url_data.url))
if self.has_part("extern"):
self.xml_tag(u"extern", u"%d" % (url_data.extern[0] and 1 or 0))
self.xml_tag(u"extern", u"%d" % (1 if url_data.extern[0] else 0))
if url_data.dltime >= 0 and self.has_part("dltime"):
self.xml_tag(u"dltime", u"%f" % url_data.dltime)
if url_data.dlsize >= 0 and self.has_part("dlsize"):
@ -79,7 +79,7 @@ class CustomXMLLogger (xmllog.XMLLogger):
attrs = {}
if url_data.result:
attrs["result"] = url_data.result
self.xml_tag(u"valid", u"%d" % (url_data.valid and 1 or 0), attrs)
self.xml_tag(u"valid", u"%d" % (1 if url_data.valid else 0), attrs)
self.xml_endtag(u'urldata')
self.flush()

View file

@ -83,7 +83,7 @@ class DOTLogger (linkcheck.logger.Logger):
if node.checktime and self.has_part("checktime"):
self.writeln(u" checktime=%d," % node.checktime)
if self.has_part("extern"):
self.writeln(u" extern=%d," % (node.extern[0] and 1 or 0))
self.writeln(u" extern=%d," % (1 if node.extern[0] else 0))
self.writeln(u" ];")
def write_edges (self):
@ -91,14 +91,14 @@ class DOTLogger (linkcheck.logger.Logger):
Write all edges we can find in the graph in a brute-force
manner. Better would be a mapping of parent URLs.
"""
for node in self.nodes.itervalues():
for node in self.nodes.values():
if node.parent_url in self.nodes:
source = self.nodes[node.parent_url].id
target = node.id
self.writeln(u" %d -> %d [" % (source, target))
self.writeln(u' label="%s",' % dotedge(node.name))
if self.has_part("result"):
self.writeln(u" valid=%d," % (node.valid and 1 or 0))
self.writeln(u" valid=%d," % (1 if node.valid else 0))
self.writeln(u" ];")
self.flush()

View file

@ -85,7 +85,7 @@ class GMLLogger (linkcheck.logger.Logger):
if node.checktime and self.has_part("checktime"):
self.writeln(u" checktime %d" % node.checktime)
if self.has_part("extern"):
self.writeln(u" extern %d" % (node.extern[0] and 1 or 0))
self.writeln(u" extern %d" % (1 if node.extern[0] else 0))
self.writeln(u" ]")
def write_edges (self):
@ -93,7 +93,7 @@ class GMLLogger (linkcheck.logger.Logger):
Write all edges we can find in the graph in a brute-force
manner. Better would be a mapping of parent URLs.
"""
for node in self.nodes.itervalues():
for node in self.nodes.values():
if node.parent_url in self.nodes:
self.writeln(u" edge [")
self.writeln(u' label "%s"' % (node.base_url or u""))
@ -102,7 +102,7 @@ class GMLLogger (linkcheck.logger.Logger):
self.nodes[node.parent_url].id)
self.writeln(u" target %d" % node.id)
if self.has_part("result"):
self.writeln(u" valid %d" % (node.valid and 1 or 0))
self.writeln(u" valid %d" % (1 if node.valid else 0))
self.writeln(u" ]")
self.flush()

View file

@ -66,7 +66,7 @@ class GraphXMLLogger (xmllog.XMLLogger):
if node.checktime and self.has_part("checktime"):
self.xml_tag(u"checktime", u"%f" % node.checktime)
if self.has_part("extern"):
self.xml_tag(u"extern", u"%d" % (node.extern[0] and 1 or 0))
self.xml_tag(u"extern", u"%d" % (1 if node.extern[0] else 0))
self.xml_endtag(u"data")
self.xml_endtag(u"node")
self.write_edges()
@ -76,7 +76,7 @@ class GraphXMLLogger (xmllog.XMLLogger):
Write all edges we can find in the graph in a brute-force
manner. Better would be a mapping of parent URLs.
"""
for node in self.nodes.itervalues():
for node in self.nodes.values():
if node.parent_url in self.nodes:
attrs = {
u"source": u"%d" % self.nodes[node.parent_url].id,
@ -87,7 +87,7 @@ class GraphXMLLogger (xmllog.XMLLogger):
self.xml_tag(u"label", node.base_url or u"")
self.xml_starttag(u"data")
if self.has_part("result"):
self.xml_tag(u"valid", u"%d" % (node.valid and 1 or 0))
self.xml_tag(u"valid", u"%d" % (1 if node.valid else 0))
self.xml_endtag(u"data")
self.xml_endtag(u"edge")
self.flush()

View file

@ -107,7 +107,7 @@ class XMLLogger (linkcheck.logger.Logger):
self.write(self.indent*self.level)
self.write(u"<%s" % xmlquote(name))
if attrs:
for name, value in attrs.iteritems():
for name, value in attrs.items():
args = (xmlquote(name), xmlquoteattr(value))
self.write(u' %s="%s"' % args)
self.writeln(u">");
@ -129,7 +129,7 @@ class XMLLogger (linkcheck.logger.Logger):
self.write(self.indent*self.level)
self.write(u"<%s" % xmlquote(name))
if attrs:
for aname, avalue in attrs.iteritems():
for aname, avalue in attrs.items():
args = (xmlquote(aname), xmlquoteattr(avalue))
self.write(u' %s="%s"' % args)
self.writeln(u">%s</%s>" % (xmlquote(content), xmlquote(name)))

View file

@ -18,8 +18,7 @@ _scale = {'kB': 1024.0, 'mB': 1024.0*1024.0,
'KB': 1024.0, 'MB': 1024.0*1024.0}
def _VmB (VmKey):
"""
Parse /proc/<pid>/status file for given key.
"""Parse /proc/<pid>/status file for given key.
@return: requested number value of status entry
@rtype: float
@ -46,8 +45,7 @@ def _VmB (VmKey):
def memory (since=0.0):
"""
Get memory usage.
"""Get memory usage.
@return: memory usage in bytes
@rtype: float
@ -56,8 +54,7 @@ def memory (since=0.0):
def resident (since=0.0):
"""
Get resident memory usage.
"""Get resident memory usage.
@return: resident memory usage in bytes
@rtype: float
@ -66,8 +63,7 @@ def resident (since=0.0):
def stacksize (since=0.0):
"""
Get stack size.
"""Get stack size.
@return: stack size in bytes
@rtype: float

View file

@ -37,9 +37,7 @@ MESSAGES = {}
def usage (ecode, msg=''):
"""
Print usage and msg and exit with given code.
"""
"""Print usage and msg and exit with given code."""
print >> sys.stderr, __doc__
if msg:
print >> sys.stderr, msg
@ -47,17 +45,13 @@ def usage (ecode, msg=''):
def add (msgid, transtr, fuzzy):
    """Record a translation in MESSAGES unless it is unusable.

    An entry is skipped when it is marked fuzzy, when the translated
    string is empty, or when it starts with a NUL byte.
    """
    unusable = fuzzy or not transtr or transtr.startswith('\0')
    if not unusable:
        MESSAGES[msgid] = transtr
def generate ():
"""
Return the generated output.
"""
"""Return the generated output."""
keys = MESSAGES.keys()
# the keys are sorted in the .mo file
keys.sort()

View file

@ -32,37 +32,31 @@ import zlib
import sys
import cStringIO as StringIO
import linkcheck
import configuration
import log
from linkcheck import gzip2 as gzip
from . import configuration
from . import log, LOG_CHECK
from . import gzip2 as gzip
__all__ = ["RobotFileParser"]
class PasswordManager (object):
"""
Simple password manager storing username and password. Suitable
for use as an AuthHandler instance in urllib2.
"""
"""Simple password manager storing username and password. Suitable
for use as an AuthHandler instance in urllib2."""
def __init__ (self, user, password):
"""
Store given username and password.
"""
"""Store given username and password."""
self.user = user
self.password = password
def add_password (self, realm, uri, user, passwd):
"""
Does nothing since username and password are already stored.
"""Does nothing since username and password are already stored.
@return: None
"""
pass
def find_user_password (self, realm, authuri):
"""
Get stored username and password.
"""Get stored username and password.
@return: A tuple (user, password)
@rtype: tuple
@ -71,25 +65,19 @@ class PasswordManager (object):
class RobotFileParser (object):
"""
This class provides a set of methods to read, parse and answer
questions about a single robots.txt file.
"""
"""This class provides a set of methods to read, parse and answer
questions about a single robots.txt file."""
def __init__ (self, url='', user=None, password=None):
"""
Initialize internal entry lists and store given url and
credentials.
"""
"""Initialize internal entry lists and store given url and
credentials."""
self.set_url(url)
self.user = user
self.password = password
self._reset()
def _reset (self):
"""
Reset internal flags and entry lists.
"""
"""Reset internal flags and entry lists."""
self.entries = []
self.default_entry = None
self.disallow_all = False
@ -97,8 +85,7 @@ class RobotFileParser (object):
self.last_checked = 0
def mtime (self):
"""
Returns the time the robots.txt file was last fetched.
"""Returns the time the robots.txt file was last fetched.
This is useful for long-running web spiders that need to
check for new robots.txt files periodically.
@ -109,20 +96,17 @@ class RobotFileParser (object):
return self.last_checked
def modified (self):
"""
Set the time the robots.txt file was last fetched to the
current time.
"""
"""Set the time the robots.txt file was last fetched to the
current time."""
self.last_checked = time.time()
def set_url (self, url):
"Set the URL referring to a robots.txt file."
"""Set the URL referring to a robots.txt file."""
self.url = url
self.host, self.path = urlparse.urlparse(url)[1:3]
def get_opener (self):
"""
Construct an URL opener object. It considers the given credentials
"""Construct an URL opener object. It considers the given credentials
from the __init__() method and supports proxies.
@return: URL opener
@ -145,7 +129,7 @@ class RobotFileParser (object):
return urllib2.build_opener(*handlers)
def read (self):
"Read the robots.txt URL and feeds it to the parser."
"""Read the robots.txt URL and feeds it to the parser."""
self._reset()
headers = {
'User-Agent': configuration.UserAgent,
@ -157,12 +141,10 @@ class RobotFileParser (object):
except urllib2.HTTPError, x:
if x.code in (401, 403):
self.disallow_all = True
assert None == log.debug(linkcheck.LOG_CHECK,
"%s disallow all", self.url)
log.debug(LOG_CHECK, "%s disallow all", self.url)
else:
self.allow_all = True
assert None == log.debug(linkcheck.LOG_CHECK,
"%s allow all", self.url)
log.debug(LOG_CHECK, "%s allow all", self.url)
except socket.timeout:
raise
except urllib2.URLError:
@ -170,32 +152,24 @@ class RobotFileParser (object):
if isinstance(x.reason, socket.timeout):
raise
self.allow_all = True
assert None == log.debug(linkcheck.LOG_CHECK,
"%s allow all", self.url)
log.debug(LOG_CHECK, "%s allow all", self.url)
except (socket.gaierror, socket.error):
# no network
self.allow_all = True
assert None == log.debug(linkcheck.LOG_CHECK,
"%s allow all", self.url)
log.debug(LOG_CHECK, "%s allow all", self.url)
except IOError, msg:
self.allow_all = True
assert None == log.debug(linkcheck.LOG_CHECK,
"%s allow all", self.url)
log.debug(LOG_CHECK, "%s allow all", self.url)
except httplib.HTTPException:
self.allow_all = True
assert None == log.debug(linkcheck.LOG_CHECK,
"%s allow all", self.url)
log.debug(LOG_CHECK, "%s allow all", self.url)
except ValueError:
# XXX bug workaround:
# urllib2.AbstractDigestAuthHandler raises ValueError on
# failed authorisation
# urllib2 could raise ValueError on invalid data
self.disallow_all = True
assert None == log.debug(linkcheck.LOG_CHECK,
"%s disallow all", self.url)
log.debug(LOG_CHECK, "%s disallow all", self.url)
def _read_content (self, req):
"""
Read robots.txt content.
"""Read robots.txt content.
@raise: urllib2.HTTPError on HTTP failure codes
@raise: socket.gaierror, socket.error, urllib2.URLError on network
errors
@ -215,8 +189,7 @@ class RobotFileParser (object):
self.allow_all = True
def _add_entry (self, entry):
"""
Add a parsed entry to entry list.
"""Add a parsed entry to entry list.
@return: None
"""
@ -227,15 +200,13 @@ class RobotFileParser (object):
self.entries.append(entry)
def parse (self, lines):
"""
Parse the input lines from a robot.txt file.
"""Parse the input lines from a robot.txt file.
We allow that a user-agent: line is not preceded by
one or more blank lines.
@return: None
"""
assert None == log.debug(linkcheck.LOG_CHECK,
"%s parse lines", self.url)
log.debug(LOG_CHECK, "%s parse lines", self.url)
state = 0
linenumber = 0
entry = Entry()
@ -244,7 +215,7 @@ class RobotFileParser (object):
linenumber += 1
if not line:
if state == 1:
assert None == log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"%s line %d: allow or disallow directives without" \
" any user-agent line", self.url, linenumber)
entry = Entry()
@ -266,7 +237,7 @@ class RobotFileParser (object):
line[1] = urllib.unquote(line[1].strip())
if line[0] == "user-agent":
if state == 2:
assert None == log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"%s line %d: missing blank line before user-agent" \
" directive", self.url, linenumber)
self._add_entry(entry)
@ -275,7 +246,7 @@ class RobotFileParser (object):
state = 1
elif line[0] == "disallow":
if state == 0:
assert None == log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"%s line %d: missing user-agent directive before" \
" this line", self.url, linenumber)
else:
@ -283,7 +254,7 @@ class RobotFileParser (object):
state = 2
elif line[0] == "allow":
if state == 0:
assert None == log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"%s line %d: missing user-agent directive before" \
" this line", self.url, linenumber)
else:
@ -291,7 +262,7 @@ class RobotFileParser (object):
state = 2
elif line[0] == "crawl-delay":
if state == 0:
assert None == log.debug(linkcheck.LOG_CHECK,
log.debug(LOG_CHECK,
"%s line %d: missing user-agent directive before" \
" this line", self.url, linenumber)
else:
@ -299,33 +270,28 @@ class RobotFileParser (object):
entry.crawldelay = max(0, int(line[1]))
state = 2
except ValueError:
assert None == log.debug(linkcheck.LOG_CHECK,
"%s line %d: invalid delay number %r",
self.url, linenumber, line[1])
log.debug(LOG_CHECK,
"%s line %d: invalid delay number %r",
self.url, linenumber, line[1])
pass
else:
assert None == log.debug(linkcheck.LOG_CHECK,
"%s line %d: unknown key %s",
log.debug(LOG_CHECK, "%s line %d: unknown key %s",
self.url, linenumber, line[0])
else:
assert None == log.debug(linkcheck.LOG_CHECK,
"%s line %d: malformed line %s",
log.debug(LOG_CHECK, "%s line %d: malformed line %s",
self.url, linenumber, line)
if state in (1, 2):
self.entries.append(entry)
self.modified()
assert None == log.debug(linkcheck.LOG_CHECK,
"Parsed rules:\n%s", str(self))
log.debug(LOG_CHECK, "Parsed rules:\n%s", str(self))
def can_fetch (self, useragent, url):
"""
Using the parsed robots.txt decide if useragent can fetch url.
"""Using the parsed robots.txt decide if useragent can fetch url.
@return: True if agent can fetch url, else False
@rtype: bool
"""
assert None == log.debug(linkcheck.LOG_CHECK,
"%s check allowance for:\n" \
log.debug(LOG_CHECK, "%s check allowance for:\n" \
" user agent: %r\n url: %r", self.url, useragent, url)
if not isinstance(useragent, str):
useragent = useragent.encode("ascii", "ignore")
@ -348,8 +314,7 @@ class RobotFileParser (object):
return True
def get_crawldelay (self, useragent):
"""
Look for a configured crawl delay.
"""Look for a configured crawl delay.
@return: crawl delay in seconds or zero
@rtype: integer >= 0
@ -360,8 +325,7 @@ class RobotFileParser (object):
return 0
def __str__ (self):
"""
Constructs string representation, usable as contents of a
"""Constructs string representation, usable as contents of a
robots.txt file.
@return: robots.txt format
@ -374,15 +338,12 @@ class RobotFileParser (object):
class RuleLine (object):
"""
A rule line is a single "Allow:" (allowance==1) or "Disallow:"
"""A rule line is a single "Allow:" (allowance==1) or "Disallow:"
(allowance==0) followed by a path.
"""
def __init__ (self, path, allowance):
"""
Initialize with given path and allowance info.
"""
"""Initialize with given path and allowance info."""
if path == '' and not allowance:
# an empty value means allow all
allowance = True
@ -391,8 +352,7 @@ class RuleLine (object):
self.allowance = allowance
def applies_to (self, path):
"""
Look if given path applies to this rule.
"""Look if given path applies to this rule.
@return: True if pathname applies to this rule, else False
@rtype: bool
@ -400,31 +360,25 @@ class RuleLine (object):
return self.path == "*" or path.startswith(self.path)
def __str__ (self):
"""
Construct string representation in robots.txt format.
"""Construct string representation in robots.txt format.
@return: robots.txt format
@rtype: string
"""
return (self.allowance and "Allow" or "Disallow")+": "+self.path
return ("Allow" if self.allowance else "Disallow")+": "+self.path
class Entry (object):
"""
An entry has one or more user-agents and zero or more rulelines.
"""
"""An entry has one or more user-agents and zero or more rulelines."""
def __init__ (self):
"""
Initialize user agent and rule list.
"""
"""Initialize user agent and rule list."""
self.useragents = []
self.rulelines = []
self.crawldelay = 0
def __str__ (self):
"""
string representation in robots.txt format.
"""string representation in robots.txt format.
@return: robots.txt format
@rtype: string
@ -436,8 +390,7 @@ class Entry (object):
return "\n".join(lines)
def applies_to (self, useragent):
"""
Check if this entry applies to the specified agent.
"""Check if this entry applies to the specified agent.
@return: True if this entry applies to the agent, else False.
@rtype: bool
@ -456,8 +409,7 @@ class Entry (object):
return False
def allowance (self, path):
"""
Preconditions:
"""Preconditions:
- our agent applies to this entry
- filename is URL decoded
@ -467,8 +419,7 @@ class Entry (object):
@rtype: bool
"""
for line in self.rulelines:
assert None == log.debug(linkcheck.LOG_CHECK,
"%s %s %s", path, str(line), line.allowance)
log.debug(LOG_CHECK, "%s %s %s", path, str(line), line.allowance)
if line.applies_to(path):
return line.allowance
return True
@ -495,10 +446,8 @@ class Entry (object):
## ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS
## SOFTWARE.
def decode (page):
"""
Gunzip or deflate a compressed page.
"""
assert None == log.debug(linkcheck.LOG_CHECK,
"""Gunzip or deflate a compressed page."""
log.debug(LOG_CHECK,
"robots.txt page info %d %s", page.code, str(page.info()))
encoding = page.info().get("Content-Encoding")
if encoding in ('gzip', 'x-gzip', 'deflate'):
@ -510,8 +459,7 @@ def decode (page):
else:
fp = gzip.GzipFile('', 'rb', 9, StringIO.StringIO(content))
except zlib.error, msg:
assert None == log.debug(linkcheck.LOG_CHECK,
"uncompressing had error "
log.debug(LOG_CHECK, "uncompressing had error "
"%s, assuming non-compressed content", str(msg))
fp = StringIO.StringIO(content)
# remove content-encoding header
@ -532,26 +480,18 @@ def decode (page):
class HttpWithGzipHandler (urllib2.HTTPHandler):
"""
Support gzip encoding.
"""
"""Support gzip encoding."""
def http_open (self, req):
"""
Send request and decode answer.
"""
"""Send request and decode answer."""
return decode(urllib2.HTTPHandler.http_open(self, req))
if hasattr(httplib, 'HTTPS'):
class HttpsWithGzipHandler (urllib2.HTTPSHandler):
"""
Support gzip encoding.
"""
"""Support gzip encoding."""
def http_open (self, req):
"""
Send request and decode answer.
"""
"""Send request and decode answer."""
return decode(urllib2.HTTPSHandler.http_open(self, req))
# end of urlutils.py routines

View file

@ -34,8 +34,7 @@ import pydoc
def unicode_safe (s, encoding="iso-8859-1"):
"""
Get unicode string without raising encoding errors. Unknown
"""Get unicode string without raising encoding errors. Unknown
characters of the given encoding will be ignored.
@param s: the string to be decoded
@ -52,8 +51,7 @@ def unicode_safe (s, encoding="iso-8859-1"):
def ascii_safe (s):
"""
Get ASCII string without raising encoding errors. Unknown
"""Get ASCII string without raising encoding errors. Unknown
characters of the given encoding will be ignored.
@param s: the Unicode string to be encoded
@ -67,9 +65,7 @@ def ascii_safe (s):
def is_ascii (s):
"""
Test if a string can be encoded in ASCII.
"""
"""Test if a string can be encoded in ASCII."""
try:
s.encode('ascii', 'strict')
return True
@ -78,9 +74,7 @@ def is_ascii (s):
def is_encoding (text):
"""
Check if string is a valid encoding.
"""
"""Check if string is a valid encoding."""
try:
return codecs.lookup(text)
except (LookupError, ValueError):
@ -88,15 +82,12 @@ def is_encoding (text):
def url_unicode_split (url):
"""
Like urlparse.urlsplit(), but always returning unicode parts.
"""
"""Like urlparse.urlsplit(), but always returning unicode parts."""
return [unicode_safe(s) for s in urlparse.urlsplit(url)]
def unquote (s, matching=False):
"""
Remove leading and ending single and double quotes.
"""Remove leading and ending single and double quotes.
The quotes need to match if matching is True. Only one quote from each
end will be stripped.
@ -125,23 +116,19 @@ _para_win = r"(?:%(sep)s)(?:(?:%(sep)s)\s*)+" % {'sep': '\r\n'}
_para_ro = re.compile("%s|%s|%s" % (_para_mac, _para_posix, _para_win))
def get_paragraphs (text):
"""
A new paragraph is considered to start at a line which follows
"""A new paragraph is considered to start at a line which follows
one or more blank lines (lines containing nothing or just spaces).
The first line of the text also starts a paragraph.
"""
The first line of the text also starts a paragraph."""
if not text:
return []
return _para_ro.split(text)
def wrap (text, width, **kwargs):
"""
Adjust lines of text to be not longer than width. The text will be
"""Adjust lines of text to be not longer than width. The text will be
returned unmodified if width <= 0.
See textwrap.wrap() for a list of supported kwargs.
Returns text with lines no longer than given width.
"""
Returns text with lines no longer than given width."""
if width <= 0 or not text:
return text
ret = []
@ -151,18 +138,14 @@ def wrap (text, width, **kwargs):
def indent (text, indent_string=" "):
"""
Indent each line of text with the given indent string.
"""
"""Indent each line of text with the given indent string."""
lines = str(text).splitlines()
return os.linesep.join(["%s%s" % (indent_string, x) for x in lines])
return os.linesep.join("%s%s" % (indent_string, x) for x in lines)
def get_line_number (s, index):
r"""
Return the line number of s[index] or zero on errors.
Lines are assumed to be separated by the ASCII character '\n'.
"""
r"""Return the line number of s[index] or zero on errors.
Lines are assumed to be separated by the ASCII character '\n'."""
i = 0
if index < 0:
return 0
@ -175,18 +158,14 @@ def get_line_number (s, index):
def paginate (text):
"""
Print text in pages of lines.
"""
"""Print text in pages of lines."""
pydoc.pager(text)
_markup_re = re.compile("<.*?>", re.DOTALL)
def remove_markup (s):
"""
Remove all <*> html markup tags from s.
"""
"""Remove all <*> html markup tags from s."""
mo = _markup_re.search(s)
while mo:
s = s[0:mo.start()] + s[mo.end():]
@ -195,10 +174,8 @@ def remove_markup (s):
def strsize (b):
"""
Return human representation of bytes b. A negative number of bytes
raises a value error.
"""
"""Return human representation of bytes b. A negative number of bytes
raises a value error."""
if b < 0:
raise ValueError("Invalid negative byte number")
if b < 1024:
@ -217,9 +194,7 @@ def strsize (b):
def strtime (t):
"""
Return ISO 8601 formatted time.
"""
"""Return ISO 8601 formatted time."""
return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(t)) + \
strtimezone()
@ -283,8 +258,7 @@ def strduration_long (duration):
def strtimezone ():
"""
Return timezone info, %z on some platforms, but not supported on all.
"""Return timezone info, %z on some platforms, but not supported on all.
"""
if time.daylight:
zone = time.altzone
@ -295,17 +269,14 @@ def strtimezone ():
_sub_ws = re.compile(r"\s+").sub
def stripall (s):
"""
Remove all whitespace from given string.
"""
"""Remove all whitespace from given string."""
if not s:
return s
return _sub_ws("", s)
def limit (s, length=72):
"""
If the length of the string exceeds the given limit, it will be cut
"""If the length of the string exceeds the given limit, it will be cut
off and three dots will be appended.
@param s: the string to limit

View file

@ -54,9 +54,7 @@ if _has_win32process:
def set_thread_priority (prio):
"""
Set priority of this thread (and thus also for all spawned threads).
"""
"""Set priority of this thread (and thus also for all spawned threads)."""
if os.name == 'nt' and _has_win32process:
res = win32process.SetPriorityClass(
win32process.GetCurrentProcess(), _nt_prio_val[prio])
@ -68,10 +66,8 @@ def set_thread_priority (prio):
class StoppableThread (threading.Thread):
"""
Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition.
"""
"""Thread class with a stop() method. The thread itself has to check
regularly for the stopped() condition."""
def __init__ (self):
super(StoppableThread, self).__init__()

View file

@ -27,9 +27,7 @@ _trace_filter = set()
def trace_ignore (names):
"""
Add given names to trace ignore set, or clear set if names is None.
"""
"""Add given names to trace ignore set, or clear set if names is None."""
if names is None:
_trace_ignore.clear()
else:
@ -37,9 +35,8 @@ def trace_ignore (names):
def trace_filter (patterns):
"""
Add given patterns to trace filter set or clear set if patterns is None.
"""
"""Add given patterns to trace filter set or clear set if patterns is
None."""
if patterns is None:
_trace_filter.clear()
else:
@ -47,9 +44,7 @@ def trace_filter (patterns):
def _trace (frame, event, arg):
"""
Trace function calls.
"""
"""Trace function calls."""
if event in ('call', 'c_call'):
_trace_line(frame, event, arg)
elif event in ('return', 'c_return'):
@ -61,9 +56,7 @@ def _trace (frame, event, arg):
def _trace_full (frame, event, arg):
"""
Trace every executed line.
"""
"""Trace every executed line."""
if event == "line":
_trace_line(frame, event, arg)
else:
@ -72,9 +65,7 @@ def _trace_full (frame, event, arg):
def _trace_line (frame, event, arg):
"""
Print current executed line.
"""
"""Print current executed line."""
name = frame.f_globals["__name__"]
if name in _trace_ignore:
return _trace_line
@ -83,7 +74,7 @@ def _trace_line (frame, event, arg):
return _trace_line
lineno = frame.f_lineno
filename = frame.f_globals["__file__"]
if filename.endswith(".pyc") or filename.endswith(".pyo"):
if filename.endswith((".pyc", ".pyo")):
filename = filename[:-1]
line = linecache.getline(filename, lineno)
tid = thread.get_ident()
@ -93,9 +84,7 @@ def _trace_line (frame, event, arg):
def trace_on (full=False):
"""
Start tracing of the current thread (and the current thread only).
"""
"""Start tracing of the current thread (and the current thread only)."""
if full:
sys.settrace(_trace_full)
else:
@ -103,8 +92,6 @@ def trace_on (full=False):
def trace_off ():
"""
Stop tracing of the current thread (and the current thread only).
"""
"""Stop tracing of the current thread (and the current thread only)."""
sys.settrace(None)

View file

@ -74,8 +74,7 @@ is_safe_fragment = re.compile("(?i)^%s$" % _safe_fragment_pattern).match
# snatched form urlparse.py
def splitparams (path):
"""
Split off parameter part from path.
"""Split off parameter part from path.
Returns tuple (path-without-param, param)
"""
if '/' in path:
@ -88,8 +87,7 @@ def splitparams (path):
def is_numeric_port (portstr):
"""
return: integer port (== True) iff portstr is a valid port number,
"""return: integer port (== True) iff portstr is a valid port number,
False otherwise
"""
if portstr.isdigit():
@ -101,25 +99,20 @@ def is_numeric_port (portstr):
def safe_host_pattern (host):
"""
return regular expression pattern with given host for URL testing
"""
"""Return regular expression pattern with given host for URL testing."""
return "(?i)%s://%s%s(#%s)?" % \
(_safe_scheme_pattern, host, _safe_path_pattern, _safe_fragment_pattern)
# XXX better name/implementation for this function
def stripsite (url):
"""
remove scheme and host from URL. return host, newurl
"""
"""Remove scheme and host from URL. return host, newurl."""
url = urlparse.urlsplit(url)
return url[1], urlparse.urlunsplit((0, 0, url[2], url[3], url[4]))
def parse_qsl (qs, keep_blank_values=0, strict_parsing=0):
"""
Parse a query given as a string argument.
"""Parse a query given as a string argument.
@param qs: URL-encoded query string to be parsed
@type qs: string
@ -169,8 +162,7 @@ def parse_qsl (qs, keep_blank_values=0, strict_parsing=0):
def idna_encode (host):
"""
Encode hostname as internationalized domain name (IDN) according
"""Encode hostname as internationalized domain name (IDN) according
to RFC 3490.
@raise: UnicodeError if hostname is not properly IDN encoded.
"""
@ -185,9 +177,7 @@ def idna_encode (host):
def url_fix_host (urlparts):
"""
Unquote and fix hostname. Returns is_idn.
"""
"""Unquote and fix hostname. Returns is_idn."""
urlparts[1], is_idn = idna_encode(urllib.unquote(urlparts[1]).lower())
# a leading backslash in path causes urlsplit() to add the
# path components up to the first slash to host
@ -227,9 +217,7 @@ def url_fix_host (urlparts):
return is_idn
def url_fix_common_typos (url):
"""
Fix common typos in given URL like forgotten colon.
"""
"""Fix common typos in given URL like forgotten colon."""
if url.startswith("http//"):
url = "http://" + url[6:]
elif url.startswith("https//"):
@ -238,17 +226,13 @@ def url_fix_common_typos (url):
def url_fix_mailto_urlsplit (urlparts):
"""
Split query part of mailto url if found.
"""
"""Split query part of mailto url if found."""
if "?" in urlparts[2]:
urlparts[2], urlparts[3] = urlparts[2].split('?', 1)
def url_parse_query (query):
"""
Parse and re-join the given CGI query.
"""
"""Parse and re-join the given CGI query."""
if isinstance(query, unicode):
query = query.encode('iso8859-1', 'ignore')
# if ? is in the query, split it off, seen at msdn.microsoft.com
@ -272,8 +256,7 @@ def url_parse_query (query):
def url_norm (url):
"""
Normalize the given URL which must be quoted. Supports unicode
"""Normalize the given URL which must be quoted. Supports unicode
hostnames (IDNA encoding) according to RFC 3490.
@return: (normed url, idna flag)
@ -323,10 +306,8 @@ _samedir_ro = re.compile(r"/\./|/\.$")
_parentdir_ro = re.compile(r"^/(\.\./)+|/(?!\.\./)[^/]+/\.\.(/|$)")
_relparentdir_ro = re.compile(r"^(?!\.\./)[^/]+/\.\.(/|$)")
def collapse_segments (path):
"""
Remove all redundant segments from the given URL path.
Precondition: path is an unquoted url path
"""
"""Remove all redundant segments from the given URL path.
Precondition: path is an unquoted url path"""
# replace backslashes
# note: this is _against_ the specification (which would require
# backslashes to be left alone, and finally quoted with '%5C')
@ -361,9 +342,7 @@ url_is_absolute = re.compile("^[a-z]+:", re.I).match
def url_quote (url):
"""
Quote given URL.
"""
"""Quote given URL."""
if not url_is_absolute(url):
return document_quote(url)
urlparts = list(urlparse.urlsplit(url))
@ -385,19 +364,15 @@ def url_quote (url):
def url_quote_part (s, safechars='/'):
"""
Wrap urllib.quote() to support unicode strings. A unicode string
"""Wrap urllib.quote() to support unicode strings. A unicode string
is first converted to ISO-8859-1, invalid characters are ignored.
After that urllib.quote() is called.
"""
After that urllib.quote() is called."""
if isinstance(s, unicode):
s = s.encode("iso-8859-1", "ignore")
return urllib.quote(s, safechars)
def document_quote (document):
"""
Quote given document.
"""
"""Quote given document."""
doc, query = urllib.splitquery(document)
doc = url_quote_part(doc, '/=,')
if query:
@ -406,8 +381,7 @@ def document_quote (document):
def match_url (url, domainlist):
"""
Return True if host part of url matches an entry in given domain list.
"""Return True if host part of url matches an entry in given domain list.
"""
if not url:
return False
@ -415,9 +389,7 @@ def match_url (url, domainlist):
def match_host (host, domainlist):
"""
Return True if host matches an entry in given domain list.
"""
"""Return True if host matches an entry in given domain list."""
if not host:
return False
for domain in domainlist:
@ -435,8 +407,7 @@ if os.name == 'nt':
_safe_url_chars = re.escape(_nopathquote_chars + "_:.&#%?[]!")+"a-zA-Z0-9"
_safe_url_chars_ro = re.compile(r"^[%s]*$" % _safe_url_chars)
def url_needs_quoting (url):
"""
Check if url needs percent quoting. Note that the method does
"""Check if url needs percent quoting. Note that the method does
only check basic character sets, and not any other syntax.
The URL might still be syntactically incorrect even when
it is properly quoted.
@ -449,8 +420,7 @@ def url_needs_quoting (url):
def url_split (url):
"""
Split url in a tuple (scheme, hostname, port, document) where
"""Split url in a tuple (scheme, hostname, port, document) where
hostname is always lowercased.
Precondition: url is syntactically correct URI (eg has no whitespace)
"""
@ -470,8 +440,7 @@ def url_unsplit (parts):
def splitport (host, port=80):
"""
Split optional port number from host. If host has no port number,
"""Split optional port number from host. If host has no port number,
the given default port is returned.
@param host: host name

View file

@ -32,7 +32,7 @@ import linkcheck
# override optparse gettext method with the one from linkcheck.init_i18n()
optparse._ = _
# now import the rest of the linkchecker gang
import linkcheck.log
from linkcheck import log, LOG_CMDLINE
import linkcheck.i18n
import linkcheck.checker
import linkcheck.director
@ -203,8 +203,7 @@ def ensure_notroot ():
if os.name != 'posix':
return
if os.geteuid() == 0:
linkcheck.log.warn(linkcheck.LOG_CMDLINE,
_("Running as root, dropping to nobody."))
log.warn(LOG_CMDLINE, _("Running as root, dropping to nobody."))
import pwd
os.seteuid(pwd.getpwnam('nobody')[3])
@ -212,12 +211,12 @@ def ensure_notroot ():
def viewprof ():
"""Print profiling data and exit."""
if not has_pstats:
linkcheck.log.error(linkcheck.LOG_CMDLINE,
log.error(LOG_CMDLINE,
_("The `pstats' Python module is not installed,"
" therefore the --viewprof option is disabled."))
sys.exit(1)
if not os.path.isfile(_profile):
linkcheck.log.warn(linkcheck.LOG_CMDLINE,
log.warn(LOG_CMDLINE,
_("Could not find profiling file %r.") % _profile)
print >>sys.stderr, \
_("Please run linkchecker with --profile to generate it.")
@ -234,7 +233,7 @@ def try_compile_re (arg):
try:
return re.compile(arg)
except re.error, msg:
linkcheck.log.error(linkcheck.LOG_CMDLINE,
log.error(LOG_CMDLINE,
_("Syntax error in %(arg)r: %(msg)s") % {"arg": arg, "msg": msg})
sys.exit(1)
@ -526,8 +525,7 @@ if options.debug:
if _name not in allowed_debugs:
print_usage(_("Invalid debug level %(level)r") % {'level': _name})
config.init_logging(debug=options.debug)
assert None == linkcheck.log.debug(linkcheck.LOG_CMDLINE,
_("Python %(version)s on %(platform)s") % \
log.debug(LOG_CMDLINE, _("Python %(version)s on %(platform)s") % \
{"version": sys.version, "platform": sys.platform})
# read configuration files
try:
@ -536,7 +534,7 @@ try:
if os.path.isfile(options.configfile):
files.append(options.configfile)
else:
linkcheck.log.warn(linkcheck.LOG_CMDLINE,
log.warn(LOG_CMDLINE,
_("Unreadable config file: %r"), options.configfile)
config.read(files=files)
except linkcheck.LinkCheckerError, msg:
@ -547,8 +545,7 @@ if not options.allowroot:
ensure_notroot()
# test if running with -O
if options.debug and not __debug__:
linkcheck.log.warn(linkcheck.LOG_CMDLINE,
_("Running with python -O disables debugging."))
log.warn(LOG_CMDLINE, _("Running with python -O disables debugging."))
# apply commandline options and arguments to configuration
constructauth = False
do_profile = False
@ -680,15 +677,14 @@ if options.checkhtml is not None:
if options.checkcss is not None:
config["checkcss"] = options.checkcss
assert None == linkcheck.log.debug(linkcheck.LOG_CMDLINE,
"configuration: %s", pprint.pformat(config.items()))
log.debug(LOG_CMDLINE, "configuration: %s", pprint.pformat(config.items()))
# warn about sitemap loggers and verbose output
klasses = [c.__class__ for c in [config['logger']] + config['fileoutput']]
if (linkcheck.logger.gml.GMLLogger in klasses or \
linkcheck.logger.dot.DOTLogger in klasses or \
linkcheck.logger.gxml.GraphXMLLogger in klasses) and \
not config['verbose']:
linkcheck.log.warn(linkcheck.LOG_CMDLINE,
log.warn(LOG_CMDLINE,
_("Using DOT or GML loggers without verbose output"
" gives an incomplete sitemap graph."))
@ -699,7 +695,7 @@ if len(args) <= 0:
_("enter one or more URLs, separated by white-space\n--> "))
args = urls.split()
else:
linkcheck.log.warn(linkcheck.LOG_CMDLINE, _("no files or URLs given"))
log.warn(LOG_CMDLINE, _("no files or URLs given"))
# prepare checking queue
aggregate = linkcheck.director.get_aggregate(config)
@ -710,7 +706,7 @@ if options.cookiefile is not None:
aggregate.cookies.add(headers, scheme, host, path)
config["sendcookies"] = True
except StandardError:
linkcheck.log.error(linkcheck.LOG_CMDLINE,
log.error(LOG_CMDLINE,
_("Could not parse cookie file: %s"), sys.exc_info()[1])
sys.exit(1)
if options.trace:
@ -733,7 +729,7 @@ for url in args:
try:
linkcheck.add_intern_pattern(url_data, config)
except UnicodeError:
linkcheck.log.error(linkcheck.LOG_CMDLINE,
log.error(LOG_CMDLINE,
_("URL has unparsable domain name: %s"), sys.exc_info()[1])
sys.exit(1)
aggregate.urlqueue.put(url_data)
@ -750,9 +746,9 @@ if do_profile:
print >>sys.stderr, _("Canceled.")
sys.exit(1)
else:
linkcheck.log.warn(linkcheck.LOG_CMDLINE,
_("The `profile' Python module is not installed,"
" therefore the --profile option is disabled."))
log.warn(LOG_CMDLINE,
_("The `profile' Python module is not installed,"
" therefore the --profile option is disabled."))
do_profile = False
# finally, start checking

128
setup.py
View file

@ -20,10 +20,9 @@ Setup file for the distuils module.
"""
import sys
if not hasattr(sys, "version_info"):
raise SystemExit, "This program requires Python 2.4 or later."
if sys.version_info < (2, 4, 0, 'final', 0):
raise SystemExit, "This program requires Python 2.4 or later."
if not (hasattr(sys, 'version_info') or
sys.version_info < (2, 5, 0, 'final', 0)):
raise SystemExit("This program requires Python 2.5 or later.")
import os
import popen2
import platform
@ -57,17 +56,13 @@ win_bat_releases = ['NT', 'XP', '2000', '2003Server']
def normpath (path):
"""
Norm a path name to platform specific notation.
"""
"""Norm a path name to platform specific notation."""
return os.path.normpath(path)
def cnormpath (path):
"""
Norm a path name to platform specific notation, but honoring
the win_compiling flag.
"""
"""Norm a path name to platform specific notation, but honoring
the win_compiling flag."""
path = normpath(path)
if win_compiling:
# replace slashes with backslashes
@ -103,43 +98,18 @@ class MyInstall (install, object):
self.distribution.create_conf_file(data, directory=self.install_lib)
def get_outputs (self):
"""
Add the generated config file from distribution.create_conf_file()
to the list of outputs.
"""
"""Add the generated config file from distribution.create_conf_file()
to the list of outputs."""
outs = super(MyInstall, self).get_outputs()
outs.append(self.distribution.get_conf_filename(self.install_lib))
return outs
# compatibility bugfix for Python << 2.5, << 2.4.1, << 2.3.5
# XXX remove this method when depending on one of the above versions
def dump_dirs (self, msg):
if DEBUG:
from distutils.fancy_getopt import longopt_xlate
print msg + ":"
for opt in self.user_options:
opt_name = opt[0]
if opt_name[-1] == "=":
opt_name = opt_name[0:-1]
if opt_name in self.negative_opt:
opt_name = string.translate(self.negative_opt[opt_name],
longopt_xlate)
val = not getattr(self, opt_name)
else:
opt_name = string.translate(opt_name, longopt_xlate)
val = getattr(self, opt_name)
print " %s: %s" % (opt_name, val)
class MyInstallData (install_data, object):
"""
My own data installer to handle permissions.
"""
"""My own data installer to handle permissions."""
def run (self):
"""
Adjust permissions on POSIX systems.
"""
"""Adjust permissions on POSIX systems."""
super(MyInstallData, self).run()
if os.name == 'posix' and not self.dry_run:
# Make the data files we just installed world-readable,
@ -153,14 +123,10 @@ class MyInstallData (install_data, object):
class MyDistribution (distutils.dist.Distribution, object):
"""
Custom distribution class generating config file.
"""
"""Custom distribution class generating config file."""
def run_commands (self):
"""
Generate config file and run commands.
"""
"""Generate config file and run commands."""
cwd = os.getcwd()
data = []
data.append('config_dir = %r' % os.path.join(cwd, "config"))
@ -170,16 +136,12 @@ class MyDistribution (distutils.dist.Distribution, object):
super(MyDistribution, self).run_commands()
def get_conf_filename (self, directory):
"""
Get name for config file.
"""
"""Get name for config file."""
return os.path.join(directory, "_%s_configdata.py" % self.get_name())
def create_conf_file (self, data, directory=None):
"""
Create local config file from given data (list of lines) in
the directory (or current directory if not given).
"""
"""Create local config file from given data (list of lines) in
the directory (or current directory if not given)."""
data.insert(0, "# this file is automatically created by setup.py")
data.insert(0, "# -*- coding: iso-8859-1 -*-")
if directory is None:
@ -205,9 +167,7 @@ class MyDistribution (distutils.dist.Distribution, object):
class MyBdistWininst (bdist_wininst, object):
"""
Custom bdist_wininst command supporting cross compilation.
"""
"""Custom bdist_wininst command supporting cross compilation."""
def run (self):
if (not win_compiling and
@ -292,8 +252,7 @@ class MyBdistWininst (bdist_wininst, object):
def cc_supports_option (cc, option):
"""
Check if the given C compiler supports the given option.
"""Check if the given C compiler supports the given option.
@return: True if the compiler supports the option, else False
@rtype: bool
@ -316,15 +275,11 @@ def cc_remove_option (compiler, option):
class MyBuildExt (build_ext, object):
"""
Custom build extension command.
"""
"""Custom build extension command."""
def build_extensions (self):
"""
Add -std=gnu99 to build options if supported.
And compress extension libraries.
"""
"""Add -std=gnu99 to build options if supported.
And compress extension libraries."""
# For gcc >= 3 we can add -std=gnu99 to get rid of warnings.
extra = []
if self.compiler.compiler_type == 'unix':
@ -344,9 +299,7 @@ class MyBuildExt (build_ext, object):
self.compress_extensions()
def compress_extensions (self):
"""
Run UPX compression over built extension libraries.
"""
"""Run UPX compression over built extension libraries."""
# currently upx supports only .dll files
if os.name != 'nt':
return
@ -359,18 +312,14 @@ class MyBuildExt (build_ext, object):
def compress_library (upx, filename):
"""
Compresses a dynamic library file with upx (currently only .dll
files are supported).
"""
"""Compresses a dynamic library file with upx (currently only .dll
files are supported)."""
log.info("upx-compressing %s", filename)
os.system('%s -q --best "%s"' % (upx, filename))
def list_message_files (package, suffix=".po"):
"""
Return list of all found message files and their installation paths.
"""
"""Return list of all found message files and their installation paths."""
_files = glob.glob("po/*" + suffix)
_list = []
for _file in _files:
@ -382,14 +331,12 @@ def list_message_files (package, suffix=".po"):
def check_manifest ():
"""
Snatched from roundup.sf.net.
"""Snatched from roundup.sf.net.
Check that the files listed in the MANIFEST are present when the
source is unpacked.
"""
source is unpacked."""
try:
f = open('MANIFEST')
except:
except Exception:
print '\n*** SOURCE WARNING: The MANIFEST file is missing!'
return
try:
@ -405,14 +352,10 @@ def check_manifest ():
class MyBuild (build, object):
"""
Custom build command.
"""
"""Custom build command."""
def build_message_files (self):
"""
For each po/*.po, build .mo file in target locale directory.
"""
"""For each po/*.po, build .mo file in target locale directory."""
for (src, dst) in list_message_files(self.distribution.get_name()):
build_dst = os.path.join("build", dst)
self.mkpath(os.path.dirname(build_dst))
@ -427,9 +370,7 @@ class MyBuild (build, object):
class MyClean (clean, object):
"""
Custom clean command.
"""
"""Custom clean command."""
def run (self):
if self.all:
@ -443,11 +384,10 @@ class MyClean (clean, object):
class MySdist (sdist, object):
"""
Custom sdist command.
"""
"""Custom sdist command."""
def get_file_list (self):
"""Add MANIFEST to the file list."""
super(MySdist, self).get_file_list()
self.filelist.append("MANIFEST")
@ -533,8 +473,8 @@ setup (name = "linkchecker",
maintainer = myname,
maintainer_email = myemail,
url = "http://linkchecker.sourceforge.net/",
download_url = "http://sourceforge.net/project/showfiles.php" \
"?group_id=1913",
download_url = \
"http://sourceforge.net/project/showfiles.php?group_id=1913",
license = "GPL",
long_description = """Linkchecker features:
o recursive checking

20
test.py
View file

@ -585,18 +585,14 @@ class CustomTestResult(unittest._TextTestResult):
def get_tc_priv (testcase, attr):
"""
get mangled private variables of TestCase instances
"""
"""get mangled private variables of TestCase instances"""
if sys.version_info >= (2, 5, 0, 'alpha', 1):
return getattr(testcase, "_" + attr)
return getattr(testcase, "_TestCase__" + attr)
class CustomTestCase (unittest.TestCase):
"""
A test case with improved inequality test and resource support.
"""
"""A test case with improved inequality test and resource support."""
def denied_resources (self, cfg_resources):
resources = getattr(self, "needed_resources", [])
@ -617,9 +613,7 @@ class CustomTestCase (unittest.TestCase):
return
try:
self.setUp()
except KeyboardInterrupt:
raise
except:
except Exception:
result.addError(self, get_tc_priv(self, "exc_info")())
return
@ -629,16 +623,12 @@ class CustomTestCase (unittest.TestCase):
ok = True
except self.failureException:
result.addFailure(self, get_tc_priv(self, "exc_info")())
except KeyboardInterrupt:
raise
except:
except Exception:
result.addError(self, get_tc_priv(self, "exc_info")())
try:
self.tearDown()
except KeyboardInterrupt:
raise
except:
except Exception:
result.addError(self, get_tc_priv(self, "exc_info")())
ok = False
if ok: result.addSuccess(self)

View file

@ -21,8 +21,7 @@ import unittest
def make_suite (prefix, namespace):
"""
Add all TestCase classes starting with given prefix to a test suite.
"""Add all TestCase classes starting with given prefix to a test suite.
@return: test suite
@rtype: unittest.TestSuite

View file

@ -27,9 +27,7 @@ from tests import make_suite
pofiles = None
def get_pofiles ():
"""
Find all .po files in this source.
"""
"""Find all .po files in this source."""
global pofiles
if pofiles is None:
pofiles = []
@ -38,30 +36,22 @@ def get_pofiles ():
return pofiles
class TestPo (unittest.TestCase):
"""
Test .po file syntax.
"""
"""Test .po file syntax."""
needed_resources = ['posix', 'msgfmt']
def test_pos (self):
"""
Test .po files syntax.
"""
"""Test .po files syntax."""
for f in get_pofiles():
ret = os.system("msgfmt -c -o - %s > /dev/null" % f)
self.assertEquals(ret, 0, msg="PO-file syntax error in %r" % f)
class TestGTranslator (unittest.TestCase):
"""
GTranslator displays a middot · for a space. Unfortunately, it
gets copied with copy-and-paste, what a shame.
"""
"""GTranslator displays a middot · for a space. Unfortunately, it
gets copied with copy-and-paste, what a shame."""
def test_gtranslator (self):
"""
Test all pofiles for GTranslator brokenness.
"""
"""Test all pofiles for GTranslator brokenness."""
for f in get_pofiles():
fd = file(f)
try:
@ -70,9 +60,7 @@ class TestGTranslator (unittest.TestCase):
fd.close()
def check_file (self, fd, f):
"""
Test for GTranslator broken syntax.
"""
"""Test for GTranslator broken syntax."""
for line in fd:
if line.strip().startswith("#"):
continue
@ -81,8 +69,6 @@ class TestGTranslator (unittest.TestCase):
def test_suite ():
"""
Build and return a TestSuite.
"""
"""Build and return a TestSuite."""
prefix = __name__.split(".")[-1]
return make_suite(prefix, globals())