Updated dnspython.

This commit is contained in:
Bastian Kleineidam 2010-10-24 01:02:26 +02:00
parent cb724616c0
commit 3834e56496
9 changed files with 130 additions and 53 deletions

View file

@ -16,6 +16,11 @@
"""Common DNSSEC-related functions and constants."""
import linkcheck.dns.name
import linkcheck.dns.rdata
import linkcheck.dns.rdatatype
import linkcheck.dns.rdataclass
RSAMD5 = 1
DH = 2
DSA = 3
@ -71,3 +76,44 @@ def algorithm_to_text(value):
if text is None:
text = str(value)
return text
def _to_rdata(record):
from cStringIO import StringIO
s = StringIO()
record.to_wire(s)
return s.getvalue()
def key_id(key):
rdata = _to_rdata(key)
if key.algorithm == RSAMD5:
return (ord(rdata[-3]) << 8) + ord(rdata[-2])
else:
total = 0
for i in range(len(rdata) / 2):
total += (ord(rdata[2 * i]) << 8) + ord(rdata[2 * i + 1])
if len(rdata) % 2 != 0:
total += ord(rdata[len(rdata) - 1]) << 8
total += ((total >> 16) & 0xffff);
return total & 0xffff
def make_ds(name, key, algorithm):
import hashlib
import struct
if algorithm.upper() == 'SHA1':
dsalg = 1
hash = hashlib.sha1()
elif algorithm.upper() == 'SHA256':
dsalg = 2
hash = hashlib.sha256()
else:
raise ValueError, 'unsupported algorithm "%s"' % algorithm
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name)
hash.update(name.canonicalize().to_wire())
hash.update(_to_rdata(key))
digest = hash.digest()
dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
return linkcheck.dns.rdata.from_wire(linkcheck.dns.rdataclass.IN,
linkcheck.dns.rdatatype.DS, dsrdata, 0, len(dsrdata))

View file

@ -21,6 +21,7 @@ import struct
import sys
import time
import linkcheck.dns.edns
import linkcheck.dns.exception
import linkcheck.dns.flags
import linkcheck.dns.name
@ -95,8 +96,11 @@ class Message(object):
@ivar request_mac: The TSIG MAC of the request message associated with
this message; used when validating TSIG signatures. @see: RFC 2845 for
more information on TSIG fields.
@ivar keyalgorithm: The TSIG key algorithm to use. The default is
linkcheck.dns.tsig.default_algorithm.
@ivar keyalgorithm: The TSIG algorithm to use; defaults to
linkcheck.dns.tsig.default_algorithm. Constants for TSIG algorithms are defined
in linkcheck.dns.tsig, and the currently implemented algorithms are
HMAC_MD5, HMAC_SHA1, HMAC_SHA224, HMAC_SHA256, HMAC_SHA384, and
HMAC_SHA512.
@type keyalgorithm: string
@type request_mac: string
@ivar fudge: TSIG time fudge; default is 300 seconds.
@ -1036,9 +1040,9 @@ def make_query(qname, rdtype, rdclass = linkcheck.dns.rdataclass.IN,
if isinstance(qname, basestring):
qname = linkcheck.dns.name.from_text(qname)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(rdclass, str):
if isinstance(rdclass, basestring):
rdclass = linkcheck.dns.rdataclass.from_text(rdclass)
m = Message()
m.flags |= linkcheck.dns.flags.RD

View file

@ -313,7 +313,7 @@ def xfr (where, zone, rdtype=linkcheck.dns.rdatatype.AXFR,
if isinstance(zone, basestring):
zone = linkcheck.dns.name.from_text(zone)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
q = linkcheck.dns.message.make_query(zone, rdtype, rdclass)
if rdtype == linkcheck.dns.rdatatype.IXFR:

View file

@ -278,9 +278,9 @@ def from_text_list(rdclass, rdtype, ttl, text_rdatas):
@rtype: linkcheck.dns.rdataset.Rdataset object
"""
if isinstance(rdclass, str):
if isinstance(rdclass, basestring):
rdclass = linkcheck.dns.rdataclass.from_text(rdclass)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
r = Rdataset(rdclass, rdtype)
r.update_ttl(ttl)

View file

@ -654,9 +654,9 @@ class Resolver(object):
if isinstance(qname, basestring):
qname = linkcheck.dns.name.from_text(qname, None)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(rdclass, str):
if isinstance(rdclass, basestring):
rdclass = linkcheck.dns.rdataclass.from_text(rdclass)
qnames_to_try = []
if qname.is_absolute():
@ -678,7 +678,8 @@ class Resolver(object):
return answer
request = linkcheck.dns.message.make_query(qname, rdtype, rdclass)
if not self.keyname is None:
request.use_tsig(self.keyring, self.keyname, self.keyalgorithm)
request.use_tsig(self.keyring, self.keyname,
algorithm=self.keyalgorithm)
request.use_edns(self.edns, self.ednsflags, self.payload)
response = None

View file

@ -37,7 +37,7 @@ class RRset(linkcheck.dns.rdataset.Rdataset):
deleting=None):
"""Create a new RRset."""
super(RRset, self).__init__(rdclass, rdtype)
super(RRset, self).__init__(rdclass, rdtype, covers)
self.name = name
self.deleting = deleting
@ -125,9 +125,9 @@ def from_text_list(name, ttl, rdclass, rdtype, text_rdatas):
if isinstance(name, basestring):
name = linkcheck.dns.name.from_text(name, None)
if isinstance(rdclass, str):
if isinstance(rdclass, basestring):
rdclass = linkcheck.dns.rdataclass.from_text(rdclass)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
r = RRset(name, rdclass, rdtype)
r.update_ttl(ttl)

View file

@ -18,6 +18,7 @@
import hmac
import struct
import sys
import linkcheck.dns.exception
import linkcheck.dns.rdataclass
@ -51,7 +52,16 @@ class PeerBadTruncation(PeerError):
"""Raised if the peer didn't like amount of truncation in the TSIG we sent"""
pass
default_algorithm = "HMAC-MD5.SIG-ALG.REG.INT"
# TSIG Algorithms
HMAC_MD5 = linkcheck.dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
HMAC_SHA1 = linkcheck.dns.name.from_text("hmac-sha1")
HMAC_SHA224 = linkcheck.dns.name.from_text("hmac-sha224")
HMAC_SHA256 = linkcheck.dns.name.from_text("hmac-sha256")
HMAC_SHA384 = linkcheck.dns.name.from_text("hmac-sha384")
HMAC_SHA512 = linkcheck.dns.name.from_text("hmac-sha512")
default_algorithm = HMAC_MD5
BADSIG = 16
BADKEY = 17
@ -168,25 +178,20 @@ def validate(wire, keyname, secret, now, request_mac, tsig_start, tsig_rdata,
raise BadSignature
return ctx
def get_algorithm(algorithm):
"""Returns the wire format string and the hash module to use for the
specified TSIG algorithm
_hashes = None
@rtype: (string, hash constructor)
@raises NotImplementedError: I{algorithm} is not supported
"""
hashes = {}
def _setup_hashes():
global _hashes
_hashes = {}
try:
import hashlib
hashes[linkcheck.dns.name.from_text('hmac-sha224')] = hashlib.sha224
hashes[linkcheck.dns.name.from_text('hmac-sha256')] = hashlib.sha256
hashes[linkcheck.dns.name.from_text('hmac-sha384')] = hashlib.sha384
hashes[linkcheck.dns.name.from_text('hmac-sha512')] = hashlib.sha512
hashes[linkcheck.dns.name.from_text('hmac-sha1')] = hashlib.sha1
hashes[linkcheck.dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = hashlib.md5
_hashes[HMAC_SHA224] = hashlib.sha224
_hashes[HMAC_SHA256] = hashlib.sha256
_hashes[HMAC_SHA384] = hashlib.sha384
_hashes[HMAC_SHA512] = hashlib.sha512
_hashes[HMAC_SHA1] = hashlib.sha1
_hashes[HMAC_MD5] = hashlib.md5
import sys
if sys.hexversion < 0x02050000:
# hashlib doesn't conform to PEP 247: API for
# Cryptographic Hash Functions, which hmac before python
@ -199,19 +204,36 @@ def get_algorithm(algorithm):
def new(self, *args, **kwargs):
return self.basehash(*args, **kwargs)
for name in hashes:
hashes[name] = HashlibWrapper(hashes[name])
for name in _hashes:
_hashes[name] = HashlibWrapper(_hashes[name])
except ImportError:
import md5, sha
hashes[linkcheck.dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = md5.md5
hashes[linkcheck.dns.name.from_text('hmac-sha1')] = sha.sha
_hashes[HMAC_MD5] = md5
_hashes[HMAC_SHA1] = sha
if isinstance(algorithm, (str, unicode)):
def get_algorithm(algorithm):
"""Returns the wire format string and the hash module to use for the
specified TSIG algorithm
@rtype: (string, hash constructor)
@raises NotImplementedError: I{algorithm} is not supported
"""
global _hashes
if _hashes is None:
_setup_hashes()
if isinstance(algorithm, basestring):
algorithm = linkcheck.dns.name.from_text(algorithm)
if algorithm in hashes:
return (algorithm.to_digestable(), hashes[algorithm])
if sys.hexversion < 0x02050200 and \
(algorithm == HMAC_SHA384 or algorithm == HMAC_SHA512):
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" requires Python 2.5.2 or later")
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" is not supported")
try:
return (algorithm.to_digestable(), _hashes[algorithm])
except KeyError:
raise NotImplementedError("TSIG algorithm " + str(algorithm) +
" is not supported")

View file

@ -22,6 +22,7 @@ import linkcheck.dns.opcode
import linkcheck.dns.rdata
import linkcheck.dns.rdataclass
import linkcheck.dns.rdataset
import linkcheck.dns.tsig
class Update(linkcheck.dns.message.Message):
def __init__(self, zone, rdclass=linkcheck.dns.rdataclass.IN, keyring=None,
@ -43,7 +44,10 @@ class Update(linkcheck.dns.message.Message):
they know the keyring contains only one key.
@type keyname: linkcheck.dns.name.Name or string
@param keyalgorithm: The TSIG algorithm to use; defaults to
linkcheck.dns.tsig.default_algorithm
linkcheck.dns.tsig.default_algorithm. Constants for TSIG algorithms are defined
in linkcheck.dns.tsig, and the currently implemented algorithms are
HMAC_MD5, HMAC_SHA1, HMAC_SHA224, HMAC_SHA256, HMAC_SHA384, and
HMAC_SHA512.
@type keyalgorithm: string
"""
super(Update, self).__init__()
@ -57,7 +61,7 @@ class Update(linkcheck.dns.message.Message):
self.find_rrset(self.question, self.origin, rdclass, linkcheck.dns.rdatatype.SOA,
create=True, force_unique=True)
if not keyring is None:
self.use_tsig(keyring, keyname, keyalgorithm)
self.use_tsig(keyring, keyname, algorithm=keyalgorithm)
def _add_rr(self, name, ttl, rd, deleting=None, section=None):
"""Add a single RR to the update section."""
@ -100,7 +104,7 @@ class Update(linkcheck.dns.message.Message):
self._add_rr(name, ttl, rd, section=section)
else:
rdtype = args.pop(0)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if replace:
self.delete(name, rdtype)
@ -149,7 +153,7 @@ class Update(linkcheck.dns.message.Message):
self._add_rr(name, 0, rd, linkcheck.dns.rdataclass.NONE)
else:
rdtype = args.pop(0)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if len(args) == 0:
rrset = self.find_rrset(self.authority, name,
@ -207,7 +211,7 @@ class Update(linkcheck.dns.message.Message):
self._add(False, self.answer, name, *args)
else:
rdtype = args[0]
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
rrset = self.find_rrset(self.answer, name,
linkcheck.dns.rdataclass.ANY, rdtype,

View file

@ -235,9 +235,9 @@ class Zone(object):
"""
name = self._validate_name(name)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
if isinstance(covers, basestring):
covers = linkcheck.dns.rdatatype.from_text(covers)
node = self.find_node(name, create)
return node.find_rdataset(self.rdclass, rdtype, covers, create)
@ -298,9 +298,9 @@ class Zone(object):
"""
name = self._validate_name(name)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
if isinstance(covers, basestring):
covers = linkcheck.dns.rdatatype.from_text(covers)
node = self.get_node(name)
if not node is None:
@ -361,9 +361,9 @@ class Zone(object):
"""
name = self._validate_name(name)
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
if isinstance(covers, basestring):
covers = linkcheck.dns.rdatatype.from_text(covers)
rdataset = self.nodes[name].find_rdataset(self.rdclass, rdtype, covers)
rrset = linkcheck.dns.rrset.RRset(name, self.rdclass, rdtype, covers)
@ -417,9 +417,9 @@ class Zone(object):
@type covers: int or string
"""
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
if isinstance(covers, basestring):
covers = linkcheck.dns.rdatatype.from_text(covers)
for (name, node) in self.iteritems():
for rds in node:
@ -440,9 +440,9 @@ class Zone(object):
@type covers: int or string
"""
if isinstance(rdtype, str):
if isinstance(rdtype, basestring):
rdtype = linkcheck.dns.rdatatype.from_text(rdtype)
if isinstance(covers, str):
if isinstance(covers, basestring):
covers = linkcheck.dns.rdatatype.from_text(covers)
for (name, node) in self.iteritems():
for rds in node: