git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@2819 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2005-10-10 21:15:09 +00:00
parent e11bc21593
commit ba6a2ed4e0
7 changed files with 132 additions and 30 deletions

View file

@ -75,16 +75,18 @@ class IfConfig (object):
# increase buffer
bufsize += 8192
# loop over interface names
# XXX on *BSD, struct ifreq is not hardcoded 32, but dynamic.
ifreq_size = 32
data = buf.tostring()
iflist = []
size, ptr = struct.unpack("iP", result)
for i in range(0, size, ifreq_size):
i = 0
while i < size:
# XXX on *BSD, struct ifreq is not hardcoded 32, but dynamic.
ifreq_size = 32
ifconf = data[i:i+ifreq_size]
name, dummy = struct.unpack("16s16s", ifconf)
name, dummy = name.split('\0', 1)
iflist.append(name)
i += ifreq_size
return iflist
def getFlags (self, ifname):

View file

@ -49,6 +49,7 @@ def inet_ntoa(address):
#
# Compress the longest subsequence of 0-value chunks to ::
#
start = 0
best_start = 0
best_len = 0
last_was_zero = False

View file

@ -245,8 +245,10 @@ class Message(object):
def is_response(self, other):
"""Is other a response to self?
@rtype: bool"""
if self.id != other.id:
linkcheck.log.warn(linkcheck.LOG_DNS,
"DNS message IDs differ\n%s\n----\n%s\n----", self, other)
if other.flags & linkcheck.dns.flags.QR == 0 or \
self.id != other.id or \
linkcheck.dns.opcode.from_flags(self.flags) != \
linkcheck.dns.opcode.from_flags(other.flags):
return False

View file

@ -68,6 +68,11 @@ class AbsoluteConcatenation(linkcheck.dns.exception.DNSException):
empty name to an absolute name."""
pass
class NoParent(linkcheck.dns.exception.DNSException):
"""Raised if an attempt is made to get the parent of the root name
or the empty name."""
pass
_escaped = {
'"' : True,
'(' : True,
@ -480,6 +485,16 @@ class Name(object):
else:
return self
def parent(self):
"""Return the parent of the name.
@rtype: linkcheck.dns.name.Name object
@raises NoParent: the name is either the root name or the enpty name
and thus has no parent.
"""
if self == root or self == empty:
raise NoParent
return Name(self.labels[1:])
root = Name([''])
empty = Name([])
@ -494,6 +509,7 @@ def from_text(text, origin = root):
raise ValueError, "origin must be a Name or None"
labels = []
label = ''
edigits = 0
escaping = False
if text == '@':
text = ''

View file

@ -116,6 +116,9 @@ class Answer(object):
self.rrset = rrset
self.expiration = time.time() + min_ttl
def __str__ (self):
return str(self.rrset)
def __getattr__(self, attr):
if attr == 'name':
return self.rrset.name
@ -335,7 +338,6 @@ class Resolver(object):
self.nameservers.append('127.0.0.1')
def read_local_hosts (self):
# XXX is this default list of localhost stuff complete?
self.add_addrinfo(socket.gethostname())
# add system specific hosts for all enabled interfaces
for addr in self.read_local_ifaddrs():
@ -353,7 +355,7 @@ class Resolver(object):
import linkcheck.dns.ifconfig
ifc = linkcheck.dns.ifconfig.IfConfig()
return [ifc.getAddr(iface) for iface in ifc.getInterfaceList() \
if ifc.isUp(iface) and not ifc.isLoopback(iface)]
if ifc.isUp(iface)]
def add_addrinfo (self, host, interface=False):
try:
@ -500,8 +502,14 @@ class Resolver(object):
def _compute_timeout(self, start):
now = time.time()
if now < start:
# Time going backwards is bad. Just give up.
raise Timeout
if start - now > 1:
# Time going backwards is bad. Just give up.
raise Timeout
else:
# Time went backwards, but only a little. This can
# happen, e.g. under vmware with older linux kernels.
# Pretend it didn't happen.
now = start
duration = now - start
if duration >= self.lifetime:
raise Timeout
@ -673,17 +681,50 @@ class Resolver(object):
default_resolver = None
def get_default_resolver ():
global default_resolver
if default_resolver is None:
default_resolver = Resolver()
def query(qname, rdtype=linkcheck.dns.rdatatype.A, rdclass=linkcheck.dns.rdataclass.IN,
tcp=False):
tcp=False, resolver=None):
"""Query nameservers to find the answer to the question.
This is a convenience function that uses the default resolver
object to make the query.
@see: L{linkcheck.dns.resolver.Resolver.query} for more information on the
parameters."""
global default_resolver
if default_resolver is None:
default_resolver = Resolver()
linkcheck.log.debug(linkcheck.LOG_DNS,
"Query %s %s %s", qname, rdtype, rdclass)
return default_resolver.query(qname, rdtype, rdclass, tcp)
linkcheck.log.debug(linkcheck.LOG_DNS, "Query %s %s %s", qname, rdtype, rdclass)
if resolver is None:
resolver = get_default_resolver()
return resolver.query(qname, rdtype, rdclass, tcp)
def zone_for_name(name, rdclass=linkcheck.dns.rdataclass.IN,
tcp=False, resolver=None):
"""Find the name of the zone which contains the specified name.
@param name: the query name
@type name: absolute linkcheck.dns.name.Name object or string
@ivar rdclass: The query class
@type rdclass: int
@param tcp: use TCP to make the query (default is False).
@type tcp: bool
@param resolver: the resolver to use
@type resolver: linkcheck.dns.resolver.Resolver object or None
@rtype: dns.name.Name"""
if isinstance(name, str):
name = linkcheck.dns.name.from_text(name, linkcheck.dns.name.root)
if resolver is None:
resolver = get_default_resolver()
if not name.is_absolute():
raise NotAbsolute, name
while 1:
try:
answer = resolver.query(name, linkcheck.dns.rdatatype.SOA, rdclass, tcp)
return name
except (linkcheck.dns.resolver.NXDOMAIN, linkcheck.dns.resolver.NoAnswer):
try:
name = name.parent()
except NoParent:
raise NoRootSoa

View file

@ -593,6 +593,28 @@ class TestName (unittest.TestCase):
(n, cused) = linkcheck.dns.name.from_wire(w, 0)
self.assertRaises(linkcheck.dns.name.BadLabelType, bad)
def testParent1(self):
n = linkcheck.dns.name.from_text('foo.bar.')
self.failUnless(n.parent() == linkcheck.dns.name.from_text('bar.'))
self.failUnless(n.parent().parent() == linkcheck.dns.name.root)
def testParent2(self):
n = linkcheck.dns.name.from_text('foo.bar', None)
self.failUnless(n.parent() == linkcheck.dns.name.from_text('bar', None))
self.failUnless(n.parent().parent() == linkcheck.dns.name.empty)
def testParent3(self):
def bad():
n = linkcheck.dns.name.root
n.parent()
self.failUnlessRaises(linkcheck.dns.name.NoParent, bad)
def testParent4(self):
def bad():
n = linkcheck.dns.name.empty
n.parent()
self.failUnlessRaises(linkcheck.dns.name.NoParent, bad)
def test_suite ():
return unittest.makeSuite(TestName)

48
test.py
View file

@ -63,8 +63,7 @@ Options:
--search-in dir limit directory tree walk to dir (optimisation)
--immediate-errors show errors as soon as they happen (default)
--delayed-errors show errors after all unit tests were run
--resource name enable given resource; currently only 'network'
is allowed
--resource name enable given resource
"""
#
# This script borrows ideas from Zope 3's test runner heavily. It is smaller
@ -86,8 +85,6 @@ from sets import Set
__metaclass__ = type
Resources = ['network']
class TestSkipped (Exception):
"""Test skipped.
@ -190,6 +187,16 @@ def walk_with_symlinks(top, func, arg):
walk_with_symlinks(name, func, arg)
def has_path_component (path, name):
drive, path = os.path.splitdrive(path)
head, tail = os.path.split(path)
while head and tail:
if tail == name:
return True
head, tail = os.path.split(head)
return False
def get_test_files(cfg):
"""Return a list of test module filenames."""
matcher = compile_matcher(cfg.pathname_regex)
@ -203,17 +210,20 @@ def get_test_files(cfg):
def visit(ignored, dir, files):
# Ignore files starting with a dot.
# Do not not descend into subdirs containing a dot.
# Ignore versioning system files
remove = []
for idx, file in enumerate(files):
if file.startswith('.'):
remove.append(idx)
elif '.' in file and os.path.isdir(os.path.join(dir, file)):
remove.append(idx)
elif file in ["CVS", ".svn"]:
remove.append(idx)
remove.reverse()
for idx in remove:
del files[idx]
# Skip non-test directories, but look for tests.py and/or ftests.py
if os.path.basename(dir) != test_name:
if not has_path_component(dir, test_name):
if test_name + '.py' in files:
path = os.path.join(dir, test_name + '.py')
if matcher(path[baselen:]):
@ -705,7 +715,7 @@ class CustomTestResult(unittest._TextTestResult):
self.__super_printErrors()
def printSkipped (self):
self.printErrorList("SKIP", self.skipped)
self.printErrorList("SKIPPED", self.skipped)
def formatError(self, err):
return "".join(format_exception(basedir=self.cfg.basedir,
@ -726,6 +736,21 @@ class CustomTestResult(unittest._TextTestResult):
w(self.formatError(err))
w()
def printSkip (self, kind, test, err):
w = self.stream.writeln
if self.cfg.colorize:
c = colorize
else:
c = lambda texttype, text: text
w()
w(c('separator', self.separator1))
kind = c('fail', kind)
description = c('longtestname', self.getDescription(test))
w("%s: %s" % (kind, description))
w(c('separator', self.separator2))
w(str(err[1]))
w()
def addFailure(self, test, err):
if self.cfg.immediate_errors:
self.printTraceback("FAIL", test, err)
@ -745,9 +770,7 @@ class CustomTestResult(unittest._TextTestResult):
def addSkipped(self, test, err):
if self.cfg.immediate_errors:
self.printTraceback("SKIPPED", test, err)
if self.cfg.postmortem:
pdb.post_mortem(sys.exc_info()[2])
self.printSkip("SKIP", test, err)
self.skipped.append((test, self.formatError(err)))
def printErrorList(self, flavour, errors):
@ -782,7 +805,7 @@ class CustomTestRunner(unittest.TextTestRunner):
__super_init = __super.__init__
__super_run = __super.run
def __init__(self, cfg, hooks=None, stream=sys.stderr, count=None):
def __init__(self, cfg, hooks=None, stream=sys.stdout, count=None):
self.__super_init(verbosity=cfg.verbosity, stream=stream)
self.cfg = cfg
if hooks is not None:
@ -915,11 +938,6 @@ def main(argv):
elif k == '--coverage':
cfg.coverage = True
elif k == '--resource':
if v not in Resources:
print >> sys.stderr, ('%s: argument to --resource (%s) must'
' be one of %s'
% (argv[0], v, str(Resources)))
return 1
cfg.resources.append(v)
elif k == '--level':
try: