diff --git a/linkcheck/dns/dnssec.py b/linkcheck/dns/dnssec.py index 9f80233a..448ba1c7 100644 --- a/linkcheck/dns/dnssec.py +++ b/linkcheck/dns/dnssec.py @@ -16,6 +16,11 @@ """Common DNSSEC-related functions and constants.""" +import linkcheck.dns.name +import linkcheck.dns.rdata +import linkcheck.dns.rdatatype +import linkcheck.dns.rdataclass + RSAMD5 = 1 DH = 2 DSA = 3 @@ -71,3 +76,44 @@ def algorithm_to_text(value): if text is None: text = str(value) return text + +def _to_rdata(record): + from cStringIO import StringIO + s = StringIO() + record.to_wire(s) + return s.getvalue() + +def key_id(key): + rdata = _to_rdata(key) + if key.algorithm == RSAMD5: + return (ord(rdata[-3]) << 8) + ord(rdata[-2]) + else: + total = 0 + for i in range(len(rdata) / 2): + total += (ord(rdata[2 * i]) << 8) + ord(rdata[2 * i + 1]) + if len(rdata) % 2 != 0: + total += ord(rdata[len(rdata) - 1]) << 8 + total += ((total >> 16) & 0xffff); + return total & 0xffff + +def make_ds(name, key, algorithm): + import hashlib + import struct + if algorithm.upper() == 'SHA1': + dsalg = 1 + hash = hashlib.sha1() + elif algorithm.upper() == 'SHA256': + dsalg = 2 + hash = hashlib.sha256() + else: + raise ValueError, 'unsupported algorithm "%s"' % algorithm + + if isinstance(name, basestring): + name = linkcheck.dns.name.from_text(name) + hash.update(name.canonicalize().to_wire()) + hash.update(_to_rdata(key)) + digest = hash.digest() + + dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest + return linkcheck.dns.rdata.from_wire(linkcheck.dns.rdataclass.IN, + linkcheck.dns.rdatatype.DS, dsrdata, 0, len(dsrdata)) diff --git a/linkcheck/dns/message.py b/linkcheck/dns/message.py index 6abcf4ad..58842ccf 100644 --- a/linkcheck/dns/message.py +++ b/linkcheck/dns/message.py @@ -21,6 +21,7 @@ import struct import sys import time +import linkcheck.dns.edns import linkcheck.dns.exception import linkcheck.dns.flags import linkcheck.dns.name @@ -95,8 +96,11 @@ class Message(object): @ivar request_mac: The TSIG MAC of the request message associated with this message; used when validating TSIG signatures. @see: RFC 2845 for more information on TSIG fields. - @ivar keyalgorithm: The TSIG key algorithm to use. The default is - linkcheck.dns.tsig.default_algorithm. + @ivar keyalgorithm: The TSIG algorithm to use; defaults to + linkcheck.dns.tsig.default_algorithm. Constants for TSIG algorithms are defined + in linkcheck.dns.tsig, and the currently implemented algorithms are + HMAC_MD5, HMAC_SHA1, HMAC_SHA224, HMAC_SHA256, HMAC_SHA384, and + HMAC_SHA512. @type keyalgorithm: string @type request_mac: string @ivar fudge: TSIG time fudge; default is 300 seconds. @@ -1036,9 +1040,9 @@ def make_query(qname, rdtype, rdclass = linkcheck.dns.rdataclass.IN, if isinstance(qname, basestring): qname = linkcheck.dns.name.from_text(qname) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) - if isinstance(rdclass, str): + if isinstance(rdclass, basestring): rdclass = linkcheck.dns.rdataclass.from_text(rdclass) m = Message() m.flags |= linkcheck.dns.flags.RD diff --git a/linkcheck/dns/query.py b/linkcheck/dns/query.py index 27acf12e..257916bf 100644 --- a/linkcheck/dns/query.py +++ b/linkcheck/dns/query.py @@ -313,7 +313,7 @@ def xfr (where, zone, rdtype=linkcheck.dns.rdatatype.AXFR, if isinstance(zone, basestring): zone = linkcheck.dns.name.from_text(zone) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) q = linkcheck.dns.message.make_query(zone, rdtype, rdclass) if rdtype == linkcheck.dns.rdatatype.IXFR: diff --git a/linkcheck/dns/rdataset.py b/linkcheck/dns/rdataset.py index 7a4def69..4f96649b 100644 --- a/linkcheck/dns/rdataset.py +++ b/linkcheck/dns/rdataset.py @@ -278,9 +278,9 @@ def from_text_list(rdclass, rdtype, ttl, text_rdatas): @rtype: linkcheck.dns.rdataset.Rdataset object """ - if isinstance(rdclass, str): + if isinstance(rdclass, basestring): rdclass = linkcheck.dns.rdataclass.from_text(rdclass) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) r = Rdataset(rdclass, rdtype) r.update_ttl(ttl) diff --git a/linkcheck/dns/resolver.py b/linkcheck/dns/resolver.py index d62143c6..b53057e9 100644 --- a/linkcheck/dns/resolver.py +++ b/linkcheck/dns/resolver.py @@ -654,9 +654,9 @@ class Resolver(object): if isinstance(qname, basestring): qname = linkcheck.dns.name.from_text(qname, None) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) - if isinstance(rdclass, str): + if isinstance(rdclass, basestring): rdclass = linkcheck.dns.rdataclass.from_text(rdclass) qnames_to_try = [] if qname.is_absolute(): @@ -678,7 +678,8 @@ class Resolver(object): return answer request = linkcheck.dns.message.make_query(qname, rdtype, rdclass) if not self.keyname is None: - request.use_tsig(self.keyring, self.keyname, self.keyalgorithm) + request.use_tsig(self.keyring, self.keyname, + algorithm=self.keyalgorithm) request.use_edns(self.edns, self.ednsflags, self.payload) response = None diff --git a/linkcheck/dns/rrset.py b/linkcheck/dns/rrset.py index e985466a..9ad4bf82 100644 --- a/linkcheck/dns/rrset.py +++ b/linkcheck/dns/rrset.py @@ -37,7 +37,7 @@ class RRset(linkcheck.dns.rdataset.Rdataset): deleting=None): """Create a new RRset.""" - super(RRset, self).__init__(rdclass, rdtype) + super(RRset, self).__init__(rdclass, rdtype, covers) self.name = name self.deleting = deleting @@ -125,9 +125,9 @@ def from_text_list(name, ttl, rdclass, rdtype, text_rdatas): if isinstance(name, basestring): name = linkcheck.dns.name.from_text(name, None) - if isinstance(rdclass, str): + if isinstance(rdclass, basestring): rdclass = linkcheck.dns.rdataclass.from_text(rdclass) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) r = RRset(name, rdclass, rdtype) r.update_ttl(ttl) diff --git a/linkcheck/dns/tsig.py b/linkcheck/dns/tsig.py index c25a1682..41a1bd41 100644 --- a/linkcheck/dns/tsig.py +++ b/linkcheck/dns/tsig.py @@ -18,6 +18,7 @@ import hmac import struct +import sys import linkcheck.dns.exception import linkcheck.dns.rdataclass @@ -51,7 +52,16 @@ class PeerBadTruncation(PeerError): """Raised if the peer didn't like amount of truncation in the TSIG we sent""" pass -default_algorithm = "HMAC-MD5.SIG-ALG.REG.INT" +# TSIG Algorithms + +HMAC_MD5 = linkcheck.dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT") +HMAC_SHA1 = linkcheck.dns.name.from_text("hmac-sha1") +HMAC_SHA224 = linkcheck.dns.name.from_text("hmac-sha224") +HMAC_SHA256 = linkcheck.dns.name.from_text("hmac-sha256") +HMAC_SHA384 = linkcheck.dns.name.from_text("hmac-sha384") +HMAC_SHA512 = linkcheck.dns.name.from_text("hmac-sha512") + +default_algorithm = HMAC_MD5 BADSIG = 16 BADKEY = 17 @@ -168,25 +178,20 @@ def validate(wire, keyname, secret, now, request_mac, tsig_start, tsig_rdata, raise BadSignature return ctx -def get_algorithm(algorithm): - """Returns the wire format string and the hash module to use for the - specified TSIG algorithm +_hashes = None - @rtype: (string, hash constructor) - @raises NotImplementedError: I{algorithm} is not supported - """ - - hashes = {} +def _setup_hashes(): + global _hashes + _hashes = {} try: import hashlib - hashes[linkcheck.dns.name.from_text('hmac-sha224')] = hashlib.sha224 - hashes[linkcheck.dns.name.from_text('hmac-sha256')] = hashlib.sha256 - hashes[linkcheck.dns.name.from_text('hmac-sha384')] = hashlib.sha384 - hashes[linkcheck.dns.name.from_text('hmac-sha512')] = hashlib.sha512 - hashes[linkcheck.dns.name.from_text('hmac-sha1')] = hashlib.sha1 - hashes[linkcheck.dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = hashlib.md5 + _hashes[HMAC_SHA224] = hashlib.sha224 + _hashes[HMAC_SHA256] = hashlib.sha256 + _hashes[HMAC_SHA384] = hashlib.sha384 + _hashes[HMAC_SHA512] = hashlib.sha512 + _hashes[HMAC_SHA1] = hashlib.sha1 + _hashes[HMAC_MD5] = hashlib.md5 - import sys if sys.hexversion < 0x02050000: # hashlib doesn't conform to PEP 247: API for # Cryptographic Hash Functions, which hmac before python @@ -199,19 +204,36 @@ def get_algorithm(algorithm): def new(self, *args, **kwargs): return self.basehash(*args, **kwargs) - for name in hashes: - hashes[name] = HashlibWrapper(hashes[name]) + for name in _hashes: + _hashes[name] = HashlibWrapper(_hashes[name]) except ImportError: import md5, sha - hashes[linkcheck.dns.name.from_text('HMAC-MD5.SIG-ALG.REG.INT')] = md5.md5 - hashes[linkcheck.dns.name.from_text('hmac-sha1')] = sha.sha + _hashes[HMAC_MD5] = md5 + _hashes[HMAC_SHA1] = sha - if isinstance(algorithm, (str, unicode)): +def get_algorithm(algorithm): + """Returns the wire format string and the hash module to use for the + specified TSIG algorithm + + @rtype: (string, hash constructor) + @raises NotImplementedError: I{algorithm} is not supported + """ + + global _hashes + if _hashes is None: + _setup_hashes() + + if isinstance(algorithm, basestring): algorithm = linkcheck.dns.name.from_text(algorithm) - if algorithm in hashes: - return (algorithm.to_digestable(), hashes[algorithm]) + if sys.hexversion < 0x02050200 and \ + (algorithm == HMAC_SHA384 or algorithm == HMAC_SHA512): + raise NotImplementedError("TSIG algorithm " + str(algorithm) + + " requires Python 2.5.2 or later") - raise NotImplementedError("TSIG algorithm " + str(algorithm) + - " is not supported") + try: + return (algorithm.to_digestable(), _hashes[algorithm]) + except KeyError: + raise NotImplementedError("TSIG algorithm " + str(algorithm) + + " is not supported") diff --git a/linkcheck/dns/update.py b/linkcheck/dns/update.py index 17e49eb3..ad930b64 100644 --- a/linkcheck/dns/update.py +++ b/linkcheck/dns/update.py @@ -22,6 +22,7 @@ import linkcheck.dns.opcode import linkcheck.dns.rdata import linkcheck.dns.rdataclass import linkcheck.dns.rdataset +import linkcheck.dns.tsig class Update(linkcheck.dns.message.Message): def __init__(self, zone, rdclass=linkcheck.dns.rdataclass.IN, keyring=None, @@ -43,7 +44,10 @@ class Update(linkcheck.dns.message.Message): they know the keyring contains only one key. @type keyname: linkcheck.dns.name.Name or string @param keyalgorithm: The TSIG algorithm to use; defaults to - linkcheck.dns.tsig.default_algorithm + linkcheck.dns.tsig.default_algorithm. Constants for TSIG algorithms are defined + in linkcheck.dns.tsig, and the currently implemented algorithms are + HMAC_MD5, HMAC_SHA1, HMAC_SHA224, HMAC_SHA256, HMAC_SHA384, and + HMAC_SHA512. @type keyalgorithm: string """ super(Update, self).__init__() @@ -57,7 +61,7 @@ class Update(linkcheck.dns.message.Message): self.find_rrset(self.question, self.origin, rdclass, linkcheck.dns.rdatatype.SOA, create=True, force_unique=True) if not keyring is None: - self.use_tsig(keyring, keyname, keyalgorithm) + self.use_tsig(keyring, keyname, algorithm=keyalgorithm) def _add_rr(self, name, ttl, rd, deleting=None, section=None): """Add a single RR to the update section.""" @@ -100,7 +104,7 @@ class Update(linkcheck.dns.message.Message): self._add_rr(name, ttl, rd, section=section) else: rdtype = args.pop(0) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) if replace: self.delete(name, rdtype) @@ -149,7 +153,7 @@ class Update(linkcheck.dns.message.Message): self._add_rr(name, 0, rd, linkcheck.dns.rdataclass.NONE) else: rdtype = args.pop(0) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) if len(args) == 0: rrset = self.find_rrset(self.authority, name, @@ -207,7 +211,7 @@ class Update(linkcheck.dns.message.Message): self._add(False, self.answer, name, *args) else: rdtype = args[0] - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) rrset = self.find_rrset(self.answer, name, linkcheck.dns.rdataclass.ANY, rdtype, diff --git a/linkcheck/dns/zone.py b/linkcheck/dns/zone.py index dca303af..f6b2ba05 100644 --- a/linkcheck/dns/zone.py +++ b/linkcheck/dns/zone.py @@ -235,9 +235,9 @@ class Zone(object): """ name = self._validate_name(name) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) - if isinstance(covers, str): + if isinstance(covers, basestring): covers = linkcheck.dns.rdatatype.from_text(covers) node = self.find_node(name, create) return node.find_rdataset(self.rdclass, rdtype, covers, create) @@ -298,9 +298,9 @@ class Zone(object): """ name = self._validate_name(name) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) - if isinstance(covers, str): + if isinstance(covers, basestring): covers = linkcheck.dns.rdatatype.from_text(covers) node = self.get_node(name) if not node is None: @@ -361,9 +361,9 @@ class Zone(object): """ name = self._validate_name(name) - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) - if isinstance(covers, str): + if isinstance(covers, basestring): covers = linkcheck.dns.rdatatype.from_text(covers) rdataset = self.nodes[name].find_rdataset(self.rdclass, rdtype, covers) rrset = linkcheck.dns.rrset.RRset(name, self.rdclass, rdtype, covers) @@ -417,9 +417,9 @@ class Zone(object): @type covers: int or string """ - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) - if isinstance(covers, str): + if isinstance(covers, basestring): covers = linkcheck.dns.rdatatype.from_text(covers) for (name, node) in self.iteritems(): for rds in node: @@ -440,9 +440,9 @@ class Zone(object): @type covers: int or string """ - if isinstance(rdtype, str): + if isinstance(rdtype, basestring): rdtype = linkcheck.dns.rdatatype.from_text(rdtype) - if isinstance(covers, str): + if isinstance(covers, basestring): covers = linkcheck.dns.rdatatype.from_text(covers) for (name, node) in self.iteritems(): for rds in node: