Updated DNS code

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@3898 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2009-01-10 18:05:13 +00:00
parent 3b193bda35
commit 4ffe29615e
31 changed files with 1027 additions and 223 deletions

View file

@ -23,5 +23,5 @@ sources:
Changed: resolver.py
Added: ifconfig.py
* added search patters for domain names
* added search patterns for domain names
Changed: resolver.py

View file

@ -18,6 +18,7 @@
__all__ = [
'dnssec',
'e164',
'exception',
'flags',
'inet',
@ -36,6 +37,7 @@ __all__ = [
'rdatatype',
'renderer',
'resolver',
'reversename',
'rrset',
'set',
'tokenizer',

79
linkcheck/dns/e164.py Normal file
View file

@ -0,0 +1,79 @@
# Copyright (C) 2006, 2007 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""DNS E.164 helpers
@var public_enum_domain: The DNS public ENUM domain, e164.arpa.
@type public_enum_domain: linkcheck.dns.name.Name object
"""
import linkcheck.dns.exception
import linkcheck.dns.name
import linkcheck.dns.resolver
public_enum_domain = linkcheck.dns.name.from_text('e164.arpa.')
def from_e164(text, origin=public_enum_domain):
"""Convert an E.164 number in textual form into a Name object whose
value is the ENUM domain name for that number.
@param text: an E.164 number in textual form.
@type text: str
@param origin: The domain in which the number should be constructed.
The default is e164.arpa.
@type: linkcheck.dns.name.Name object or None
@rtype: linkcheck.dns.name.Name object
"""
parts = [d for d in text if d.isdigit()]
parts.reverse()
return linkcheck.dns.name.from_text('.'.join(parts), origin=origin)
def to_e164(name, origin=public_enum_domain, want_plus_prefix=True):
"""Convert an ENUM domain name into an E.164 number.
@param name: the ENUM domain name.
@type name: linkcheck.dns.name.Name object.
@param origin: A domain containing the ENUM domain name. The
name is relativized to this domain before being converted to text.
@type: linkcheck.dns.name.Name object or None
@param want_plus_prefix: if True, add a '+' to the beginning of the
returned number.
@rtype: str
"""
if not origin is None:
name = name.relativize(origin)
dlabels = [d for d in name.labels if (d.isdigit() and len(d) == 1)]
if len(dlabels) != len(name.labels):
raise linkcheck.dns.exception.SyntaxError, 'non-digit labels in ENUM domain name'
dlabels.reverse()
text = ''.join(dlabels)
if want_plus_prefix:
text = '+' + text
return text
def query(number, domains, resolver=None):
"""Look for NAPTR RRs for the specified number in the specified domains.
e.g. lookup('16505551212', ['e164.dnspython.org.', 'e164.arpa.'])
"""
if resolver is None:
resolver = linkcheck.dns.resolver.get_default_resolver()
for domain in domains:
if isinstance(domain, (str, unicode)):
domain = linkcheck.dns.name.from_text(domain)
qname = linkcheck.dns.e164.from_e164(number, domain)
try:
return linkcheck.dns.resolver.query(qname, 'NAPTR')
except linkcheck.dns.resolver.NXDOMAIN:
pass
raise linkcheck.dns.resolver.NXDOMAIN

View file

@ -89,3 +89,20 @@ def inet_ntop(family, address):
return linkcheck.dns.ipv6.inet_ntoa(address)
else:
raise NotImplementedError
def is_multicast(text):
"""Is the textual-form network address a multicast address?
@param text: the textual address
@raises ValueError: the address family cannot be determined from the input.
@rtype: bool
"""
try:
first = ord(linkcheck.dns.ipv4.inet_aton(text)[0])
return (first >= 224 and first <= 239)
except:
try:
first = ord(linkcheck.dns.ipv6.inet_aton(text)[0])
return (first == 255)
except:
raise ValueError

View file

@ -14,6 +14,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""IPv6 helper functions."""
import re
import linkcheck.dns.exception

View file

@ -14,6 +14,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""DNS Messages"""
from cStringIO import StringIO
import random
import struct
@ -120,6 +122,10 @@ class Message(object):
message sequence? This variable is used when validating TSIG signatures
on messages which are part of a zone transfer.
@type first: bool
@ivar index: An index of rrsets in the message. The index key is
(section, name, rdclass, rdtype, covers, deleting). Indexing can be
disabled by setting the index to None.
@type index: dict
"""
def __init__(self, id=None):
@ -135,6 +141,7 @@ class Message(object):
self.edns = -1
self.ednsflags = 0
self.payload = 0
self.request_payload = 0
self.keyring = None
self.keyname = None
self.request_mac = ''
@ -149,6 +156,7 @@ class Message(object):
self.had_tsig = False
self.multi = False
self.first = True
self.index = {}
def __repr__(self):
return '<DNS message, ID %r>' % self.id
@ -200,11 +208,11 @@ class Message(object):
print >> s, ';ADDITIONAL'
for rrset in self.additional:
print >> s, rrset.to_text(origin, relativize, **kw)
#
# We strip off the final \n so the caller can print the result without
# doing weird things to get around eccentricities in Python print
# formatting
#
return s.getvalue()[:-1]
def __eq__(self, other):
@ -246,6 +254,7 @@ class Message(object):
"""Is other a response to self?
@rtype: bool"""
if other.flags & linkcheck.dns.flags.QR == 0 or \
self.id != other.id or \
linkcheck.dns.opcode.from_flags(self.flags) != \
linkcheck.dns.opcode.from_flags(other.flags):
return False
@ -262,6 +271,18 @@ class Message(object):
return False
return True
def section_number(self, section):
if section is self.question:
return 0
elif section is self.answer:
return 1
elif section is self.authority:
return 2
elif section is self.additional:
return 3
else:
raise ValueError, 'unknown section'
def find_rrset(self, section, name, rdclass, rdtype,
covers=linkcheck.dns.rdatatype.NONE, deleting=None, create=False,
force_unique=False):
@ -289,14 +310,23 @@ class Message(object):
@raises KeyError: the RRset was not found and create was False
@rtype: linkcheck.dns.rrset.RRset object"""
key = (self.section_number(section),
name, rdclass, rdtype, covers, deleting)
if not force_unique:
for rrset in section:
if rrset.match(name, rdclass, rdtype, covers, deleting):
if not self.index is None:
rrset = self.index.get(key)
if not rrset is None:
return rrset
else:
for rrset in section:
if rrset.match(name, rdclass, rdtype, covers, deleting):
return rrset
if not create:
raise KeyError
rrset = linkcheck.dns.rrset.RRset(name, rdclass, rdtype, covers, deleting)
section.append(rrset)
if not self.index is None:
self.index[key] = rrset
return rrset
def get_rrset(self, section, name, rdclass, rdtype,
@ -334,7 +364,7 @@ class Message(object):
rrset = None
return rrset
def to_wire(self, origin=None, max_size=65535, **kw):
def to_wire(self, origin=None, max_size=0, **kw):
"""Return a string containing the message in DNS compressed wire
format.
@ -343,12 +373,23 @@ class Message(object):
@param origin: The origin to be appended to any relative names.
@type origin: linkcheck.dns.name.Name object
@param max_size: The maximum size of the wire format output.
@param max_size: The maximum size of the wire format output; default
is 0, which means 'the message's request payload, if nonzero, or
65536'.
@type max_size: int
@raises linkcheck.dns.exception.TooBig: max_size was exceeded
@rtype: string
"""
if max_size == 0:
if self.request_payload != 0:
max_size = self.request_payload
else:
max_size = 65535
if max_size < 512:
max_size = 512
elif max_size > 65535:
max_size = 65535
r = linkcheck.dns.renderer.Renderer(self.id, self.flags, max_size, origin)
for rrset in self.question:
r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
@ -396,7 +437,7 @@ class Message(object):
if keyname is None:
self.keyname = self.keyring.keys()[0]
else:
if isinstance(keyname, str):
if isinstance(keyname, basestring):
keyname = linkcheck.dns.name.from_text(keyname)
self.keyname = keyname
self.fudge = fudge
@ -407,23 +448,51 @@ class Message(object):
self.tsig_error = tsig_error
self.other_data = other_data
def use_edns(self, edns, ednsflags, payload):
def use_edns(self, edns=0, ednsflags=0, payload=1280, request_payload=None):
"""Configure EDNS behavior.
@param edns: The EDNS level to use. Specifying None or -1 means
'do not use EDNS'.
@type edns: int or None
@param edns: The EDNS level to use. Specifying None, False, or -1
means 'do not use EDNS', and in this case the other parameters are
ignored. Specifying True is equivalent to specifying 0, i.e. 'use
EDNS0'.
@type edns: int or bool or None
@param ednsflags: EDNS flag values.
@type ednsflags: int
@param payload: The EDNS sender's payload field, which is the maximum
size of UDP datagram the sender can handle.
@type payload: int
@param request_payload: The EDNS payload size to use when sending
this message. If not specified, defaults to the value of payload.
@type request_payload: int or None
@see: RFC 2671
"""
if edns is None:
if edns is None or edns is False:
edns = -1
if edns is True:
edns = 0
if request_payload is None:
request_payload = payload
if edns < 0:
ednsflags = 0
payload = 0
request_payload = 0
self.edns = edns
self.ednsflags = ednsflags
self.payload = payload
self.request_payload = request_payload
def want_dnssec(self, wanted=True):
"""Enable or disable 'DNSSEC desired' flag in requests.
@param wanted: Is DNSSEC desired? If True, EDNS is enabled if
required, and then the DO bit is set. If False, the DO bit is
cleared if EDNS is enabled.
@type wanted: bool
"""
if wanted:
if self.edns < 0:
self.use_edns()
self.ednsflags |= linkcheck.dns.flags.DO
elif self.edns >= 0:
self.ednsflags &= ~linkcheck.dns.flags.DO
def rcode(self):
"""Return the rcode.
@ -890,7 +959,8 @@ def from_file(f):
return m
def make_query(qname, rdtype, rdclass = linkcheck.dns.rdataclass.IN):
def make_query(qname, rdtype, rdclass = linkcheck.dns.rdataclass.IN,
use_edns=None, want_dnssec=False):
"""Make a query message.
The query name, type, and class may all be specified either
@ -905,8 +975,16 @@ def make_query(qname, rdtype, rdclass = linkcheck.dns.rdataclass.IN):
@type rdtype: int
@param rdclass: The desired rdata class; the default is class IN.
@type rdclass: int
@rtype: linkcheck.dns.message.Message object"""
if isinstance(qname, str):
@rtype: linkcheck.dns.message.Message object
@param use_edns: The EDNS level to use; the default is None (no EDNS).
See the description of dns.message.Message.use_edns() for the possible
values for use_edns and their meanings.
@type use_edns: int or bool or None
@param want_dnssec: Should the query indicate that DNSSEC is desired?
@type want_dnssec: bool
@rtype: dns.message.Message object"""
if isinstance(qname, basestring):
qname = linkcheck.dns.name.from_text(qname)
if isinstance(rdtype, str):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
@ -916,6 +994,8 @@ def make_query(qname, rdtype, rdclass = linkcheck.dns.rdataclass.IN):
m.flags |= linkcheck.dns.flags.RD
m.find_rrset(m.question, qname, rdclass, rdtype, create=True,
force_unique=True)
m.use_edns(use_edns)
m.want_dnssec(want_dnssec)
return m

View file

@ -22,10 +22,11 @@
@type empty: linkcheck.dns.name.Name object
"""
import string
from cStringIO import StringIO
import struct
import sys
import encodings.idna
import linkcheck.dns.exception
NAMERELN_NONE = 0
@ -321,7 +322,28 @@ class Name(object):
l = self.labels[:-1]
else:
l = self.labels
s = string.join(map(_escapify, l), '.')
s = '.'.join(map(_escapify, l))
return s
def to_unicode(self, omit_final_dot = False):
"""Convert name to Unicode text format.
IDN ACE lables are converted to Unicode.
@param omit_final_dot: If True, don't emit the final dot (denoting the
root label) for absolute names. The default is False.
@rtype: string
"""
if len(self.labels) == 0:
return u'@'
if len(self.labels) == 1 and self.labels[0] == '':
return u'.'
if omit_final_dot and self.is_absolute():
l = self.labels[:-1]
else:
l = self.labels
s = u'.'.join([encodings.idna.ToUnicode(_escapify(x)) for x in l])
return s
def to_digestable(self, origin=None):
@ -352,8 +374,9 @@ class Name(object):
"""Convert name to wire format, possibly compressing it.
@param file: the file where the compressed name is emitted (typically
a cStringIO file)
@type file: file
a cStringIO file) If None, a string containing the wire name
will be returned.
@type file: file or None
@param compress: The compression table. If None (the default) names
will not be compressed.
@type compress: dict
@ -365,6 +388,12 @@ class Name(object):
if it is missing, then this exception is raised
"""
if file is None:
file = StringIO()
want_return = True
else:
want_return = False
if not self.is_absolute():
if origin is None or not origin.is_absolute():
raise NeedAbsoluteNameOrOrigin
@ -384,7 +413,7 @@ class Name(object):
value = 0xc000 + pos
s = struct.pack('!H', value)
file.write(s)
return
break
else:
if not compress is None and len(n) > 1:
pos = file.tell()
@ -394,6 +423,8 @@ class Name(object):
file.write(chr(l))
if l > 0:
file.write(label)
if want_return:
return file.getvalue()
def __len__(self):
"""The length of the name (in labels).
@ -498,6 +529,68 @@ class Name(object):
root = Name([''])
empty = Name([])
def from_unicode(text, origin = root):
"""Convert unicode text into a Name object.
Lables are encoded in IDN ACE form.
@rtype: dns.name.Name object
"""
if not isinstance(text, unicode):
raise ValueError, "input to from_unicode() must be a unicode string"
if not (origin is None or isinstance(origin, Name)):
raise ValueError, "origin must be a Name or None"
labels = []
label = u''
escaping = False
edigits = 0
total = 0
if text == u'@':
text = u''
if text:
if text == u'.':
return Name(['']) # no Unicode "u" on this constant!
for c in text:
if escaping:
if edigits == 0:
if c.isdigit():
total = int(c)
edigits += 1
else:
label += c
escaping = False
else:
if not c.isdigit():
raise BadEscape
total *= 10
total += int(c)
edigits += 1
if edigits == 3:
escaping = False
label += chr(total)
elif c == u'.' or c == u'\u3002' or \
c == u'\uff0e' or c == u'\uff61':
if len(label) == 0:
raise EmptyLabel
labels.append(encodings.idna.ToASCII(label))
label = u''
elif c == u'\\':
escaping = True
edigits = 0
total = 0
else:
label += c
if escaping:
raise BadEscape
if len(label) > 0:
labels.append(encodings.idna.ToASCII(label))
else:
labels.append('')
if (len(labels) == 0 or labels[-1] != '') and not origin is None:
labels.extend(list(origin.labels))
return Name(labels)
def from_text(text, origin = root):
"""Convert text into a Name object.
@rtype: linkcheck.dns.name.Name object
@ -505,6 +598,10 @@ def from_text(text, origin = root):
if not isinstance(text, str):
raise ValueError, "input to from_text() must be a byte string"
if isinstance(text, unicode):
return from_unicode(text, origin)
else:
raise ValueError, "input to from_text() must be a string"
if not (origin is None or isinstance(origin, Name)):
raise ValueError, "origin must be a Name or None"
labels = []

View file

@ -47,18 +47,25 @@ def _compute_expiration(timeout):
def _wait_for(ir, iw, ix, expiration):
if expiration is None:
timeout = None
else:
timeout = expiration - time.time()
if timeout <= 0.0:
done = False
while not done:
if expiration is None:
timeout = None
else:
timeout = expiration - time.time()
if timeout <= 0.0:
raise linkcheck.dns.exception.Timeout
try:
if timeout is None:
(r, w, x) = select.select(ir, iw, ix)
else:
(r, w, x) = select.select(ir, iw, ix, timeout)
except select.error, e:
if e.args[0] != errno.EINTR:
raise e
done = True
if len(r) == 0 and len(w) == 0 and len(x) == 0:
raise linkcheck.dns.exception.Timeout
if timeout is None:
(r, w, x) = select.select(ir, iw, ix)
else:
(r, w, x) = select.select(ir, iw, ix, timeout)
if len(r) == 0 and len(w) == 0 and len(x) == 0:
raise linkcheck.dns.exception.Timeout
def _wait_for_readable(s, expiration):
_wait_for([s], [], [s], expiration)
@ -68,7 +75,8 @@ def _wait_for_writable(s, expiration):
_wait_for([], [s], [s], expiration)
def udp(q, where, timeout=None, port=53, af=None, source=None, source_port=0):
def udp(q, where, timeout=None, port=53, af=None, source=None, source_port=0,
ignore_unexpected=False):
"""Return the response obtained after sending a query via UDP.
@param q: the query
@ -90,6 +98,9 @@ def udp(q, where, timeout=None, port=53, af=None, source=None, source_port=0):
@param source_port: The port from which to send the message.
The default is 0.
@type source_port: int
@param ignore_unexpected: If True, ignore responses from unexpected
sources. The default is False.
@type ignore_unexpected: bool
"""
wire = q.to_wire()
if af is None:
@ -113,12 +124,19 @@ def udp(q, where, timeout=None, port=53, af=None, source=None, source_port=0):
s.bind(source)
_wait_for_writable(s, expiration)
s.sendto(wire, destination)
_wait_for_readable(s, expiration)
(wire, from_address) = s.recvfrom(65535)
while 1:
_wait_for_readable(s, expiration)
(wire, from_address) = s.recvfrom(65535)
if from_address == destination or \
(linkcheck.dns.inet.is_multicast(where) and \
from_address[1] == destination[1]):
break
if not ignore_unexpected:
raise UnexpectedSource, \
'got a response from %s instead of %s' % (from_address,
destination)
finally:
s.close()
if from_address != destination:
raise UnexpectedSource
r = linkcheck.dns.message.from_wire(wire, keyring=q.keyring, request_mac=q.mac)
if not q.is_response(r):
raise BadResponse
@ -331,23 +349,23 @@ def xfr (where, zone, rdtype=linkcheck.dns.rdatatype.AXFR,
if soa_rrset is None:
if not r.answer or r.answer[0].name != oname:
raise linkcheck.dns.exception.FormError, "first RRset is not an SOA"
answer_index = 1
rrset = r.answer[0]
if rrset.rdtype != linkcheck.dns.rdatatype.SOA:
raise linkcheck.dns.exception.FormError
answer_index = 1
soa_rrset = rrset.copy()
if rdtype == linkcheck.dns.rdatatype.IXFR:
if soa_rrset[0].serial == serial:
#
# We're already up-to-date.
#
done = True
else:
expecting_SOA = True
#
# Process SOAs in the answer section (other than the initial
# SOA in the first message).
#
for rrset in r.answer[answer_index:]:
if done:
raise linkcheck.dns.exception.FormError, "answers after final SOA"
@ -362,11 +380,11 @@ def xfr (where, zone, rdtype=linkcheck.dns.rdatatype.AXFR,
if rrset == soa_rrset and not delete_mode:
done = True
elif expecting_SOA:
#
# We made an IXFR request and are expecting another
# SOA RR, but saw something else, so this must be an
# AXFR response.
#
rdtype = linkcheck.dns.rdatatype.AXFR
expecting_SOA = False
if done and q.keyring and not r.had_tsig:

View file

@ -171,6 +171,14 @@ class Rdata(object):
raise NotImplementedError
def validate(self):
"""Check that the current contents of the rdata's fields are
valid. If you change an rdata by assigning to its fields,
it is a good idea to call validate() when you are done making
changes.
"""
linkcheck.dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())
def __repr__(self):
covers = self.covers()
if covers == linkcheck.dns.rdatatype.NONE:
@ -401,11 +409,11 @@ def from_text(rdclass, rdtype, tok, origin = None, relativize = True):
tok.unget(token)
if token[0] == linkcheck.dns.tokenizer.IDENTIFIER and \
token[1] == r'\#':
#
# Known type using the generic syntax. Extract the
# wire form from the generic syntax, and then run
# from_wire on it.
#
rdata = GenericRdata.from_text(rdclass, rdtype, tok, origin,
relativize)
return from_wire(rdclass, rdtype, rdata.data, 0, len(rdata.data),

View file

@ -26,6 +26,9 @@ import linkcheck.dns.rdataclass
import linkcheck.dns.rdata
import linkcheck.dns.set
# define SimpleSet here for backwards compatibility
SimpleSet = linkcheck.dns.set.Set
class DifferingCovers(linkcheck.dns.exception.DNSException):
"""Raised if an attempt is made to add a SIG/RRSIG whose covered type
is not the same as that of the other rdatas in the rdataset."""
@ -100,12 +103,12 @@ class Rdataset(linkcheck.dns.set.Set):
@param ttl: The TTL
@type ttl: int"""
#
# If we're adding a signature, do some special handling to
# check that the signature covers the same type as the
# other rdatas in this rdataset. If this is the first rdata
# in the set, initialize the covers field.
#
if self.rdclass != rd.rdclass or self.rdtype != rd.rdtype:
raise IncompatibleTypes
if not ttl is None:
@ -194,11 +197,11 @@ class Rdataset(linkcheck.dns.set.Set):
else:
rdclass = self.rdclass
if len(self) == 0:
#
# Empty rdatasets are used for the question section, and in
# some dynamic updates, so we don't need to print out the TTL
# (which is meaningless anyway).
#
print >> s, '%s%s%s %s' % (ntext, pad,
linkcheck.dns.rdataclass.to_text(rdclass),
linkcheck.dns.rdatatype.to_text(self.rdtype))
@ -208,9 +211,9 @@ class Rdataset(linkcheck.dns.set.Set):
(ntext, pad, self.ttl, linkcheck.dns.rdataclass.to_text(rdclass),
linkcheck.dns.rdatatype.to_text(self.rdtype),
rd.to_text(origin=origin, relativize=relativize, **kw))
#
# We strip off the final \n for the caller's convenience in printing
#
return s.getvalue()[:-1]
def to_wire(self, name, file, compress=None, origin=None,

View file

@ -72,9 +72,12 @@ OPT = 41
APL = 42
DS = 43
SSHFP = 44
IPSECKEY = 45
RRSIG = 46
NSEC = 47
DNSKEY = 48
DHCID = 49
SPF = 99
UNSPEC = 103
TKEY = 249
TSIG = 250
@ -126,9 +129,12 @@ _by_text = {
'APL' : APL,
'DS' : DS,
'SSHFP' : SSHFP,
'IPSECKEY' : IPSECKEY,
'RRSIG' : RRSIG,
'NSEC' : NSEC,
'DNSKEY' : DNSKEY,
'DHCID' : DHCID,
'SPF' : SPF,
'UNSPEC' : UNSPEC,
'TKEY' : TKEY,
'TSIG' : TSIG,

View file

@ -59,7 +59,7 @@ def _tuple_to_float(what):
value += float(what[1]) / 60.0
value += float(what[2]) / 3600.0
value += float(what[3]) / 3600000.0
return value
return sign * value
def _encode_size(what, desc):
what = long(what);

View file

@ -0,0 +1,22 @@
# Copyright (C) 2006, 2007 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import linkcheck.dns.rdtypes.txtbase
class SPF(linkcheck.dns.rdtypes.txtbase.TXTBase):
"""SPF record
@see: RFC 4408"""
pass

View file

@ -14,73 +14,8 @@
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import linkcheck.dns.exception
import linkcheck.dns.rdata
import linkcheck.dns.tokenizer
import linkcheck.dns.rdtypes.txtbase
class TXT(linkcheck.dns.rdata.Rdata):
"""TXT record
@ivar strings: the text strings
@type strings: list of string
@see: RFC 1035"""
__slots__ = ['strings']
def __init__(self, rdclass, rdtype, strings):
super(TXT, self).__init__(rdclass, rdtype)
if isinstance(strings, str):
strings = [ strings ]
self.strings = strings[:]
def to_text(self, origin=None, relativize=True, **kw):
txt = ''
prefix = ''
for s in self.strings:
txt += '%s"%s"' % (prefix, linkcheck.dns.rdata._escapify(s))
prefix = ' '
return txt
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
strings = []
while 1:
(ttype, s) = tok.get()
if ttype == linkcheck.dns.tokenizer.EOL or ttype == linkcheck.dns.tokenizer.EOF:
break
if ttype != linkcheck.dns.tokenizer.QUOTED_STRING:
raise linkcheck.dns.exception.DNSSyntaxError, "expected a quoted string"
if len(s) > 255:
raise linkcheck.dns.exception.DNSSyntaxError, "string too long"
strings.append(s)
if len(strings) == 0:
raise linkcheck.dns.exception.UnexpectedEnd
return cls(rdclass, rdtype, strings)
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
for s in self.strings:
l = len(s)
assert l < 256
byte = chr(l)
file.write(byte)
file.write(s)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
strings = []
while rdlen > 0:
l = ord(wire[current])
current += 1
rdlen -= 1
if l > rdlen:
raise linkcheck.dns.exception.FormError
s = wire[current : current + l]
current += l
rdlen -= l
strings.append(s)
return cls(rdclass, rdtype, strings)
from_wire = classmethod(from_wire)
def _cmp(self, other):
return cmp(self.strings, other.strings)
class TXT(linkcheck.dns.rdtypes.txtbase.TXTBase):
"""TXT record"""
pass

View file

@ -21,6 +21,7 @@ __all__ = [
'CERT',
'CNAME',
'DNAME',
'DNSKEY',
'DS',
'GPOS',
'HINFO',
@ -29,16 +30,16 @@ __all__ = [
'LOC',
'MX',
'NS',
'NSEC',
'NXT',
'PTR',
'RP',
'RRSIG',
'RT',
'SIG',
'SOA',
'SPF',
'SSHFP',
'TXT',
'X25',
'RRSIG',
'NSEC',
'DNSKEY',
'SSHFP',
]

View file

@ -0,0 +1,61 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2006, 2007 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import linkcheck.dns.exception
class DHCID(linkcheck.dns.rdata.Rdata):
"""DHCID record
@ivar data: the data (the content of the RR is opaque as far as the
DNS is concerned)
@type data: string
@see: RFC 4701"""
__slots__ = ['data']
def __init__(self, rdclass, rdtype, data):
super(DHCID, self).__init__(rdclass, rdtype)
self.data = data
def to_text(self, origin=None, relativize=True, **kw):
return linkcheck.dns.rdata._base64ify(self.data)
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
chunks = []
while 1:
t = tok.get()
if t[0] == linkcheck.dns.tokenizer.EOL or t[0] == linkcheck.dns.tokenizer.EOF:
break
if t[0] != linkcheck.dns.tokenizer.IDENTIFIER:
raise linkcheck.dns.exception.SyntaxError
chunks.append(t[1])
b64 = ''.join(chunks)
data = b64.decode('base64_codec')
return cls(rdclass, rdtype, data)
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
file.write(self.data)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
data = wire[current : current + rdlen]
return cls(rdclass, rdtype, data)
from_wire = classmethod(from_wire)
def _cmp(self, other):
return cmp(self.data, other.data)

View file

@ -0,0 +1,160 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2006, 2007 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
from cStringIO import StringIO
import struct
import linkcheck.dns.exception
import linkcheck.dns.inet
import linkcheck.dns.name
class IPSECKEY(linkcheck.dns.rdata.Rdata):
"""IPSECKEY record
@ivar precedence: the precedence for this key data
@type precedence: int
@ivar gateway_type: the gateway type
@type gateway_type: int
@ivar algorithm: the algorithm to use
@type algorithm: int
@ivar gateway: the public key
@type gateway: None, IPv4 address, IPV6 address, or domain name
@ivar key: the public key
@type key: string
@see: RFC 4025"""
__slots__ = ['precedence', 'gateway_type', 'algorithm', 'gateway', 'key']
def __init__(self, rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key):
super(IPSECKEY, self).__init__(rdclass, rdtype)
if gateway_type == 0:
if gateway != '.' and not gateway is None:
raise SyntaxError, 'invalid gateway for gateway type 0'
gateway = None
elif gateway_type == 1:
# check that it's OK
junk = linkcheck.dns.inet.inet_pton(linkcheck.dns.inet.AF_INET, gateway)
elif gateway_type == 2:
# check that it's OK
junk = linkcheck.dns.inet.inet_pton(linkcheck.dns.inet.AF_INET6, gateway)
elif gateway_type == 3:
pass
else:
raise SyntaxError, \
'invalid IPSECKEY gateway type: %d' % gateway_type
self.precedence = precedence
self.gateway_type = gateway_type
self.algorithm = algorithm
self.gateway = gateway
self.key = key
def to_text(self, origin=None, relativize=True, **kw):
if self.gateway_type == 0:
gateway = '.'
elif self.gateway_type == 1:
gateway = self.gateway
elif self.gateway_type == 2:
gateway = self.gateway
elif self.gateway_type == 3:
gateway = str(self.gateway.choose_relativity(origin, relativize))
else:
raise ValueError, 'invalid gateway type'
return '%d %d %d %s %s' % (self.precedence, self.gateway_type,
self.algorithm, gateway,
linkcheck.dns.rdata._base64ify(self.key))
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
precedence = tok.get_uint8()
gateway_type = tok.get_uint8()
algorithm = tok.get_uint8()
if gateway_type == 3:
gateway = tok.get_name().choose_relativity(origin, relativize)
else:
gateway = tok.get_string()
chunks = []
while 1:
t = tok.get()
if t[0] == linkcheck.dns.tokenizer.EOL or t[0] == linkcheck.dns.tokenizer.EOF:
break
if t[0] != linkcheck.dns.tokenizer.IDENTIFIER:
raise linkcheck.dns.exception.SyntaxError
chunks.append(t[1])
b64 = ''.join(chunks)
key = b64.decode('base64_codec')
return cls(rdclass, rdtype, precedence, gateway_type, algorithm,
gateway, key)
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
header = struct.pack("!BBB", self.precedence, self.gateway_type,
self.algorithm)
file.write(header)
if self.gateway_type == 0:
pass
elif self.gateway_type == 1:
file.write(linkcheck.dns.inet.inet_pton(linkcheck.dns.inet.AF_INET, self.gateway))
elif self.gateway_type == 2:
file.write(linkcheck.dns.inet.inet_pton(linkcheck.dns.inet.AF_INET6, self.gateway))
elif self.gateway_type == 3:
self.gateway.to_wire(file, None, origin)
else:
raise ValueError, 'invalid gateway type'
file.write(self.key)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
if rdlen < 3:
raise linkcheck.dns.exception.FormError
header = struct.unpack('!BBB', wire[current : current + 3])
gateway_type = header[1]
current += 3
rdlen -= 3
if gateway_type == 0:
gateway = None
elif gateway_type == 1:
gateway = linkcheck.dns.inet.inet_ntop(linkcheck.dns.inet.AF_INET,
wire[current : current + 4])
current += 4
rdlen -= 4
elif gateway_type == 2:
gateway = linkcheck.dns.inet.inet_ntop(linkcheck.dns.inet.AF_INET6,
wire[current : current + 16])
current += 16
rdlen -= 16
elif gateway_type == 3:
(gateway, cused) = linkcheck.dns.name.from_wire(wire[: current + rdlen],
current)
current += cused
rdlen -= cused
else:
raise linkcheck.dns.exception.FormError, 'invalid IPSECKEY gateway type'
key = wire[current : current + rdlen]
return cls(rdclass, rdtype, header[0], gateway_type, header[2],
gateway, key)
from_wire = classmethod(from_wire)
def _cmp(self, other):
f = StringIO()
self.to_wire(f)
wire1 = f.getvalue()
f.seek(0)
f.truncate()
other.to_wire(f)
wire2 = f.getvalue()
f.close()
return cmp(wire1, wire2)

View file

@ -120,7 +120,7 @@ class NAPTR(linkcheck.dns.rdata.Rdata):
def _cmp(self, other):
sp = struct.pack("!HH", self.order, self.preference)
op = struct.pack("!HH", other.order, self.preference)
op = struct.pack("!HH", other.order, other.preference)
v = cmp(sp, op)
if v == 0:
v = cmp(self.flags, other.flags)

View file

@ -20,11 +20,12 @@ __all__ = [
'A',
'AAAA',
'APL',
'DHCID',
'KX',
'PX',
'NAPTR',
'NSAP',
'NSAP_PTR',
'PX',
'SRV',
'WKS',
]

View file

@ -102,7 +102,7 @@ class SIGBase(linkcheck.dns.rdata.Rdata):
type_covered = linkcheck.dns.rdatatype.from_text(tok.get_string())
algorithm = linkcheck.dns.dnssec.algorithm_from_text(tok.get_string())
labels = tok.get_int()
original_ttl = tok.get_uint32()
original_ttl = tok.get_ttl()
expiration = sigtime_to_posixtime(tok.get_string())
inception = sigtime_to_posixtime(tok.get_string())
key_tag = tok.get_int()

View file

@ -0,0 +1,88 @@
# Copyright (C) 2006, 2007 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""TXT-like base class."""
import linkcheck.dns.exception
import linkcheck.dns.rdata
import linkcheck.dns.tokenizer
class TXTBase(linkcheck.dns.rdata.Rdata):
"""Base class for rdata that is like a TXT record
@ivar strings: the text strings
@type strings: list of string
@see: RFC 1035"""
__slots__ = ['strings']
def __init__(self, rdclass, rdtype, strings):
super(TXTBase, self).__init__(rdclass, rdtype)
if isinstance(strings, str):
strings = [ strings ]
self.strings = strings[:]
def to_text(self, origin=None, relativize=True, **kw):
txt = ''
prefix = ''
for s in self.strings:
txt += '%s"%s"' % (prefix, linkcheck.dns.rdata._escapify(s))
prefix = ' '
return txt
def from_text(cls, rdclass, rdtype, tok, origin = None, relativize = True):
strings = []
while 1:
(ttype, s) = tok.get()
if ttype == linkcheck.dns.tokenizer.EOL or ttype == linkcheck.dns.tokenizer.EOF:
break
if ttype != linkcheck.dns.tokenizer.QUOTED_STRING and \
ttype != linkcheck.dns.tokenizer.IDENTIFIER:
raise linkcheck.dns.exception.SyntaxError, "expected a string"
if len(s) > 255:
raise linkcheck.dns.exception.SyntaxError, "string too long"
strings.append(s)
if len(strings) == 0:
raise linkcheck.dns.exception.UnexpectedEnd
return cls(rdclass, rdtype, strings)
from_text = classmethod(from_text)
def to_wire(self, file, compress = None, origin = None):
for s in self.strings:
l = len(s)
assert l < 256
byte = chr(l)
file.write(byte)
file.write(s)
def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin = None):
strings = []
while rdlen > 0:
l = ord(wire[current])
current += 1
rdlen -= 1
if l > rdlen:
raise linkcheck.dns.exception.FormError
s = wire[current : current + l]
current += l
rdlen -= l
strings.append(s)
return cls(rdclass, rdtype, strings)
from_wire = classmethod(from_wire)
def _cmp(self, other):
return cmp(self.strings, other.strings)

View file

@ -343,7 +343,13 @@ class Resolver(object):
a string, it is used as the name of the file to open; otherwise it
is treated as the file itself."""
if isinstance(f, basestring):
f = open(f, 'r')
try:
f = open(f, 'r')
except IOError:
# /etc/resolv.conf doesn't exist, can't be read, etc.
# We'll just use the default resolver configuration.
self.nameservers = ['127.0.0.1']
return
want_close = True
else:
want_close = False
@ -405,10 +411,27 @@ class Resolver(object):
for h in addrinfo[2]:
self.localhosts.add(h.lower())
def _determine_split_char(self, entry):
#
# The windows registry irritatingly changes the list element
# delimiter in between ' ' and ',' (and vice-versa) in various
# versions of windows.
#
if entry.find(' ') >= 0:
split_char = ' '
elif entry.find(',') >= 0:
split_char = ','
else:
# probably a singleton; treat as a space-separated list.
split_char = ' '
return split_char
def _config_win32_nameservers (self, nameservers, split_char=','):
"""Configure a NameServer registry entry."""
# we call str() on nameservers to convert it from unicode to ascii
ns_list = str(nameservers).split(split_char)
nameservers = str(nameservers)
split_char = self._determine_split_char(nameservers)
ns_list = nameservers.split(split_char)
for ns in ns_list:
if not ns in self.nameservers:
self.nameservers.append(ns)
@ -421,7 +444,9 @@ class Resolver(object):
def _config_win32_search (self, search):
"""Configure a Search registry entry."""
# we call str() on search to convert it from unicode to ascii
search_list = str(search).split(',')
search = str(search)
split_char = self._determine_split_char(search)
search_list = search.split(split_char)
for s in search_list:
if not s in self.search:
self.search.add(linkcheck.dns.name.from_text(s))
@ -512,16 +537,10 @@ class Resolver(object):
guid = _winreg.EnumKey(interfaces, i)
i += 1
key = _winreg.OpenKey(interfaces, guid)
if not self._win32_is_nic_enabled(lm, guid, key):
continue
try:
# enabled interfaces seem to have a non-empty
# NTEContextList
try:
(nte, ttype) = _winreg.QueryValueEx(key,
'NTEContextList')
except WindowsError:
nte = None
if nte:
self._config_win32_fromkey(key)
self._config_win32_fromkey(key)
finally:
key.Close()
except EnvironmentError:
@ -531,6 +550,61 @@ class Resolver(object):
finally:
lm.Close()
def _win32_is_nic_enabled(self, lm, guid, interface_key):
# Look in the Windows Registry to determine whether the network
# interface corresponding to the given guid is enabled.
#
# (Code contributed by Paul Marks, thanks!)
#
try:
# This hard-coded location seems to be consistent, at least
# from Windows 2000 through Vista.
connection_key = _winreg.OpenKey(
lm,
r'SYSTEM\CurrentControlSet\Control\Network'
r'\{4D36E972-E325-11CE-BFC1-08002BE10318}'
r'\%s\Connection' % guid)
try:
# The PnpInstanceID points to a key inside Enum
(pnp_id, ttype) = _winreg.QueryValueEx(
connection_key, 'PnpInstanceID')
if ttype != _winreg.REG_SZ:
raise ValueError
device_key = _winreg.OpenKey(
lm, r'SYSTEM\CurrentControlSet\Enum\%s' % pnp_id)
try:
# Get ConfigFlags for this device
(flags, ttype) = _winreg.QueryValueEx(
device_key, 'ConfigFlags')
if ttype != _winreg.REG_DWORD:
raise ValueError
# Based on experimentation, bit 0x1 indicates that the
# device is disabled.
return not (flags & 0x1)
finally:
device_key.Close()
finally:
connection_key.Close()
except (EnvironmentError, ValueError):
# Pre-vista, enabled interfaces seem to have a non-empty
# NTEContextList; this was how dnspython detected enabled
# nics before the code above was contributed. We've retained
# the old method since we don't know if the code above works
# on Windows 95/98/ME.
try:
(nte, ttype) = _winreg.QueryValueEx(interface_key,
'NTEContextList')
return nte is not None
except WindowsError:
return False
def _compute_timeout(self, start):
now = time.time()
if now < start:
@ -538,7 +612,7 @@ class Resolver(object):
# Time going backwards is bad. Just give up.
raise Timeout
else:
# Time went backwards, but only a little. This can
# Time went backwards, but only a little. This can
# happen, e.g. under vmware with older linux kernels.
# Pretend it didn't happen.
now = start
@ -571,11 +645,7 @@ class Resolver(object):
@raises NoNameservers: no non-broken nameservers are available to
answer the question."""
if isinstance(qname, str):
qname = linkcheck.dns.name.from_text(qname, None)
elif isinstance(qname, unicode):
# Unicode domain names: http://www.faqs.org/rfcs/rfc3490.html
qname = encodings.idna.ToASCII(qname)
if isinstance(qname, basestring):
qname = linkcheck.dns.name.from_text(qname, None)
if isinstance(rdtype, str):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)

View file

@ -0,0 +1,75 @@
# Copyright (C) 2006, 2007 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
"""DNS Reverse Map Names.
@var ipv4_reverse_domain: The DNS IPv4 reverse-map domain, in-addr.arpa.
@type ipv4_reverse_domain: dns.name.Name object
@var ipv6_reverse_domain: The DNS IPv6 reverse-map domain, ip6.arpa.
@type ipv6_reverse_domain: dns.name.Name object
"""
import linkcheck.dns.name
import linkcheck.dns.ipv6
import linkcheck.dns.ipv4
ipv4_reverse_domain = linkcheck.dns.name.from_text('in-addr.arpa.')
ipv6_reverse_domain = linkcheck.dns.name.from_text('ip6.arpa.')
def from_address(text):
"""Convert an IPv4 or IPv6 address in textual form into a Name object whose
value is the reverse-map domain name of the address.
@param text: an IPv4 or IPv6 address in textual form (e.g. '127.0.0.1',
'::1')
@type text: str
@rtype: linkcheck.dns.name.Name object
"""
try:
parts = list(linkcheck.dns.ipv6.inet_aton(text).encode('hex_codec'))
origin = ipv6_reverse_domain
except:
parts = ['%d' % ord(byte) for byte in linkcheck.dns.ipv4.inet_aton(text)]
origin = ipv4_reverse_domain
parts.reverse()
return linkcheck.dns.name.from_text('.'.join(parts), origin=origin)
def to_address(name):
"""Convert a reverse map domain name into textual address form.
@param name: an IPv4 or IPv6 address in reverse-map form.
@type name: linkcheck.dns.name.Name object
@rtype: str
"""
if name.is_subdomain(ipv4_reverse_domain):
name = name.relativize(ipv4_reverse_domain)
labels = list(name.labels)
labels.reverse()
text = '.'.join(labels)
# run through inet_aton() to check syntax and make pretty.
return linkcheck.dns.ipv4.inet_ntoa(linkcheck.dns.ipv4.inet_aton(text))
elif name.is_subdomain(ipv6_reverse_domain):
name = name.relativize(ipv6_reverse_domain)
labels = list(name.labels)
labels.reverse()
parts = []
i = 0
l = len(labels)
while i < l:
parts.append(''.join(labels[i:i+4]))
i += 4
text = ':'.join(parts)
# run through inet_aton() to check syntax and make pretty.
return linkcheck.dns.ipv6.inet_ntoa(linkcheck.dns.ipv6.inet_aton(text))
else:
raise linkcheck.dns.exception.SyntaxError, 'unknown reverse-map address family'

View file

@ -108,6 +108,13 @@ class RRset(linkcheck.dns.rdataset.Rdataset):
return super(RRset, self).to_wire(self.name, file, compress, origin,
self.deleting, **kw)
def to_rdataset(self):
"""Convert an RRset into an Rdataset.
@rtype: dns.rdataset.Rdataset object
"""
return linkcheck.dns.rdataset.from_rdata_list(self.ttl, list(self))
def from_text_list(name, ttl, rdclass, rdtype, text_rdatas):
"""Create an RRset with the specified name, TTL, class, and type, and with
@ -116,7 +123,7 @@ def from_text_list(name, ttl, rdclass, rdtype, text_rdatas):
@rtype: linkcheck.dns.rrset.RRset object
"""
if isinstance(name, str):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
if isinstance(rdclass, str):
rdclass = linkcheck.dns.rdataclass.from_text(rdclass)
@ -145,7 +152,7 @@ def from_rdata_list(name, ttl, rdatas):
@rtype: linkcheck.dns.rrset.RRset object
"""
if isinstance(name, str):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
if len(rdatas) == 0:

View file

@ -279,10 +279,10 @@ class Tokenizer(object):
elif c == '\n':
raise linkcheck.dns.exception.DNSSyntaxError, 'newline in quoted string'
elif c == '\\':
#
# Treat \ followed by a delimiter as the
# delimiter, otherwise leave it alone.
#
c = self._get_char()
if c == '' or not c in self.delimiters:
self._unget_char(c)
@ -419,3 +419,9 @@ class Tokenizer(object):
raise linkcheck.dns.exception.DNSSyntaxError, \
'expected EOL or EOF, got %d "%s"' % (ttype, t)
return t
def get_ttl(self):
(ttype, t) = self.get()
if ttype != IDENTIFIER:
raise linkcheck.dns.exception.SyntaxError, 'expecting an identifier'
return linkcheck.dns.ttl.from_text(t)

View file

@ -33,30 +33,33 @@ def from_text(text):
"""
if text.isdigit():
return int(text)
if not text[0].isdigit():
raise BadTTL
total = 0
current = 0
for c in text:
if c.isdigit():
current *= 10
current += int(c)
else:
c = c.lower()
if c == 'w':
total += current * 604800
elif c == 'd':
total += current * 86400
elif c == 'h':
total += current * 3600
elif c == 'm':
total += current * 60
elif c == 's':
total += current
total = long(text)
else:
if not text[0].isdigit():
raise BadTTL
total = 0L
current = 0L
for c in text:
if c.isdigit():
current *= 10
current += long(c)
else:
raise BadTTL, "unknown unit '%s'" % c
current = 0
if not current == 0:
raise BadTTL, "trailing integer"
c = c.lower()
if c == 'w':
total += current * 604800L
elif c == 'd':
total += current * 86400L
elif c == 'h':
total += current * 3600L
elif c == 'm':
total += current * 60L
elif c == 's':
total += current
else:
raise BadTTL, "unknown unit '%s'" % c
current = 0
if not current == 0:
raise BadTTL, "trailing integer"
if total < 0L or total > 2147483647L:
raise BadTTL, "TTL should be between 0 and 2^31 - 1 (inclusive)"
return total

View file

@ -47,10 +47,8 @@ class Update(linkcheck.dns.message.Message):
self.flags |= linkcheck.dns.opcode.to_flags(linkcheck.dns.opcode.UPDATE)
if isinstance(zone, str):
zone = linkcheck.dns.name.from_text(zone)
else:
zone = zone.copy()
self.origin = zone
if isinstance(rdclass, str):
if isinstance(rdclass, basestring):
rdclass = linkcheck.dns.rdataclass.from_text(rdclass)
self.zone_rdclass = rdclass
self.find_rrset(self.question, self.origin, rdclass, linkcheck.dns.rdatatype.SOA,
@ -81,7 +79,7 @@ class Update(linkcheck.dns.message.Message):
- ttl, rdtype, string..."""
if isinstance(name, str):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
if isinstance(args[0], linkcheck.dns.rdataset.Rdataset):
for rds in args:
@ -131,7 +129,7 @@ class Update(linkcheck.dns.message.Message):
- rdtype, [string...]"""
if isinstance(name, str):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
if len(args) == 0:
rrset = self.find_rrset(self.authority, name, linkcheck.dns.rdataclass.ANY,
@ -189,7 +187,7 @@ class Update(linkcheck.dns.message.Message):
- rdtype, string..."""
if isinstance(name, str):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
if len(args) == 0:
rrset = self.find_rrset(self.answer, name,
@ -217,7 +215,7 @@ class Update(linkcheck.dns.message.Message):
"""Require that an owner name (and optionally an rdata type) does
not exist as a prerequisite to the execution of the update."""
if isinstance(name, str):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
if rdtype is None:
rrset = self.find_rrset(self.answer, name,
@ -232,7 +230,7 @@ class Update(linkcheck.dns.message.Message):
linkcheck.dns.rdatatype.NONE, None,
True, True)
def to_wire(self, origin=None, max_size=65535, **kw):
def to_wire(self, origin=None, max_size=65535):
"""Return a string containing the update in DNS compressed wire
format.
@rtype: string"""

View file

@ -17,8 +17,8 @@
"""dnspython release version information."""
MAJOR = 1
MINOR = 3
MICRO = 2
MINOR = 6
MICRO = 0
RELEASELEVEL = 0x0f
SERIAL = 0

View file

@ -39,6 +39,10 @@ class NoNS(BadZone):
"""The zone has no NS RRset at its origin."""
pass
class UnknownOrigin(BadZone):
"""The zone's origin is unknown."""
pass
class Zone(object):
"""A DNS zone.
@ -101,7 +105,7 @@ class Zone(object):
return not self.__eq__(other)
def _validate_name(self, name):
if isinstance(name, str):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
elif not isinstance(name, linkcheck.dns.name.Name):
raise KeyError, \
@ -127,22 +131,22 @@ class Zone(object):
del self.nodes[key]
def __iter__(self):
return self.nodes.keys()
return self.nodes.iterkeys()
def iterkeys(self):
return self.nodes.keys()
return self.nodes.iterkeys()
def keys(self):
return self.nodes.keys()
def itervalues(self):
return self.nodes.values()
return self.nodes.itervalues()
def values(self):
return self.nodes.values()
def iteritems(self):
return self.nodes.items()
return self.nodes.iteritems()
def items(self):
return self.nodes.items()
@ -419,7 +423,7 @@ class Zone(object):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = linkcheck.dns.rdatatype.from_text(covers)
for (name, node) in self.items():
for (name, node) in self.iteritems():
for rds in node:
if rdtype == linkcheck.dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
@ -442,7 +446,7 @@ class Zone(object):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
covers = linkcheck.dns.rdatatype.from_text(covers)
for (name, node) in self.items():
for (name, node) in self.iteritems():
for rds in node:
if rdtype == linkcheck.dns.rdatatype.ANY or \
(rds.rdtype == rdtype and rds.covers == covers):
@ -467,16 +471,11 @@ class Zone(object):
@type nl: string or None
"""
if sys.hexversion >= 0x02030000:
# allow Unicode filenames
str_type = basestring
else:
str_type = str
if nl is None:
opts = 'w'
else:
opts = 'wb'
if isinstance(f, str_type):
if isinstance(f, basestring):
f = file(f, opts)
want_close = True
else:
@ -486,7 +485,7 @@ class Zone(object):
names = self.keys()
names.sort()
else:
names = self.keys()
names = self.iterkeys()
for n in names:
l = self[n].to_text(n, origin=self.origin,
relativize=relativize)
@ -538,11 +537,14 @@ class _MasterReader(object):
(None if no $INCLUDE is active).
@ivar allow_include: is $INCLUDE allowed?
@type allow_include: bool
@ivar check_origin: should sanity checks of the origin node be done?
The default is True.
@type check_origin: bool
"""
def __init__(self, tok, origin, rdclass, relativize, zone_factory=Zone,
allow_include=False):
if isinstance(origin, str):
allow_include=False, check_origin=True):
if isinstance(origin, basestring):
origin = linkcheck.dns.name.from_text(origin)
self.tok = tok
self.current_origin = origin
@ -553,6 +555,7 @@ class _MasterReader(object):
self.saved_state = []
self.current_file = None
self.allow_include = allow_include
self.check_origin = check_origin
def _eat_line(self):
while 1:
@ -672,6 +675,8 @@ class _MasterReader(object):
elif u == '$ORIGIN':
self.current_origin = self.tok.get_name()
self.tok.get_eol()
if self.zone.origin is None:
self.zone.origin = self.current_origin
elif u == '$INCLUDE' and self.allow_include:
token = self.tok.get()
if token[0] != linkcheck.dns.tokenizer.QUOTED_STRING:
@ -712,15 +717,19 @@ class _MasterReader(object):
"%s:%d: %s" % (filename, line_number, detail)
# Now that we're done reading, do some basic checking of the zone.
self.zone.check_origin()
if self.check_origin:
self.zone.check_origin()
def from_text(text, origin, rdclass = linkcheck.dns.rdataclass.IN, relativize = True,
zone_factory=Zone, filename=None, allow_include=False):
def from_text(text, origin, rdclass = linkcheck.dns.rdataclass.IN,
relativize = True, zone_factory=Zone, filename=None,
allow_include=False, check_origin=True):
"""Build a zone object from a master file format string.
@param text: the master file format input
@type text: string.
@param origin: The origin of the zone.
@param origin: The origin of the zone; if not specified, the first
$ORIGIN statement in the master file will determine the origin of the
zone.
@type origin: linkcheck.dns.name.Name object or string
@param rdclass: The zone's rdata class; the default is class IN.
@type rdclass: int
@ -730,9 +739,12 @@ def from_text(text, origin, rdclass = linkcheck.dns.rdataclass.IN, relativize =
@type zone_factory: function returning a Zone
@param filename: The filename to emit when describing where an error
occurred; the default is '<string>'.
@type filename: string
@param allow_include: is $INCLUDE allowed?
@type allow_include: bool
@type filename: string
@param check_origin: should sanity checks of the origin node be done?
The default is True.
@type check_origin: bool
@raises linkcheck.dns.zone.NoSOA: No SOA RR was found at the zone origin
@raises linkcheck.dns.zone.NoNS: No NS RRset was found at the zone origin
@rtype: linkcheck.dns.zone.Zone object
@ -746,17 +758,21 @@ def from_text(text, origin, rdclass = linkcheck.dns.rdataclass.IN, relativize =
filename = '<string>'
tok = linkcheck.dns.tokenizer.Tokenizer(text, filename)
reader = _MasterReader(tok, origin, rdclass, relativize, zone_factory,
allow_include=allow_include)
allow_include=allow_include,
check_origin=check_origin)
reader.read()
return reader.zone
def from_file(f, origin, rdclass = linkcheck.dns.rdataclass.IN, relativize = True,
zone_factory=Zone, filename=None, allow_include=True):
zone_factory=Zone, filename=None,
allow_include=True, check_origin=True):
"""Read a master file and build a zone object.
@param f: file or string. If I{f} is a string, it is treated
as the name of a file to open.
@param origin: The origin of the zone.
@param origin: The origin of the zone; if not specified, the first
$ORIGIN statement in the master file will determine the origin of the
zone.
@type origin: linkcheck.dns.name.Name object or string
@param rdclass: The zone's rdata class; the default is class IN.
@type rdclass: int
@ -770,22 +786,18 @@ def from_file(f, origin, rdclass = linkcheck.dns.rdataclass.IN, relativize = Tru
@type filename: string
@param allow_include: is $INCLUDE allowed?
@type allow_include: bool
@param check_origin: should sanity checks of the origin node be done?
The default is True.
@type check_origin: bool
@raises linkcheck.dns.zone.NoSOA: No SOA RR was found at the zone origin
@raises linkcheck.dns.zone.NoNS: No NS RRset was found at the zone origin
@rtype: linkcheck.dns.zone.Zone object
"""
if sys.hexversion >= 0x02030000:
# allow Unicode filenames; turn on universal newline support
str_type = basestring
opts = 'rU'
else:
str_type = str
opts = 'r'
if isinstance(f, str_type):
if isinstance(f, basestring):
if filename is None:
filename = f
f = file(f, opts)
f = file(f, 'rU')
want_close = True
else:
if filename is None:
@ -794,7 +806,7 @@ def from_file(f, origin, rdclass = linkcheck.dns.rdataclass.IN, relativize = Tru
try:
z = from_text(f, origin, rdclass, relativize, zone_factory,
filename, allow_include)
filename, allow_include, check_origin)
finally:
if want_close:
f.close()
@ -807,7 +819,7 @@ def from_xfr(xfr, zone_factory=Zone, relativize=True):
@type xfr: generator of linkcheck.dns.message.Message objects
@param relativize: should names be relativized? The default is True.
It is essential that the relativize setting matches the one specified
to dns.query.xfr()
to dns.query.xfr().
@type relativize: bool
@raises linkcheck.dns.zone.NoSOA: No SOA RR was found at the zone origin
@raises linkcheck.dns.zone.NoNS: No NS RRset was found at the zone origin
@ -817,7 +829,10 @@ def from_xfr(xfr, zone_factory=Zone, relativize=True):
z = None
for r in xfr:
if z is None:
origin = r.answer[0].name
if relativize:
origin = r.origin
else:
origin = r.answer[0].name
rdclass = r.answer[0].rdclass
z = zone_factory(origin, rdclass, relativize=relativize)
for rrset in r.answer:

44
tests/dns/test_bugs.py Normal file
View file

@ -0,0 +1,44 @@
# Copyright (C) 2006, 2007 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import unittest
import linkcheck.dns.rdata
import linkcheck.dns.rdataclass
import linkcheck.dns.rdatatype
import linkcheck.dns.ttl
class BugsTestCase(unittest.TestCase):
def test_float_LOC(self):
rdata = linkcheck.dns.rdata.from_text(linkcheck.dns.rdataclass.IN, linkcheck.dns.rdatatype.LOC,
"30 30 0.000 N 100 30 0.000 W 10.00m 20m 2000m 20m")
self.failUnless(rdata.float_latitude == 30.5)
self.failUnless(rdata.float_longitude == -100.5)
def test_SOA_BIND8_TTL(self):
rdata1 = linkcheck.dns.rdata.from_text(linkcheck.dns.rdataclass.IN, linkcheck.dns.rdatatype.SOA,
"a b 100 1s 1m 1h 1d")
rdata2 = linkcheck.dns.rdata.from_text(linkcheck.dns.rdataclass.IN, linkcheck.dns.rdatatype.SOA,
"a b 100 1 60 3600 86400")
self.failUnless(rdata1 == rdata2)
def test_TTL_bounds_check(self):
def bad():
ttl = linkcheck.dns.ttl.from_text("2147483648")
self.failUnlessRaises(linkcheck.dns.ttl.BadTTL, bad)
if __name__ == '__main__':
unittest.main()

View file

@ -129,7 +129,13 @@ class TestMessage (unittest.TestCase):
def test_TooBig(self):
def bad():
q = linkcheck.dns.message.from_text(query_text)
w = q.to_wire(max_size=15)
for i in xrange(0, 25):
rrset = linkcheck.dns.rrset.from_text('foo%d.' % i, 3600,
linkcheck.dns.rdataclass.IN,
linkcheck.dns.rdatatype.A,
'10.0.0.%d' % i)
q.additional.append(rrset)
w = q.to_wire(max_size=512)
self.assertRaises(linkcheck.dns.exception.TooBig, bad)
def test_answer1(self):