linkchecker/linkcheck/plugins/viruscheck.py
Chris Mayo ac0967e251 Fix remaining flake8 violations in linkcheck/
linkcheck/better_exchook2.py:28:89: E501 line too long (90 > 88 characters)
linkcheck/better_exchook2.py:155:9: E722 do not use bare 'except'
linkcheck/better_exchook2.py:166:9: E722 do not use bare 'except'
linkcheck/better_exchook2.py:289:13: E741 ambiguous variable name 'l'
linkcheck/better_exchook2.py:299:9: E722 do not use bare 'except'
linkcheck/containers.py:48:13: E731 do not assign a lambda expression, use a def
linkcheck/ftpparse.py:123:89: E501 line too long (93 > 88 characters)
linkcheck/loader.py:46:47: E203 whitespace before ':'
linkcheck/logconf.py:45:29: E231 missing whitespace after ','
linkcheck/robotparser2.py:157:89: E501 line too long (95 > 88 characters)
linkcheck/robotparser2.py:182:89: E501 line too long (89 > 88 characters)
linkcheck/strformat.py:181:16: E203 whitespace before ':'
linkcheck/strformat.py:181:43: E203 whitespace before ':'
linkcheck/strformat.py:253:9: E731 do not assign a lambda expression, use a def
linkcheck/strformat.py:254:9: E731 do not assign a lambda expression, use a def
linkcheck/strformat.py:341:89: E501 line too long (111 > 88 characters)
linkcheck/url.py:102:32: E203 whitespace before ':'
linkcheck/url.py:277:5: E741 ambiguous variable name 'l'
linkcheck/url.py:402:5: E741 ambiguous variable name 'l'
linkcheck/checker/__init__.py:203:1: E402 module level import not at top of file
linkcheck/checker/fileurl.py:200:89: E501 line too long (103 > 88 characters)
linkcheck/checker/mailtourl.py:122:60: E203 whitespace before ':'
linkcheck/checker/mailtourl.py:157:89: E501 line too long (96 > 88 characters)
linkcheck/checker/mailtourl.py:190:89: E501 line too long (109 > 88 characters)
linkcheck/checker/mailtourl.py:200:89: E501 line too long (111 > 88 characters)
linkcheck/checker/mailtourl.py:249:89: E501 line too long (106 > 88 characters)
linkcheck/checker/unknownurl.py:226:23: W291 trailing whitespace
linkcheck/checker/urlbase.py:245:89: E501 line too long (101 > 88 characters)
linkcheck/configuration/confparse.py:236:89: E501 line too long (186 > 88 characters)
linkcheck/configuration/confparse.py:247:89: E501 line too long (111 > 88 characters)
linkcheck/configuration/__init__.py:164:9: E266 too many leading '#' for block comment
linkcheck/configuration/__init__.py:184:9: E266 too many leading '#' for block comment
linkcheck/configuration/__init__.py:190:9: E266 too many leading '#' for block comment
linkcheck/configuration/__init__.py:195:9: E266 too many leading '#' for block comment
linkcheck/configuration/__init__.py:198:9: E266 too many leading '#' for block comment
linkcheck/configuration/__init__.py:435:89: E501 line too long (90 > 88 characters)
linkcheck/director/aggregator.py:45:43: E231 missing whitespace after ','
linkcheck/director/aggregator.py:178:89: E501 line too long (106 > 88 characters)
linkcheck/logger/__init__.py:29:1: E731 do not assign a lambda expression, use a def
linkcheck/logger/__init__.py:108:13: E741 ambiguous variable name 'l'
linkcheck/logger/__init__.py:275:19: F821 undefined name '_'
linkcheck/logger/__init__.py:342:16: F821 undefined name '_'
linkcheck/logger/__init__.py:380:13: F821 undefined name '_'
linkcheck/logger/__init__.py:384:13: F821 undefined name '_'
linkcheck/logger/__init__.py:387:13: F821 undefined name '_'
linkcheck/logger/__init__.py:396:13: F821 undefined name '_'
linkcheck/network/__init__.py:1:1: W391 blank line at end of file
linkcheck/plugins/locationinfo.py:89:9: E731 do not assign a lambda expression, use a def
linkcheck/plugins/locationinfo.py:91:9: E731 do not assign a lambda expression, use a def
linkcheck/plugins/markdowncheck.py:112:89: E501 line too long (111 > 88 characters)
linkcheck/plugins/markdowncheck.py:141:9: E741 ambiguous variable name 'l'
linkcheck/plugins/markdowncheck.py:165:23: E203 whitespace before ':'
linkcheck/plugins/viruscheck.py:95:42: E203 whitespace before ':'
2020-05-30 17:01:36 +01:00

234 lines
7.7 KiB
Python

