Remove unused code from network subpackage

This commit is contained in:
Chris Mayo 2019-10-18 19:50:41 +01:00
parent f73ba54a2a
commit c7a32d67fe
9 changed files with 1 additions and 527 deletions

View file

@ -54,7 +54,6 @@ clean:
-$(PYTHON) setup.py clean --all
rm -f $(LAPPNAME)-out.* *-stamp*
$(MAKE) -C linkcheck/HtmlParser clean
rm -f linkcheck/network/_network*.so
find . -name '*.py[co]' -exec rm -f {} \;
find . -name '*.bak' -exec rm -f {} \;
find . -depth -name '__pycache__' -exec rm -rf {} \;
@ -79,7 +78,6 @@ localbuild: MANIFEST locale
$(MAKE) -C linkcheck/HtmlParser
$(PYTHON) setup.py build
cp -f build/lib.$(PLATFORM)-$(PYVER)*/linkcheck/HtmlParser/htmlsax*.so linkcheck/HtmlParser
cp -f build/lib.$(PLATFORM)-$(PYVER)*/linkcheck/network/_network*.so linkcheck/network
release: distclean releasecheck filescheck
$(MAKE) dist sign register upload homepage tag changelog deb

View file

@ -1,194 +1 @@
# -*- coding: iso-8859-1 -*-
"""from http://twistedmatrix.com/wiki/python/IfConfig
"""
import socket
import sys
import errno
import array
import struct
import subprocess
from ._network import ifreq_size
from .. import log, LOG_CHECK
def pipecmd (cmd1, cmd2):
"""Return output of "cmd1 | cmd2"."""
p1 = subprocess.Popen(cmd1, stdout=subprocess.PIPE)
p2 = subprocess.Popen(cmd2, stdin=p1.stdout, stdout=subprocess.PIPE)
p1.stdout.close() # Allow p1 to receive a SIGPIPE if p2 exits.
return p2.communicate()[0]
def ifconfig_inet (iface):
"""Return parsed IPv4 info from ifconfig(8) binary."""
res = pipecmd(["ifconfig", iface], ["grep", "inet "])
info = {}
lastpart = None
for part in res.split():
# Linux systems have prefixes for each value
if part.startswith("addr:"):
info["address"] = part[5:]
elif part.startswith("Bcast:"):
info["broadcast"] = part[6:]
elif part.startswith("Mask:"):
info["netmask"] = part[5:]
elif lastpart == "inet":
info["address"] = part
elif lastpart in ("netmask", "broadcast"):
info[lastpart] = part
lastpart = part
return info
class IfConfig (object):
"""Access to socket interfaces"""
SIOCGIFNAME = 0x8910
SIOCGIFCONF = 0x8912
SIOCGIFFLAGS = 0x8913
SIOCGIFADDR = 0x8915
SIOCGIFBRDADDR = 0x8919
SIOCGIFNETMASK = 0x891b
SIOCGIFCOUNT = 0x8938
IFF_UP = 0x1 # Interface is up.
IFF_BROADCAST = 0x2 # Broadcast address valid.
IFF_DEBUG = 0x4 # Turn on debugging.
IFF_LOOPBACK = 0x8 # Is a loopback net.
IFF_POINTOPOINT = 0x10 # Interface is point-to-point link.
IFF_NOTRAILERS = 0x20 # Avoid use of trailers.
IFF_RUNNING = 0x40 # Resources allocated.
IFF_NOARP = 0x80 # No address resolution protocol.
IFF_PROMISC = 0x100 # Receive all packets.
IFF_ALLMULTI = 0x200 # Receive all multicast packets.
IFF_MASTER = 0x400 # Master of a load balancer.
IFF_SLAVE = 0x800 # Slave of a load balancer.
IFF_MULTICAST = 0x1000 # Supports multicast.
IFF_PORTSEL = 0x2000 # Can set media type.
IFF_AUTOMEDIA = 0x4000 # Auto media select active.
def __init__ (self):
"""Initialize a socket and determine ifreq structure size."""
# create a socket so we have a handle to query
self.sockfd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# Note that sizeof(struct ifreq) is not always 32
# (eg. on *BSD, x86_64 Linux) Thus the function call.
self.ifr_size = ifreq_size()
def _ioctl (self, func, args):
"""Call ioctl() with given parameters."""
import fcntl
return fcntl.ioctl(self.sockfd.fileno(), func, args)
def _getifreq (self, ifname):
"""Return ifreq buffer for given interface name."""
return struct.pack("%ds" % self.ifr_size, ifname)
def _getaddr (self, ifname, func):
"""Get interface address."""
try:
result = self._ioctl(func, self._getifreq(ifname))
except IOError as msg:
log.warn(LOG_CHECK,
"error getting addr for interface %r: %s", ifname, msg)
return None
return socket.inet_ntoa(result[20:24])
def getInterfaceList (self, flags=0):
"""Get all interface names in a list."""
if sys.platform == 'darwin':
command = ['ifconfig', '-l']
if flags & self.IFF_UP:
command.append('-u')
# replace with subprocess.check_output() for Python 2.7
res = subprocess.Popen(command, stdout=subprocess.PIPE).communicate()[0]
return res.split()
# initial 8kB buffer to hold interface data
bufsize = 8192
# 80kB buffer should be enough for most boxen
max_bufsize = bufsize * 10
while True:
buf = array.array('b', b'\0' * bufsize)
ifreq = struct.pack("iP", buf.buffer_info()[1], buf.buffer_info()[0])
try:
result = self._ioctl(self.SIOCGIFCONF, ifreq)
break
except IOError as msg:
# in case of EINVAL the buffer size was too small
if msg[0] != errno.EINVAL or bufsize == max_bufsize:
raise
# increase buffer
bufsize += 8192
# loop over interface names
data = buf.tostring()
iflist = []
size, ptr = struct.unpack("iP", result)
i = 0
while i < size:
ifconf = data[i:i+self.ifr_size]
name = struct.unpack("16s%ds" % (self.ifr_size-16), ifconf)[0]
name = name.split(b'\0', 1)[0]
if name:
if flags and not (self.getFlags(name) & flags):
continue
iflist.append(name)
i += self.ifr_size
return iflist
def getFlags (self, ifname):
"""Get the flags for an interface"""
try:
result = self._ioctl(self.SIOCGIFFLAGS, self._getifreq(ifname))
except IOError as msg:
log.warn(LOG_CHECK,
"error getting flags for interface %r: %s", ifname, msg)
return 0
# extract the interface's flags from the return value
flags, = struct.unpack('H', result[16:18])
# return "UP" bit
return flags
def getAddr (self, ifname):
"""Get the inet addr for an interface.
@param ifname: interface name
@type ifname: string
"""
if sys.platform == 'darwin':
return ifconfig_inet(ifname).get('address')
return self._getaddr(ifname, self.SIOCGIFADDR)
def getMask (self, ifname):
"""Get the netmask for an interface.
@param ifname: interface name
@type ifname: string
"""
if sys.platform == 'darwin':
return ifconfig_inet(ifname).get('netmask')
return self._getaddr(ifname, self.SIOCGIFNETMASK)
def getBroadcast (self, ifname):
"""Get the broadcast addr for an interface.
@param ifname: interface name
@type ifname: string
"""
if sys.platform == 'darwin':
return ifconfig_inet(ifname).get('broadcast')
return self._getaddr(ifname, self.SIOCGIFBRDADDR)
def isUp (self, ifname):
"""Check whether interface is UP.
@param ifname: interface name
@type ifname: string
"""
return (self.getFlags(ifname) & self.IFF_UP) != 0
def isLoopback (self, ifname):
"""Check whether interface is a loopback device.
@param ifname: interface name
@type ifname: string
"""
# since not all systems have IFF_LOOPBACK as a flag defined,
# the ifname is tested first
if ifname.startswith('lo'):
return True
return (self.getFlags(ifname) & self.IFF_LOOPBACK) != 0

View file

@ -1,75 +0,0 @@
/* Copyright (C) 2000-2014 Bastian Kleineidam
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "Python.h"
#ifndef _WIN32
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <net/route.h>
#include <net/if.h>
#endif
/* Python 2/3 compatibility */
#if PY_MAJOR_VERSION >= 3
#define MOD_ERROR_VAL NULL
#define MOD_SUCCESS_VAL(val) val
#define MOD_INIT(name) PyMODINIT_FUNC PyInit_##name(void)
#define MOD_DEF(ob, name, doc, methods) \
static struct PyModuleDef moduledef = { \
PyModuleDef_HEAD_INIT, name, doc, -1, methods, }; \
ob = PyModule_Create(&moduledef)
#else
#define MOD_ERROR_VAL
#define MOD_SUCCESS_VAL(val)
#define MOD_INIT(name) void init##name(void)
#define MOD_DEF(ob, name, doc, methods) \
ob = Py_InitModule3(name, methods, doc)
#endif
/* The struct ifreq size varies on different platforms, so we need
this helper function to determine the size of it.
On Windows platforms this function returns zero.
*/
static PyObject* network_ifreq_size (PyObject* self, PyObject* args)
{
if (!PyArg_ParseTuple(args, ""))
return NULL;
return Py_BuildValue("i",
#ifdef _WIN32
0
#else
sizeof(struct ifreq)
#endif
);
}
static PyMethodDef module_functions[] = {
{"ifreq_size", network_ifreq_size, METH_VARARGS,
"Return sizeof(struct ifreq)."},
{NULL, NULL, 0, NULL} /* Sentinel */
};
MOD_INIT(_network) {
PyObject *m;
MOD_DEF(m, "_network", "network helper routines", module_functions);
if (m == NULL) {
return MOD_ERROR_VAL;
}
return MOD_SUCCESS_VAL(m);
}

View file

@ -20,22 +20,15 @@ Ip number related utility functions.
import re
import socket
import struct
from .. import log, LOG_CHECK
# IP Adress regular expressions
# Note that each IPv4 octet can be encoded in dezimal, hexadezimal and octal.
_ipv4_num = r"\d{1,3}"
_ipv4_hex = r"0*[\da-f]{1,2}"
_ipv4_oct = r"0+[0-7]{0, 3}"
# XXX
_ipv4_num_4 = r"%s\.%s\.%s\.%s" % ((_ipv4_num,) * 4)
_ipv4_re = re.compile(r"^%s$" % _ipv4_num_4)
_ipv4_hex_4 = r"%s\.%s\.%s\.%s" % ((_ipv4_hex,) * 4)
# IPv4 encoded in octal, eg. 0x42.0x66.0x0d.0x63
_ipv4_oct = r"0*[0-7]{1,3}"
_ipv4_hex_4 = r"%s\.%s\.%s\.%s" % ((_ipv4_hex,) * 4)
# IPv6; See also rfc2373
@ -48,50 +41,6 @@ _ipv6_abbr_re = re.compile(r"^((%s:){0,6}%s)?::((%s:){0,6}%s)?$" % \
_ipv6_ipv4_abbr_re = re.compile(r"^((%s:){0,4}%s)?::((%s:){0,5})?" % \
((_ipv6_num,) * 3) + \
"%s$" % _ipv4_num_4)
# netmask regex
_host_netmask_re = re.compile(r"^%s/%s$" % (_ipv4_num_4, _ipv4_num_4))
_host_cidrmask_re = re.compile(r"^%s/\d{1,2}$" % _ipv4_num_4)
def expand_ipv6 (ip, num):
"""
Expand an IPv6 address with included :: to num octets.
@raise: ValueError on invalid IP addresses
"""
i = ip.find("::")
prefix = ip[:i]
suffix = ip[i + 2:]
count = prefix.count(":") + suffix.count(":")
if prefix:
count += 1
prefix = prefix + ":"
if suffix:
count += 1
suffix = ":" + suffix
if count >= num:
raise ValueError("invalid ipv6 number: %s" % ip)
fill = (num - count - 1) * "0:" + "0"
return prefix + fill + suffix
def expand_ip (ip):
"""
ipv6 addresses are expanded to full 8 octets, all other
addresses are left untouched
return a tuple (ip, num) where num==1 if ip is a numeric ip, 0
otherwise.
"""
if _ipv4_re.match(ip) or \
_ipv6_re.match(ip) or \
_ipv6_ipv4_re.match(ip):
return (ip, 1)
if _ipv6_abbr_re.match(ip):
return (expand_ipv6(ip, 8), 1)
if _ipv6_ipv4_abbr_re.match(ip):
i = ip.rfind(":") + 1
return (expand_ipv6(ip[:i], 6) + ip[i:], 1)
return (ip, 0)
def is_valid_ip (ip):
@ -122,145 +71,6 @@ def is_valid_ipv6 (ip):
return True
def is_valid_cidrmask (mask):
"""
Check if given mask is a valid network bitmask in CIDR notation.
"""
return 0 <= mask <= 32
def dq2num (ip):
"""
Convert decimal dotted quad string to long integer.
"""
return struct.unpack('!L', socket.inet_aton(ip))[0]
def num2dq (n):
"""
Convert long int to dotted quad string.
"""
return socket.inet_ntoa(struct.pack('!L', n))
def cidr2mask (n):
"""
Return a mask where the n left-most of 32 bits are set.
"""
return ((1 << n) - 1) << (32 - n)
def netmask2mask (ip):
"""
Return a mask of bits as a long integer.
"""
return dq2num(ip)
def mask2netmask (mask):
"""
Return dotted quad string as netmask.
"""
return num2dq(mask)
def dq2net (ip, mask):
"""
Return a tuple (network ip, network mask) for given ip and mask.
"""
return dq2num(ip) & mask
def dq_in_net (n, mask):
"""
Return True iff numerical ip n is in given network.
"""
return (n & mask) == mask
def host_in_set (ip, hosts, nets):
"""
Return True if given ip is in host or network list.
"""
if ip in hosts:
return True
if is_valid_ipv4(ip):
n = dq2num(ip)
for net in nets:
if dq_in_net(n, net):
return True
return False
def strhosts2map (strhosts):
"""
Convert a string representation of hosts and networks to
a tuple (hosts, networks).
"""
return hosts2map([s.strip() for s in strhosts.split(",") if s])
def hosts2map (hosts):
"""
Return a set of named hosts, and a list of subnets (host/netmask
adresses).
Only IPv4 host/netmasks are supported.
"""
hostset = set()
nets = []
for host in hosts:
if _host_cidrmask_re.match(host):
host, mask = host.split("/")
mask = int(mask)
if not is_valid_cidrmask(mask):
log.error(LOG_CHECK,
"CIDR mask %d is not a valid network mask", mask)
continue
if not is_valid_ipv4(host):
log.error(LOG_CHECK, "host %r is not a valid ip address", host)
continue
nets.append(dq2net(host, cidr2mask(mask)))
elif _host_netmask_re.match(host):
host, mask = host.split("/")
if not is_valid_ipv4(host):
log.error(LOG_CHECK, "host %r is not a valid ip address", host)
continue
if not is_valid_ipv4(mask):
log.error(LOG_CHECK,
"mask %r is not a valid ip network mask", mask)
continue
nets.append(dq2net(host, netmask2mask(mask)))
elif is_valid_ip(host):
hostset.add(expand_ip(host)[0])
else:
hostset |= set(resolve_host(host))
return (hostset, nets)
def map2hosts (hostmap):
"""
Convert a tuple (hosts, networks) into a host/network list
suitable for storing in a config file.
"""
ret = hostmap[0].copy()
for net, mask in hostmap[1]:
ret.add("%s/%d" % (num2dq(net), mask2netmask(mask)))
return ret
def lookup_ips (ips):
"""
Return set of host names that resolve to given ips.
"""
hosts = set()
for ip in ips:
try:
hosts.add(socket.gethostbyaddr(ip)[0])
except socket.error:
hosts.add(ip)
return hosts
def resolve_host (host):
"""
@host: hostname or IP address
@ -278,18 +88,4 @@ def resolve_host (host):
return ips
def obfuscate_ip(ip):
"""Obfuscate given host in IP form.
@ip: IPv4 address string
@return: hexadecimal IP string ('0x1ab...')
@raise: ValueError on invalid IP addresses
"""
if is_valid_ipv4(ip):
res = "0x%s" % "".join(hex(int(x))[2:] for x in ip.split("."))
else:
raise ValueError('Invalid IP value %r' % ip)
assert is_obfuscated_ip(res), '%r obfuscation error' % res
return res
is_obfuscated_ip = re.compile(r"^(0x[a-f0-9]+|[0-9]+)$").match

View file

@ -479,14 +479,6 @@ args = dict(
define_macros = define_macros + [('YY_NO_INPUT', None)],
include_dirs = include_dirs + [normpath("linkcheck/HtmlParser")],
),
Extension("linkcheck.network._network",
sources = ["linkcheck/network/_network.c"],
extra_compile_args = extra_compile_args,
library_dirs = library_dirs,
libraries = libraries,
define_macros = define_macros,
include_dirs = include_dirs,
),
],
scripts = scripts,
data_files = data_files,

View file

@ -17,11 +17,8 @@
"""
Test http checking.
"""
import os
import sys
from .httpserver import HttpServerTest
from tests import need_network
from linkcheck.network import iputil
class TestHttpMisc (HttpServerTest):
"""Test http:// misc link checking."""
@ -29,7 +26,6 @@ class TestHttpMisc (HttpServerTest):
@need_network
def test_html (self):
self.swf_test()
self.obfuscate_test()
def swf_test (self):
url = self.get_url(u"test.swf")
@ -44,25 +40,3 @@ class TestHttpMisc (HttpServerTest):
u"valid",
]
self.direct(url, resultlines, recursionlevel=1)
def obfuscate_test (self):
if os.name != "posix" or sys.platform != 'linux2':
return
host = "www.heise.de"
for tentative_ip in iputil.resolve_host(host):
if iputil.is_valid_ipv4(tentative_ip):
ip = tentative_ip
break
else:
raise NotImplementedError("no IPv4 address detected for test host")
url = u"http://%s/" % iputil.obfuscate_ip(ip)
rurl = u"http://%s/" % ip
resultlines = [
u"url %s" % url,
u"cache key %s" % rurl,
u"real url https://%s/" % host,
u"info Redirected to `https://%s/'." % host,
u"warning URL %s has obfuscated IP address %s" % (url, ip),
u"valid",
]
self.direct(url, resultlines, recursionlevel=0)

View file

@ -19,23 +19,13 @@ Test network functions.
"""
import unittest
from tests import need_posix, need_network, need_linux
import linkcheck.network
from tests import need_network, need_linux
from linkcheck.network import iputil
class TestNetwork (unittest.TestCase):
"""Test network functions."""
@need_posix
def test_ifreq_size (self):
self.assertTrue(linkcheck.network.ifreq_size() > 0)
@need_posix
def test_interfaces (self):
ifc = linkcheck.network.IfConfig()
ifc.getInterfaceList()
@need_network
@need_linux
def test_iputils (self):
@ -43,9 +33,3 @@ class TestNetwork (unittest.TestCase):
host = "dinsdale.python.org"
ips = iputil.resolve_host(host)
self.assertTrue(len(ips) > 0)
for ip in ips:
if iputil.is_valid_ipv4(ip):
obfuscated = iputil.obfuscate_ip(ip)
self.assertTrue(iputil.is_obfuscated_ip(obfuscated))
hosts = iputil.lookup_ips([obfuscated])
self.assertTrue(host in hosts)

View file

@ -40,6 +40,5 @@ if defined MSSdk (
%PYDIR%\python.exe setup.py build %COMPILER%
:: copy .pyd files to start linkchecker in local directory
copy build\lib.%PLATFORM%-%PYVER%\linkcheck\HtmlParser\htmlsax.pyd linkcheck\HtmlParser
copy build\lib.%PLATFORM%-%PYVER%\linkcheck\network\_network.pyd linkcheck\network
:finish

View file

@ -17,6 +17,5 @@
set PYDIR=C:\Python27
%PYDIR%\python.exe setup.py clean --all
del linkcheck\HtmlParser\htmlsax.pyd
del linkcheck\network\_network.pyd
del doc\html\lccollection.qhc
del doc\html\lcdoc.qch