mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-04-11 01:50:59 +00:00
Added parser plugins and the applies_to() function.
This commit is contained in:
parent
0f8ee234c3
commit
0ffdea2b8d
8 changed files with 76 additions and 61 deletions
|
|
@ -18,6 +18,7 @@
|
|||
Module for plugin management.
|
||||
"""
|
||||
from .. import loader, log, LOG_PLUGIN
|
||||
from ..decorators import notimplemented
|
||||
|
||||
|
||||
class _PluginBase(object):
|
||||
|
|
@ -28,6 +29,12 @@ class _PluginBase(object):
|
|||
"""Add plugin-specific configuration."""
|
||||
pass
|
||||
|
||||
@notimplemented
|
||||
def applies_to(self, url_data, **kwargs):
|
||||
"""See if url_data should be handled by this plugin."""
|
||||
pass
|
||||
|
||||
@notimplemented
|
||||
def check(self, url_data):
|
||||
"""Common check method run for all plugins."""
|
||||
pass
|
||||
|
|
@ -48,6 +55,11 @@ class _ContentPlugin(_PluginBase):
|
|||
pass
|
||||
|
||||
|
||||
class _ParserPlugin(_PluginBase):
|
||||
"""Plugins run for valid URLs to parse their contents."""
|
||||
pass
|
||||
|
||||
|
||||
def get_plugin_modules(folders, package='plugins',
|
||||
parentpackage='linkcheck.dummy'):
|
||||
"""Get plugin modules for given folders."""
|
||||
|
|
@ -60,7 +72,7 @@ def get_plugin_modules(folders, package='plugins',
|
|||
|
||||
def get_plugin_classes(modules):
|
||||
"""Get plugin classes for given modules."""
|
||||
classes = (_ConnectionPlugin, _ContentPlugin)
|
||||
classes = (_ConnectionPlugin, _ContentPlugin, _ParserPlugin)
|
||||
return loader.get_plugins(modules, classes)
|
||||
|
||||
|
||||
|
|
@ -71,6 +83,7 @@ class PluginManager(object):
|
|||
"""Load enabled plugins."""
|
||||
self.connection_plugins = []
|
||||
self.content_plugins = []
|
||||
self.parser_plugins = []
|
||||
folders = config["pluginfolders"]
|
||||
modules = get_plugin_modules(folders)
|
||||
self.load_modules(modules, config)
|
||||
|
|
@ -83,9 +96,14 @@ class PluginManager(object):
|
|||
if issubclass(pluginclass, _ConnectionPlugin):
|
||||
log.debug(LOG_PLUGIN, "Enable connection plugin %s", name)
|
||||
self.connection_plugins.append(pluginclass(config[name]))
|
||||
else:
|
||||
elif issubclass(pluginclass, _ContentPlugin):
|
||||
log.debug(LOG_PLUGIN, "Enable content plugin %s", name)
|
||||
self.content_plugins.append(pluginclass(config[name]))
|
||||
elif issubclass(pluginclass, _ParserPlugin):
|
||||
log.debug(LOG_PLUGIN, "Enable parser plugin %s", name)
|
||||
self.parser_plugins.append(pluginclass(config[name]))
|
||||
else:
|
||||
raise ValueError("Invalid plugin class %s" % pluginclass)
|
||||
|
||||
def run_connection_plugins(self, url_data):
|
||||
"""Run all connection plugins."""
|
||||
|
|
@ -95,9 +113,16 @@ class PluginManager(object):
|
|||
"""Run all content plugins."""
|
||||
run_plugins(self.content_plugins, url_data)
|
||||
|
||||
def run_parser_plugins(self, url_data, pagetype):
|
||||
"""Run parser plugins for given pagetype."""
|
||||
run_plugins(self.parser_plugins, url_data, stop_after_match=True, pagetype=pagetype)
|
||||
|
||||
def run_plugins(plugins, url_data):
|
||||
|
||||
def run_plugins(plugins, url_data, stop_after_match=False, **kwargs):
|
||||
"""Run check(url_data) on every given plugin that applies to url_data, optionally stopping after the first match."""
|
||||
for plugin in plugins:
|
||||
log.debug(LOG_PLUGIN, "Run plugin %s", plugin.__class__.__name__)
|
||||
plugin.check(url_data)
|
||||
if plugin.applies_to(url_data, **kwargs):
|
||||
plugin.check(url_data)
|
||||
if stop_after_match:
|
||||
break
|
||||
|
|
|
|||
|
|
@ -26,14 +26,12 @@ from ..parser import find_links
|
|||
class AnchorCheck(_ContentPlugin):
|
||||
"""Checks validity of HTML anchors."""
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check for HTML anchor existence."""
|
||||
return url_data.is_html() and url_data.anchor
|
||||
|
||||
def check(self, url_data):
|
||||
"""Check content for invalid anchors."""
|
||||
if not url_data.is_html():
|
||||
# not an HTML page
|
||||
return
|
||||
if not url_data.anchor:
|
||||
# no anchor
|
||||
return
|
||||
log.debug(LOG_PLUGIN, "checking content for invalid anchors")
|
||||
# list of parsed anchors
|
||||
self.anchors = []
|
||||
|
|
|
|||
|
|
@ -28,13 +28,12 @@ class HttpHeaderInfo(_ConnectionPlugin):
|
|||
super(HttpHeaderInfo, self).__init__(config)
|
||||
self.prefixes = tuple(config["prefixes"])
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check for HTTP and prefix config."""
|
||||
return self.prefixes and url_data.is_http()
|
||||
|
||||
def check(self, url_data):
|
||||
"""Collect HTTP headers whose names start with a configured prefix."""
|
||||
if not url_data.is_http():
|
||||
# not an HTTP URL
|
||||
return
|
||||
if not self.prefixes:
|
||||
return
|
||||
headers = []
|
||||
for name, value in url_data.headers.items():
|
||||
if name.startswith(self.prefixes):
|
||||
|
|
|
|||
|
|
@ -30,15 +30,22 @@ class LocationInfo(_ConnectionPlugin):
|
|||
"""Adds the country and if possible city name of the URL host as info.
|
||||
Needs GeoIP or pygeoip and a local country or city lookup DB installed."""
|
||||
|
||||
def __init__(self, config):
|
||||
"""Check for geoip module."""
|
||||
if not geoip:
|
||||
log.warn(LOG_PLUGIN, "GeoIP or pygeoip not found for LocationInfo plugin.")
|
||||
super(LocationInfo, self).__init__(config)
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check for validity, host existence and geoip module."""
|
||||
return url_data.valid and url_data.host and geoip
|
||||
|
||||
def check(self, url_data):
|
||||
"""Try to ask GeoIP database for country info."""
|
||||
if not url_data.valid:
|
||||
return
|
||||
if url_data.host and geoip:
|
||||
location = get_location(url_data.host)
|
||||
if location:
|
||||
url_data.add_info(_("URL is located in %(location)s.") %
|
||||
{"location": _(location)})
|
||||
location = get_location(url_data.host)
|
||||
if location:
|
||||
url_data.add_info(_("URL is located in %(location)s.") %
|
||||
{"location": _(location)})
|
||||
|
||||
# It is unknown if the geoip library is already thread-safe, so
|
||||
# take no risks here and guard access with a lock.
|
||||
|
|
|
|||
|
|
@ -45,15 +45,12 @@ class RegexCheck(_ContentPlugin):
|
|||
except re.error as msg:
|
||||
log.warn(LOG_PLUGIN, "Invalid regex pattern %r: %s" % (pattern, msg))
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check for warningregex, extern flag and parseability."""
|
||||
return self.warningregex and not url_data.extern[0] and url_data.is_parseable()
|
||||
|
||||
def check(self, url_data):
|
||||
"""Check content."""
|
||||
if not self.warningregex:
|
||||
return
|
||||
if url_data.extern[0]:
|
||||
# only scan internal pages for warnings
|
||||
return
|
||||
if not url_data.is_parseable():
|
||||
return
|
||||
log.debug(LOG_PLUGIN, "checking content for warning regex")
|
||||
content = url_data.get_content()
|
||||
# add warnings for found matches, up to the maximum allowed number
|
||||
|
|
|
|||
|
|
@ -42,21 +42,16 @@ class SslCertificateCheck(_ConnectionPlugin):
|
|||
# do not check hosts multiple times
|
||||
self.checked_hosts = set()
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check validity, scheme, extern and url_connection."""
|
||||
return url_data.valid and url_data.scheme == 'https' and \
|
||||
not url_data.extern[0] and url_data.url_connection is not None
|
||||
|
||||
@synchronized(_lock)
|
||||
def check(self, url_data):
|
||||
"""Run all SSL certificate checks that have not yet been done.
|
||||
OpenSSL already checked the SSL notBefore and notAfter dates.
|
||||
"""
|
||||
if url_data.extern[0]:
|
||||
# only check internal pages
|
||||
return
|
||||
if not url_data.valid:
|
||||
return
|
||||
if url_data.url_connection is None:
|
||||
# not allowed to connect
|
||||
return
|
||||
if url_data.scheme != 'https':
|
||||
return
|
||||
host = url_data.urlparts[1]
|
||||
if host in self.checked_hosts:
|
||||
return
|
||||
|
|
|
|||
|
|
@ -53,14 +53,12 @@ class HtmlSyntaxCheck(_ContentPlugin):
|
|||
super(HtmlSyntaxCheck, self).__init__(config)
|
||||
self.timer = W3Timer()
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check for HTML and extern."""
|
||||
return url_data.is_html() and not url_data.extern[0]
|
||||
|
||||
def check(self, url_data):
|
||||
"""Check HTML syntax of given URL."""
|
||||
if url_data.extern[0]:
|
||||
# only check internal pages
|
||||
return
|
||||
if not url_data.is_html():
|
||||
# only check HTML pages
|
||||
return
|
||||
self.timer.check_w3_time()
|
||||
session = url_data.session
|
||||
try:
|
||||
|
|
@ -74,8 +72,7 @@ class HtmlSyntaxCheck(_ContentPlugin):
|
|||
except requests.exceptions.RequestException:
|
||||
pass # ignore service failures
|
||||
except Exception as msg:
|
||||
log.warn(LOG_PLUGIN,
|
||||
_("HTML syntax check plugin error: %(msg)s ") % {"msg": msg})
|
||||
log.warn(LOG_PLUGIN, _("HTML syntax check plugin error: %(msg)s ") % {"msg": msg})
|
||||
|
||||
|
||||
class CssSyntaxCheck(_ContentPlugin):
|
||||
|
|
@ -88,14 +85,12 @@ class CssSyntaxCheck(_ContentPlugin):
|
|||
super(CssSyntaxCheck, self).__init__(config)
|
||||
self.timer = W3Timer()
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check for CSS and extern."""
|
||||
return url_data.is_css() and not url_data.extern[0]
|
||||
|
||||
def check(self, url_data):
|
||||
"""Check CSS syntax of given URL."""
|
||||
if url_data.extern[0]:
|
||||
# only check internal pages
|
||||
return
|
||||
if not url_data.is_css():
|
||||
# only check CSS pages
|
||||
return
|
||||
self.timer.check_w3_time()
|
||||
session = url_data.session
|
||||
try:
|
||||
|
|
@ -114,8 +109,7 @@ class CssSyntaxCheck(_ContentPlugin):
|
|||
except requests.exceptions.RequestException:
|
||||
pass # ignore service failures
|
||||
except Exception as msg:
|
||||
log.warn(LOG_PLUGIN,
|
||||
_("CSS syntax check plugin error: %(msg)s ") % {"msg": msg})
|
||||
log.warn(LOG_PLUGIN, _("CSS syntax check plugin error: %(msg)s ") % {"msg": msg})
|
||||
|
||||
|
||||
def check_w3_errors (url_data, xml, w3type):
|
||||
|
|
|
|||
|
|
@ -33,15 +33,15 @@ class VirusCheck(_ContentPlugin):
|
|||
super(VirusCheck, self).__init__(config)
|
||||
# XXX read config
|
||||
self.clamav_conf = get_clamav_conf(canonical_clamav_conf())
|
||||
if not self.clamav_conf:
|
||||
log.warn(LOG_PLUGIN, "clamav daemon not found for VirusCheck plugin")
|
||||
|
||||
def applies_to(self, url_data):
|
||||
"""Check for clamav and extern."""
|
||||
return self.clamav_conf and not url_data.extern[0]
|
||||
|
||||
def check(self, url_data):
|
||||
"""Scan the URL content for viruses using the clamav daemon."""
|
||||
if url_data.extern[0]:
|
||||
# only scan internal pages for viruses
|
||||
return
|
||||
if not self.clamav_conf:
|
||||
# No clamav available
|
||||
return
|
||||
data = url_data.get_content()
|
||||
infected, errors = scan(data, self.clamav_conf)
|
||||
if infected or errors:
|
||||
|
|
|
|||
Loading…
Reference in a new issue