# Copyright (C) 2000-2014 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Check page content for virus infection with clamav.
"""
import os
import socket
from . import _ContentPlugin
from .. import log, LOG_PLUGIN
from ..socketutil import create_socket
class VirusCheck(_ContentPlugin):
    """Checks the page content for virus infections with clamav.
    A local clamav daemon must be installed."""

    def __init__(self, config):
        """Initialize clamav configuration.

        The clamd configuration file is taken from the "clamavconf" entry
        of config (the key filled in by read_config) when that entry is
        set; otherwise the platform default returned by
        canonical_clamav_conf() is used.
        """
        super(VirusCheck, self).__init__(config)
        # NOTE(review): assumes config is the dict built by read_config();
        # any other kind of object falls back to the platform default.
        conf_file = None
        if isinstance(config, dict):
            conf_file = config.get("clamavconf")
        if not conf_file:
            conf_file = canonical_clamav_conf()
        self.clamav_conf = get_clamav_conf(conf_file)
        if not self.clamav_conf:
            log.warn(LOG_PLUGIN, "clamav daemon not found for VirusCheck plugin")

    def applies_to(self, url_data):
        """Check for clamav and extern.

        @return: True only if a clamav configuration was loaded and
          url_data.extern[0] is false
        @rtype: bool
        """
        # bool() keeps the configuration object (or None) from leaking out
        # as the return value of a yes/no question.
        return bool(self.clamav_conf) and not url_data.extern[0]

    def check(self, url_data):
        """Scan the raw page content with clamav and report the outcome.

        Every infection message and every scanner error is added to
        url_data as a warning; when there is neither, an info note is
        added instead.
        """
        data = url_data.get_raw_content()
        infected, errors = scan(data, self.clamav_conf)
        if infected or errors:
            for msg in infected:
                url_data.add_warning("Virus scan infection: %s" % msg)
            for msg in errors:
                url_data.add_warning("Virus scan error: %s" % msg)
        else:
            url_data.add_info("No viruses in data found.")

    @classmethod
    def read_config(cls, configparser):
        """Read configuration file options.

        @return: dict with the single key "clamavconf"; its value is the
          option read from the section named after this class, or None
          when the option is absent
        @rtype: dict
        """
        section = cls.__name__
        option = "clamavconf"
        value = None
        if configparser.has_option(section, option):
            value = configparser.get(section, option)
        return {option: value}
class ClamavError(Exception):
    """Signals a clamav configuration or clamd protocol problem."""
class ClamdScanner:
    """Virus scanner using a clamd daemon process.

    Two sockets are used: self.sock is the control connection on which
    the STREAM command is sent and the scan result is read, self.wsock is
    the data connection (to the port announced by clamd) on which the
    content to scan is written.
    """

    def __init__(self, clamav_conf):
        """Initialize clamd daemon process sockets.

        @param clamav_conf: object whose new_connection() returns a tuple
          (connected control socket, host)
        """
        self.infected = []
        self.errors = []
        self.sock, self.host = clamav_conf.new_connection()
        try:
            self.sock_rcvbuf = self.sock.getsockopt(
                socket.SOL_SOCKET, socket.SO_RCVBUF
            )
            self.wsock = self.new_scansock()
        except Exception:
            # The half-built scanner is never handed to a caller, so nobody
            # else could close the control socket.
            self.sock.close()
            raise

    def new_scansock(self):
        """Return a connected socket for sending scan data to it.

        Sends STREAM on the control socket and reads replies until one
        contains "PORT"; the number following it is the port the data
        socket connects to. On any failure both sockets are closed before
        the exception is re-raised.

        @raise ClamavError: if no PORT reply was seen
        """
        wsock = None
        try:
            self.sock.sendall(b"STREAM")
            port = None
            for _attempt in range(60):
                data = self.sock.recv(self.sock_rcvbuf)
                if not data:
                    # Peer closed the connection; further reads only
                    # return more empty results.
                    break
                i = data.find(b"PORT")
                if i != -1:
                    # Skip "PORT" plus the separator that follows it.
                    port = int(data[i + 5:])
                    break
            if port is None:
                raise ClamavError(_("clamd is not ready for stream scanning"))
            sockinfo = get_sockinfo(self.host, port=port)
            wsock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
            wsock.connect(sockinfo[0][4])
        except Exception:
            if wsock is not None:
                wsock.close()
            self.sock.close()
            raise
        return wsock

    def scan(self, data):
        """Scan given data for viruses.

        The data is only written to the data socket here; the results
        are collected by close().
        """
        self.wsock.sendall(data)

    def close(self):
        """Get results and close clamd daemon sockets.

        The data socket is closed first, then the control socket is read
        until end of stream. The complete reply is appended to
        self.infected if it contains "FOUND\\n" and to self.errors if it
        contains "ERROR\\n". The control socket is closed even when
        reading fails.
        """
        chunks = []
        try:
            self.wsock.close()
            while True:
                data = self.sock.recv(self.sock_rcvbuf)
                if not data:
                    break
                chunks.append(data)
        finally:
            self.sock.close()
        # Inspect the reply as a whole: a marker (or a multi-byte character)
        # may straddle two recv() results.
        response = b"".join(chunks)
        if not response:
            return
        text = response.decode('UTF-8', 'replace')
        if b"FOUND\n" in response:
            self.infected.append(text)
        if b"ERROR\n" in response:
            self.errors.append(text)
def canonical_clamav_conf():
    """Return the default clamd.conf location for the running platform.

    The choice is keyed on os.name: 'posix' and 'nt' have absolute
    default paths, any other platform gets a bare relative file name.
    """
    defaults = {
        'posix': "/etc/clamav/clamd.conf",
        'nt': r"c:\clamav-devel\etc\clamd.conf",
    }
    return defaults.get(os.name, "clamd.conf")
def get_clamav_conf(filename):
    """Load the clamav configuration stored in the given file.

    @return: a ClamavConfig built from filename, or None (after logging
      a warning) when filename is not an existing regular file
    """
    if not os.path.isfile(filename):
        log.warn(LOG_PLUGIN, "No ClamAV config file found at %r.", filename)
        return None
    return ClamavConfig(filename)
def get_sockinfo(host, port=None):
    """Resolve host and port to IPv4 stream-socket address info.

    @return: the list produced by socket.getaddrinfo, restricted to
      the AF_INET family and the SOCK_STREAM socket type
    """
    return socket.getaddrinfo(
        host, port, socket.AF_INET, socket.SOCK_STREAM
    )
class ClamavConfig(dict):
    """Dictionary of clamd.conf settings that can also open the
    connection to clamd described by those settings."""

    def __init__(self, filename):
        """Load the settings from filename and validate them.

        @raise ClamavError: if ScannerDaemonOutputFormat is set, or if
          both TCPSocket and LocalSocket are set
        """
        super(ClamavConfig, self).__init__()
        self.parseconf(filename)
        if self.get('ScannerDaemonOutputFormat'):
            raise ClamavError(_("ScannerDaemonOutputFormat must be disabled"))
        has_tcp = self.get('TCPSocket')
        if has_tcp and self.get('LocalSocket'):
            raise ClamavError(
                _("only one of TCPSocket and LocalSocket must be enabled")
            )

    def parseconf(self, filename):
        """Fill this dictionary from the clamav configuration file.

        Blank lines and lines starting with "#" are skipped. The first
        whitespace-separated word of a line is the key; the rest of the
        line is its value, or True when the line holds only the key.
        """
        with open(filename) as conf_file:
            for raw_line in conf_file:
                entry = raw_line.strip()
                if entry == "" or entry.startswith("#"):
                    # nothing to store for blank lines and comments
                    continue
                fields = entry.split(None, 1)
                key = fields[0]
                self[key] = True if len(fields) == 1 else fields[1]

    def new_connection(self):
        """Open the control connection to clamd.

        LocalSocket is tried before TCPSocket; for TCP the host comes
        from TCPAddr and defaults to 'localhost'.

        @return: tuple (connected socket, host)
        @raise ClamavError: if neither LocalSocket nor TCPSocket is set
        """
        if self.get('LocalSocket'):
            return self.create_local_socket(), 'localhost'
        if self.get('TCPSocket'):
            host = self.get('TCPAddr', 'localhost')
            return self.create_tcp_socket(host), host
        raise ClamavError(_("one of TCPSocket or LocalSocket must be enabled"))

    def create_local_socket(self):
        """Return a unix stream socket connected to the LocalSocket path.

        The socket is closed again if connecting raises socket.error.
        """
        unix_sock = create_socket(socket.AF_UNIX, socket.SOCK_STREAM)
        try:
            unix_sock.connect(self['LocalSocket'])
        except socket.error:
            unix_sock.close()
            raise
        return unix_sock

    def create_tcp_socket(self, host):
        """Return a TCP socket connected to host on the TCPSocket port.

        The socket is closed again if connecting raises socket.error.
        """
        tcp_port = int(self['TCPSocket'])
        candidates = get_sockinfo(host, port=tcp_port)
        tcp_sock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # element 4 of an address info entry is the socket address
            tcp_sock.connect(candidates[0][4])
        except socket.error:
            tcp_sock.close()
            raise
        return tcp_sock
def scan(data, clamconf):
    """Scan data for viruses.

    Failures while setting up the scanner (connection errors as well as
    ClamavError raised for an unusable configuration or a clamd that does
    not accept stream scanning) are reported in the returned error list
    rather than raised. Socket errors raised while sending the data or
    reading the result still propagate; the scanner is closed in either
    case.

    @return: (infection msgs, errors)
    @rtype: ([], [])
    """
    try:
        scanner = ClamdScanner(clamconf)
    except socket.error:
        errmsg = _("Could not connect to ClamAV daemon.")
        return ([], [errmsg])
    except ClamavError as err:
        # The exception text is already a user-facing message.
        return ([], [str(err)])
    try:
        scanner.scan(data)
    finally:
        scanner.close()
    return scanner.infected, scanner.errors