git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@1404 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2004-07-30 08:11:40 +00:00
parent c9261c96c2
commit b48c84fe21
8 changed files with 0 additions and 1148 deletions

View file

@ -1,230 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""
This file is part of the pydns project.
Homepage: http://pydns.sourceforge.net
This code is covered by the standard Python License.
Base functionality. Request and Response classes, that sort of thing.
"""
import select
import socket
import string
import types
import time
import asyncore
import os
class DNSError (Exception):
pass
import Lib
import Type
import Class
import Opcode
defaults = {
'protocol':'udp',
'port':53,
'opcode':Opcode.QUERY,
'qtype':Type.A,
'rd':1,
'timing':1,
'timeout': 30,
}
class DnsRequest (object):
""" high level Request object """
def __init__ (self, name, config, **args):
self.donefunc = None
self.async = None
self.defaults = {}
self.argparse(name, config, args)
self.defaults = self.args
def argparse (self, name, config, args):
self.name = name
self.config = config
args['server'] = self.config.nameservers[0]
for i in defaults.keys():
if not args.has_key(i):
if self.defaults.has_key(i):
args[i]=self.defaults[i]
else:
args[i]=defaults[i]
self.args=args
def socketInit (self, a, b):
self.s = socket.socket(a,b)
def processUDPReply (self):
if self.args['timeout'] > 0:
r,w,e = select.select([self.s],[],[],self.args['timeout'])
if not len(r):
raise DNSError, 'Timeout'
self.reply = self.s.recv(1024)
self.time_finish=time.time()
self.args['server']=self.ns
return self.processReply()
def processTCPReply (self):
self.f = self.s.makefile('r')
header = self.f.read(2)
if len(header) < 2:
raise DNSError,'EOF'
count = Lib.unpack16bit(header)
self.reply = self.f.read(count)
if len(self.reply) != count:
raise DNSError,'incomplete reply'
self.time_finish=time.time()
self.args['server']=self.ns
return self.processReply()
def processReply (self):
self.args['elapsed']=(self.time_finish-self.time_start)*1000
u = Lib.Munpacker(self.reply)
r = Lib.DnsResult(u,self.args)
r.args = self.args
#self.args=None # mark this DnsRequest object as used.
return r
#### TODO TODO TODO ####
# if protocol == 'tcp' and qtype == Type.AXFR:
# while 1:
# header = f.read(2)
# if len(header) < 2:
# print '========== EOF =========='
# break
# count = Lib.unpack16bit(header)
# if not count:
# print '========== ZERO COUNT =========='
# break
# print '========== NEXT =========='
# reply = f.read(count)
# if len(reply) != count:
# print '*** Incomplete reply ***'
# break
# u = Lib.Munpacker(reply)
# Lib.dumpM(u)
def conn (self):
self.s.connect((self.ns, self.port))
def req (self):
" needs a refactoring "
#if not self.args:
# raise DNSError,'reinitialize request before reuse'
protocol = self.args['protocol']
self.port = self.args['port']
opcode = self.args['opcode']
rd = self.args['rd']
server=self.args['server']
if type(self.args['qtype']) == types.StringType:
try:
qtype = getattr(Type, string.upper(self.args['qtype']))
except AttributeError:
raise DNSError,'unknown query type'
else:
qtype=self.args['qtype']
qname = self.name
if qtype == Type.AXFR:
print 'Query type AXFR, protocol forced to TCP'
protocol = 'tcp'
#print 'QTYPE %d(%s)' % (qtype, Type.typestr(qtype))
m = Lib.Mpacker()
# jesus. keywords and default args would be good. TODO.
m.addHeader(0,
0, opcode, 0, 0, rd, 0, 0, 0,
1, 0, 0, 0)
m.addQuestion(qname, qtype, Class.IN)
self.request = m.getbuf()
if protocol == 'udp':
self.sendUDPRequest(server)
else:
self.sendTCPRequest(server)
if self.async:
return None
else:
return self.response
def sendUDPRequest (self, server):
"refactor me"
self.response = None
self.socketInit(socket.AF_INET, socket.SOCK_DGRAM)
for self.ns in server:
try:
# TODO. Handle timeouts &c correctly (RFC)
#self.s.connect((self.ns, self.port))
self.conn()
self.time_start=time.time()
if not self.async:
self.s.send(self.request)
self.response=self.processUDPReply()
#except socket.error:
except None:
continue
break
if not self.response:
if not self.async:
raise DNSError,'no working nameservers found'
def sendTCPRequest (self, server):
" do the work of sending a TCP request "
self.response=None
for self.ns in server:
try:
self.socketInit(socket.AF_INET, socket.SOCK_STREAM)
self.time_start=time.time()
self.conn()
self.s.send(Lib.pack16bit(len(self.request))+self.request)
self.s.shutdown(1)
self.response=self.processTCPReply()
except socket.error:
continue
break
if not self.response:
raise DNSError,'no working nameservers found'
class DnsAsyncRequest (DnsRequest, asyncore.dispatcher_with_send):
" an asynchronous request object. out of date, probably broken "
def __init__ (self,*name,**args):
DnsRequest.__init__(self, *name, **args)
asyncore.dispatcher_with_send.__init__(self, *name, **args)
# XXX todo
if args.has_key('done') and args['done']:
self.donefunc = args['done']
else:
self.donefunc=self.showResult
#self.realinit(name,args) # XXX todo
self.async=1
def conn (self):
self.connect((self.ns,self.port))
self.time_start=time.time()
if self.args.has_key('start') and self.args['start']:
asyncore.dispatcher.go(self)
def socketInit (self,a,b):
self.create_socket(a,b)
asyncore.dispatcher.__init__(self)
self.s=self
def handle_read (self):
if self.args['protocol'] == 'udp':
self.response=self.processUDPReply()
if self.donefunc:
self.donefunc(*self)
def handle_connect (self):
self.send(self.request)
def handle_write (self):
pass
def showResult (self,*s):
self.response.show()

View file

@ -1,34 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""
This file is part of the pydns project.
Homepage: http://pydns.sourceforge.net
This code is covered by the standard Python License.
CLASS values (section 3.2.4)
"""
IN = 1 # the Internet
CS = 2 # the CSNET class (Obsolete - used only for examples in
# some obsolete RFCs)
CH = 3 # the CHAOS class. When someone shows me python running on
# a Symbolics Lisp machine, I'll look at implementing this.
HS = 4 # Hesiod [Dyer 87]
# QCLASS values (section 3.2.5)
ANY = 255 # any class
# Construct reverse mapping dictionary
_classmap = {}
for _name in dir():
if not _name.startswith('_'):
_classmap[eval(_name)] = _name
def classstr (klass):
"""return string representation of DNS class"""
return _classmap.get(klass, repr(klass))

View file

@ -1,726 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""
This file is part of the pydns project.
Homepage: http://pydns.sourceforge.net
This code is covered by the standard Python License.
Library code. Largely this is packers and unpackers for various types.
"""
#
# See RFC 1035:
# ------------------------------------------------------------------------
# Network Working Group P. Mockapetris
# Request for Comments: 1035 ISI
# November 1987
# Obsoletes: RFCs 882, 883, 973
#
# DOMAIN NAMES - IMPLEMENTATION AND SPECIFICATION
# ------------------------------------------------------------------------
import types
import time
import Type
import Class
import Opcode
import Status
class DNSError (Exception):
"""basic DNS error class"""
pass
class UnpackError (DNSError):
"""DNS packet unpacking error"""
pass
class PackError (DNSError):
"""DNS packet packing error"""
pass
# Low-level 16 and 32 bit integer packing and unpacking
from struct import pack as struct_pack
from struct import unpack as struct_unpack
def pack16bit (n):
return struct_pack('!H', n)
def pack32bit (n):
return struct_pack('!L', n)
def pack128bit (n):
return struct_pack('!LLLL', n)
def unpack16bit (s):
return struct_unpack('!H', s)[0]
def unpack32bit (s):
return struct_unpack('!L', s)[0]
def unpack128bit (s):
return struct_unpack('!LLLL', s)[0]
def addr2bin (addr):
if type(addr) == type(0):
return addr
bytes = addr.split('.')
if len(bytes) != 4: raise ValueError, 'bad IP address'
n = 0L
for byte in bytes:
n = n<<8 | int(byte)
return n
def bin2addr (n):
return '%d.%d.%d.%d' % ((n>>24)&0xFF, (n>>16)&0xFF,
(n>>8)&0xFF, n&0xFF)
# Packing class
class Packer (object):
" packer base class. supports basic byte/16bit/32bit/addr/string/name "
def __init__ (self):
self.buf = ''
self.index = {}
def getbuf (self):
return self.buf
def addbyte (self, c):
if len(c) != 1: raise TypeError, 'one character expected'
self.buf += c
def addbytes (self, bytes):
self.buf += bytes
def add16bit (self, n):
self.buf += pack16bit(n)
def add32bit (self, n):
self.buf += pack32bit(n)
def addaddr (self, addr):
n = addr2bin(addr)
self.buf += pack32bit(n)
def addaddr6 (self, addr):
n = addr2bin(addr)
self.buf += pack128bit(n)
def addstring (self, s):
if len(s) > 255:
raise ValueError, "Can't encode string of length "+ \
"%s (> 255)"%(len(s))
self.addbyte(chr(len(s)))
self.addbytes(s)
def addname (self, name):
"""Domain name packing (section 4.1.4)
Add a domain name to the buffer, possibly using pointers.
The case of the first occurrence of a name is preserved.
Redundant dots are ignored.
"""
lst = []
for label in name.split('.'):
if label:
if len(label) > 63:
raise PackError, 'label too long'
lst.append(label)
keys = []
for i in range(len(lst)):
key = '.'.join(lst[i:]).upper()
keys.append(key)
if self.index.has_key(key):
pointer = self.index[key]
break
else:
i = len(lst)
pointer = None
# Do it into temporaries first so exceptions don't
# mess up self.index and self.buf
buf = ''
offset = len(self.buf)
index = []
for j in range(i):
label = lst[j]
n = len(label)
if offset + len(buf) < 0x3FFF:
index.append((keys[j], offset + len(buf)))
else:
print 'DNS.Lib.Packer.addname:',
print 'warning: pointer too big'
buf += (chr(n) + label)
if pointer:
buf += pack16bit(pointer | 0xC000)
else:
buf += '\0'
self.buf += buf
for key, value in index:
self.index[key] = value
def dump (self):
"""print string dump of packer data"""
keys = self.index.keys()
keys.sort()
print '-'*40
for key in keys:
print '%20s %3d' % (key, self.index[key])
print '-'*40
space = 1
for i in range(0, len(self.buf)+1, 2):
if self.buf[i:i+2] == '**':
if not space: print
space = 1
continue
space = 0
print '%4d' % i,
for c in self.buf[i:i+2]:
if ' ' < c < '\177':
print ' %c' % c,
else:
print '%2d' % ord(c),
print
print '-'*40
# Unpacking class
class Unpacker (object):
def __init__ (self, buf):
self.buf = buf
self.offset = 0
def getbyte (self):
if self.offset > len(self.buf):
raise UnpackError, "Ran off end of data"
c = self.buf[self.offset]
self.offset += 1
return c
def getbytes (self, n):
s = self.buf[self.offset : self.offset + n]
if len(s) != n: raise UnpackError, 'not enough data left'
self.offset +=n
return s
def get16bit (self):
return unpack16bit(self.getbytes(2))
def get32bit (self):
return unpack32bit(self.getbytes(4))
def get128bit (self):
return unpack128bit(self.getbytes(16))
def getaddr (self):
return bin2addr(self.get32bit())
def getaddr6 (self):
return bin2addr(self.get128bit())
def getstring (self):
return self.getbytes(ord(self.getbyte()))
def getname (self):
# Domain name unpacking (section 4.1.4)
c = self.getbyte()
i = ord(c)
if i & 0xC0 == 0xC0:
d = self.getbyte()
j = ord(d)
pointer = ((i<<8) | j) & ~0xC000
save_offset = self.offset
try:
self.offset = pointer
domain = self.getname()
finally:
self.offset = save_offset
return domain
if i == 0:
return ''
domain = self.getbytes(i)
remains = self.getname()
if not remains:
return domain
else:
return domain + '.' + remains
# Pack/unpack RR toplevel format (section 3.2.1)
class RRpacker (Packer):
def __init__ (self):
Packer.__init__(self)
self.rdstart = None
def addRRheader (self, name, type, klass, ttl, *rest):
self.addname(name)
self.add16bit(type)
self.add16bit(klass)
self.add32bit(ttl)
if rest:
if rest[1:]: raise TypeError, 'too many args'
rdlength = rest[0]
else:
rdlength = 0
self.add16bit(rdlength)
self.rdstart = len(self.buf)
def patchrdlength (self):
rdlength = unpack16bit(self.buf[self.rdstart-2:self.rdstart])
if rdlength == len(self.buf) - self.rdstart:
return
rdata = self.buf[self.rdstart:]
save_buf = self.buf
ok = 0
try:
self.buf = self.buf[:self.rdstart-2]
self.add16bit(len(rdata))
self.buf = self.buf + rdata
ok = 1
finally:
if not ok: self.buf = save_buf
def endRR (self):
if self.rdstart is not None:
self.patchrdlength()
self.rdstart = None
def getbuf (self):
if self.rdstart is not None: self.patchrdlength()
return Packer.getbuf(self)
# Standard RRs (section 3.3)
def addCNAME (self, name, klass, ttl, cname):
self.addRRheader(name, Type.CNAME, klass, ttl)
self.addname(cname)
self.endRR()
def addHINFO (self, name, klass, ttl, cpu, os):
self.addRRheader(name, Type.HINFO, klass, ttl)
self.addstring(cpu)
self.addstring(os)
self.endRR()
def addMX (self, name, klass, ttl, preference, exchange):
self.addRRheader(name, Type.MX, klass, ttl)
self.add16bit(preference)
self.addname(exchange)
self.endRR()
def addNS (self, name, klass, ttl, nsdname):
self.addRRheader(name, Type.NS, klass, ttl)
self.addname(nsdname)
self.endRR()
def addPTR (self, name, klass, ttl, ptrdname):
self.addRRheader(name, Type.PTR, klass, ttl)
self.addname(ptrdname)
self.endRR()
def addSOA (self, name, klass, ttl,
mname, rname, serial, refresh, retry, expire, minimum):
self.addRRheader(name, Type.SOA, klass, ttl)
self.addname(mname)
self.addname(rname)
self.add32bit(serial)
self.add32bit(refresh)
self.add32bit(retry)
self.add32bit(expire)
self.add32bit(minimum)
self.endRR()
def addTXT (self, name, klass, ttl, lst):
self.addRRheader(name, Type.TXT, klass, ttl)
if type(lst) is types.StringType:
lst = [lst]
for txtdata in lst:
self.addstring(txtdata)
self.endRR()
# Internet specific RRs (section 3.4) -- class = IN
def addA (self, name, klass, ttl, address):
self.addRRheader(name, Type.A, klass, ttl)
self.addaddr(address)
self.endRR()
def addAAAA (self, name, klass, ttl, address):
self.addRRheader(name, Type.A, klass, ttl)
self.addaddr6(address)
self.endRR()
def addWKS (self, name, ttl, address, protocol, bitmap):
self.addRRheader(name, Type.WKS, Class.IN, ttl)
self.addaddr(address)
self.addbyte(chr(protocol))
self.addbytes(bitmap)
self.endRR()
def addSRV (self):
raise NotImplementedError
def prettyTime (seconds):
if seconds<60:
return seconds,"%d seconds"%(seconds)
if seconds<3600:
return seconds,"%d minutes"%(seconds/60)
if seconds<86400:
return seconds,"%d hours"%(seconds/3600)
if seconds<604800:
return seconds,"%d days"%(seconds/86400)
else:
return seconds,"%d weeks"%(seconds/604800)
class RRunpacker (Unpacker):
def __init__ (self, buf):
Unpacker.__init__(self, buf)
self.rdend = None
def getRRheader (self):
name = self.getname()
rrtype = self.get16bit()
klass = self.get16bit()
ttl = self.get32bit()
rdlength = self.get16bit()
self.rdend = self.offset + rdlength
return (name, rrtype, klass, ttl, rdlength)
def endRR (self):
if self.offset != self.rdend:
raise UnpackError, 'end of RR not reached'
def getCNAMEdata (self):
return self.getname()
def getHINFOdata (self):
return self.getstring(), self.getstring()
def getMXdata (self):
return self.get16bit(), self.getname()
def getNSdata (self):
return self.getname()
def getPTRdata (self):
return self.getname()
def getSOAdata (self):
return self.getname(), \
self.getname(), \
('serial',)+(self.get32bit(),), \
('refresh ',)+prettyTime(self.get32bit()), \
('retry',)+prettyTime(self.get32bit()), \
('expire',)+prettyTime(self.get32bit()), \
('minimum',)+prettyTime(self.get32bit())
def getTXTdata (self):
lst = []
while self.offset != self.rdend:
lst.append(self.getstring())
return lst
def getAdata (self):
return self.getaddr()
def getAAAAdata (self):
return self.getaddr6()
def getWKSdata (self):
address = self.getaddr()
protocol = ord(self.getbyte())
bitmap = self.getbytes(self.rdend - self.offset)
return address, protocol, bitmap
def getSRVdata (self):
"""
_Service._Proto.Name TTL Class SRV Priority Weight Port Target
"""
priority = self.get16bit()
weight = self.get16bit()
port = self.get16bit()
target = self.getname()
#print '***priority, weight, port, target', priority, weight, port, target
return priority, weight, port, target
# Pack/unpack Message Header (section 4.1)
class Hpacker (Packer):
def addHeader (self, rid, qr, opcode, aa, tc, rd, ra, z, rcode,
qdcount, ancount, nscount, arcount):
self.add16bit(rid)
self.add16bit((qr&1)<<15 | (opcode&0xF)<<11 | (aa&1)<<10
| (tc&1)<<9 | (rd&1)<<8 | (ra&1)<<7
| (z&7)<<4 | (rcode&0xF))
self.add16bit(qdcount)
self.add16bit(ancount)
self.add16bit(nscount)
self.add16bit(arcount)
class Hunpacker (Unpacker):
def getHeader (self):
rid = self.get16bit()
flags = self.get16bit()
qr, opcode, aa, tc, rd, ra, z, rcode = (
(flags>>15)&1,
(flags>>11)&0xF,
(flags>>10)&1,
(flags>>9)&1,
(flags>>8)&1,
(flags>>7)&1,
(flags>>4)&7,
(flags>>0)&0xF)
qdcount = self.get16bit()
ancount = self.get16bit()
nscount = self.get16bit()
arcount = self.get16bit()
return (rid, qr, opcode, aa, tc, rd, ra, z, rcode,
qdcount, ancount, nscount, arcount)
# Pack/unpack Question (section 4.1.2)
class Qpacker (Packer):
def addQuestion (self, qname, qtype, qclass):
self.addname(qname)
self.add16bit(qtype)
self.add16bit(qclass)
class Qunpacker (Unpacker):
def getQuestion (self):
return self.getname(), self.get16bit(), self.get16bit()
# Pack/unpack Message(section 4)
# NB the order of the base classes is important for __init__()!
class Mpacker (RRpacker, Qpacker, Hpacker):
pass
class Munpacker (RRunpacker, Qunpacker, Hunpacker):
pass
# Routines to print an unpacker to stdout, for debugging.
# These affect the unpacker's current position!
def dumpM (u):
print 'HEADER:',
(id, qr, opcode, aa, tc, rd, ra, z, rcode,
qdcount, ancount, nscount, arcount) = u.getHeader()
print 'id=%d,' % id,
print 'qr=%d, opcode=%d, aa=%d, tc=%d, rd=%d, ra=%d, z=%d, rcode=%d,' \
% (qr, opcode, aa, tc, rd, ra, z, rcode)
if tc: print '*** response truncated! ***'
if rcode: print '*** nonzero error code! (%d) ***' % rcode
print ' qdcount=%d, ancount=%d, nscount=%d, arcount=%d' \
% (qdcount, ancount, nscount, arcount)
for i in range(qdcount):
print 'QUESTION %d:' % i,
dumpQ(u)
for i in range(ancount):
print 'ANSWER %d:' % i,
dumpRR(u)
for i in range(nscount):
print 'AUTHORITY RECORD %d:' % i,
dumpRR(u)
for i in range(arcount):
print 'ADDITIONAL RECORD %d:' % i,
dumpRR(u)
class DnsResult (object):
def __init__ (self,u,args):
self.header={}
self.questions=[]
self.answers=[]
self.authority=[]
self.additional=[]
self.args=args
self.storeM(u)
def show (self):
print '; <<>> PDG.py 1.0 <<>> %s %s'%(self.args['name'],
self.args['qtype'])
opt=""
if self.args['rd']:
opt=opt+'recurs '
h=self.header
print ';; options: '+opt
print ';; got answer:'
print ';; ->>HEADER<<- opcode %s, status %s, id %d'%(
h['opcode'],h['status'],h['id'])
flags=filter(lambda x,h=h:h[x],('qr','aa','rd','ra','tc'))
print ';; flags: %s; Ques: %d, Ans: %d, Auth: %d, Addit: %d'%(
''.join(flags),h['qdcount'],h['ancount'],h['nscount'],
h['arcount'])
print ';; QUESTIONS:'
for q in self.questions:
print ';; %s, type = %s, class = %s'%(q['qname'],q['qtypestr'],
q['qclassstr'])
print
print ';; ANSWERS:'
for a in self.answers:
print '%-20s %-6r %-6s %s'%(a['name'],a['ttl'],a['typename'],
a['data'])
print
print ';; AUTHORITY RECORDS:'
for a in self.authority:
print '%-20s %-6r %-6s %s'%(a['name'],a['ttl'],a['typename'],
a['data'])
print
print ';; ADDITIONAL RECORDS:'
for a in self.additional:
print '%-20s %-6r %-6s %s'%(a['name'],a['ttl'],a['typename'],
a['data'])
print
if self.args.has_key('elapsed'):
print ';; Total query time: %d msec'%self.args['elapsed']
print ';; To SERVER: %s'%(self.args['server'])
print ';; WHEN: %s'%time.ctime(time.time())
def storeM (self,u):
(self.header['id'], self.header['qr'], self.header['opcode'],
self.header['aa'], self.header['tc'], self.header['rd'],
self.header['ra'], self.header['z'], self.header['rcode'],
self.header['qdcount'], self.header['ancount'],
self.header['nscount'], self.header['arcount']) = u.getHeader()
self.header['opcodestr']=Opcode.opcodestr(self.header['opcode'])
self.header['status']=Status.statusstr(self.header['rcode'])
for i in range(self.header['qdcount']):
#print 'QUESTION %d:' % i,
self.questions.append(self.storeQ(u))
for i in range(self.header['ancount']):
#print 'ANSWER %d:' % i,
self.answers.append(self.storeRR(u))
for i in range(self.header['nscount']):
#print 'AUTHORITY RECORD %d:' % i,
self.authority.append(self.storeRR(u))
for i in range(self.header['arcount']):
#print 'ADDITIONAL RECORD %d:' % i,
self.additional.append(self.storeRR(u))
def storeQ (self,u):
q={}
q['qname'], q['qtype'], q['qclass'] = u.getQuestion()
q['qtypestr']=Type.typestr(q['qtype'])
q['qclassstr']=Class.classstr(q['qclass'])
return q
def storeRR (self,u):
r={}
r['name'],r['type'],r['class'],r['ttl'],r['rdlength'] = u.getRRheader()
r['typename'] = Type.typestr(r['type'])
r['classstr'] = Class.classstr(r['class'])
#print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
# % (name,
# type, typename,
# klass, Class.classstr(class),
# ttl)
mname = 'get%sdata' % r['typename']
if hasattr(u, mname):
r['data']=getattr(u, mname)()
else:
r['data']=u.getbytes(r['rdlength'])
return r
def dumpQ (u):
qname, qtype, qclass = u.getQuestion()
print 'qname=%s, qtype=%d(%s), qclass=%d(%s)' \
% (qname,
qtype, Type.typestr(qtype),
qclass, Class.classstr(qclass))
def dumpRR (u):
name, type, klass, ttl, rdlength = u.getRRheader()
typename = Type.typestr(type)
print 'name=%s, type=%d(%s), class=%d(%s), ttl=%d' \
% (name,
type, typename,
klass, Class.classstr(klass),
ttl)
mname = 'get%sdata' % typename
if hasattr(u, mname):
print ' formatted rdata:', getattr(u, mname)()
else:
print ' binary rdata:', u.getbytes(rdlength)

View file

@ -1,29 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""
This file is part of the pydns project.
Homepage: http://pydns.sourceforge.net
This code is covered by the standard Python License.
Opcode values in message header. RFC 1035, 1996, 2136.
"""
QUERY = 0
IQUERY = 1
STATUS = 2
NOTIFY = 4
UPDATE = 5
# Construct reverse mapping dictionary
_opcodemap = {}
for _name in dir():
if not _name.startswith('_'):
_opcodemap[eval(_name)] = _name
def opcodestr (opcode):
"""return string representation of DNS opcode"""
return _opcodemap.get(opcode, repr(opcode))

View file

@ -1,40 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""
This file is part of the pydns project.
Homepage: http://pydns.sourceforge.net
This code is covered by the standard Python License.
Status values in message header
"""
NOERROR = 0 # No Error [RFC 1035]
FORMERR = 1 # Format Error [RFC 1035]
SERVFAIL = 2 # Server Failure [RFC 1035]
NXDOMAIN = 3 # Non-Existent Domain [RFC 1035]
NOTIMP = 4 # Not Implemented [RFC 1035]
REFUSED = 5 # Query Refused [RFC 1035]
YXDOMAIN = 6 # Name Exists when it should not [RFC 2136]
YXRRSET = 7 # RR Set Exists when it should not [RFC 2136]
NXRRSET = 8 # RR Set that should exist does not [RFC 2136]
NOTAUTH = 9 # Server Not Authoritative for zone [RFC 2136]
NOTZONE = 10 # Name not contained in zone [RFC 2136]
BADVERS = 16 # Bad OPT Version [RFC 2671]
BADSIG = 16 # TSIG Signature Failure [RFC 2845]
BADKEY = 17 # Key not recognized [RFC 2845]
BADTIME = 18 # Signature out of time window [RFC 2845]
BADMODE = 19 # Bad TKEY Mode [RFC 2930]
BADNAME = 20 # Duplicate key name [RFC 2930]
BADALG = 21 # Algorithm not supported [RFC 2930]
# Construct reverse mapping dictionary
_statusmap = {}
for _name in dir():
if not _name.startswith('_'):
_statusmap[eval(_name)] = _name
def statusstr (status):
"""return string representation of DNS status"""
return _statusmap.get(status, repr(status))

View file

@ -1,52 +0,0 @@
# -*- coding: iso-8859-1 -*-
"""
This file is part of the pydns project.
Homepage: http://pydns.sourceforge.net
This code is covered by the standard Python License.
TYPE values (section 3.2.2)
"""
A = 1 # a host address
NS = 2 # an authoritative name server
MD = 3 # a mail destination (Obsolete - use MX)
MF = 4 # a mail forwarder (Obsolete - use MX)
CNAME = 5 # the canonical name for an alias
SOA = 6 # marks the start of a zone of authority
MB = 7 # a mailbox domain name (EXPERIMENTAL)
MG = 8 # a mail group member (EXPERIMENTAL)
MR = 9 # a mail rename domain name (EXPERIMENTAL)
NULL = 10 # a null RR (EXPERIMENTAL)
WKS = 11 # a well known service description
PTR = 12 # a domain name pointer
HINFO = 13 # host information
MINFO = 14 # mailbox or mail list information
MX = 15 # mail exchange
TXT = 16 # text strings
AAAA = 28 # IPv6 AAAA records (RFC 1886)
SRV = 33 # DNS RR for specifying the location of services (RFC 2782)
# Additional TYPE values from host.c source
UNAME = 110
MP = 240
# QTYPE values (section 3.2.3)
AXFR = 252 # A request for a transfer of an entire zone
MAILB = 253 # A request for mailbox-related records (MB, MG or MR)
MAILA = 254 # A request for mail agent RRs (Obsolete - see MX)
ANY = 255 # A request for all records
# Construct reverse mapping dictionary
_typemap = {}
for _name in dir():
if not _name.startswith('_'):
_typemap[eval(_name)] = _name
def typestr (dnstype):
"""return string representation of DNS type"""
return _typemap.get(dnstype, repr(dnstype))

View file

@ -1,9 +0,0 @@
# -*- coding: iso-8859-1 -*-
#
# This file is part of the pydns project.
# Homepage: http://pydns.sourceforge.net
#
# This code is covered by the standard Python License.
#
"""Python DNS module"""

View file

@ -1,28 +0,0 @@
# -*- coding: iso-8859-1 -*-
#
# This file is part of the pydns project.
# Homepage: http://pydns.sourceforge.net
#
# This code is covered by the standard Python License.
#
"""routines for lazy people."""
import Base
def revlookup (name, config):
"convenience routine for doing a reverse lookup of an address"
a = name.split('.')
a.reverse()
b = '.'.join(a)+'.in-addr.arpa'
# this will only return one of any records returned.
return Base.DnsRequest(b, config, qtype='ptr').req().answers[0]['data']
def mxlookup (name, config):
"""
convenience routine for doing an MX lookup of a name. returns a
sorted list of (preference, mail exchanger) records
"""
a = Base.DnsRequest(name, config, qtype='mx').req().answers
l = map(lambda x:x['data'], a)
l.sort()
return l