mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-05-23 13:45:49 +00:00
Display SSL info on redirects.
This commit is contained in:
parent
be59802569
commit
a8623bc0bc
4 changed files with 54 additions and 22 deletions
|
|
@ -14,6 +14,7 @@ Changes:
|
|||
- checking: Disable URL length warning for data: URLs.
|
||||
- checking: Do not warn about missing addresses on mailto links that have
|
||||
subjects.
|
||||
- checking: Display SSL certificate info even on redirects.
|
||||
- installation: Check requirement for Python requests >= 2.2.0.
|
||||
Closes: GH bug #478
|
||||
- logging: Display downloaded bytes.
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ from ..HtmlParser import htmlsax
|
|||
from ..htmlutil import linkparse
|
||||
# import warnings
|
||||
from .const import WARN_HTTP_EMPTY_CONTENT
|
||||
from requests.sessions import REDIRECT_STATI
|
||||
|
||||
# assumed HTTP header encoding
|
||||
HEADER_ENCODING = "iso-8859-1"
|
||||
|
|
@ -49,6 +50,8 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
# initialize check data
|
||||
self.headers = {}
|
||||
self.auth = None
|
||||
self.ssl_cipher = None
|
||||
self.ssl_cert = None
|
||||
|
||||
def allows_robots (self, url):
|
||||
"""
|
||||
|
|
@ -130,8 +133,8 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
# check the http connection
|
||||
request = self.build_request()
|
||||
self.send_request(request)
|
||||
self._add_response_info()
|
||||
self.follow_redirections(request)
|
||||
self.charset = self.url_connection.apparent_encoding
|
||||
self.check_response()
|
||||
|
||||
def build_request(self):
|
||||
|
|
@ -174,10 +177,15 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
log.debug(LOG_CHECK, "Send request with %s", kwargs)
|
||||
self.url_connection = self.session.send(request, **kwargs)
|
||||
self.headers = self.url_connection.headers
|
||||
if self.scheme == u'https':
|
||||
self.add_ssl_cipher_info()
|
||||
self._add_ssl_info()
|
||||
|
||||
def get_ssl_sock(self):
|
||||
def _add_response_info(self):
|
||||
"""Set info from established HTTP(S) connection."""
|
||||
self.charset = self.url_connection.apparent_encoding
|
||||
self.set_content_type()
|
||||
self.add_size_info()
|
||||
|
||||
def _get_ssl_sock(self):
|
||||
"""Get raw SSL socket."""
|
||||
assert self.scheme == u"https", self
|
||||
raw_connection = self.url_connection.raw._connection
|
||||
|
|
@ -187,12 +195,17 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
raw_connection.connect()
|
||||
return raw_connection.sock
|
||||
|
||||
def add_ssl_cipher_info(self):
|
||||
def _add_ssl_info(self):
|
||||
"""Add SSL cipher info."""
|
||||
cipher_name, ssl_protocol, secret_bits = self.get_ssl_sock().cipher()
|
||||
msg = _(u"SSL cipher %(cipher)s, %(protocol)s.")
|
||||
attrs = dict(cipher=cipher_name, protocol=ssl_protocol)
|
||||
self.add_info(msg % attrs)
|
||||
if self.scheme == u'https':
|
||||
sock = self._get_ssl_sock()
|
||||
self.ssl_cipher = sock.cipher()
|
||||
log.debug(LOG_CHECK, "Got SSL cipher %s", self.ssl_cipher)
|
||||
self.ssl_cert = sock.getpeercert()
|
||||
log.debug(LOG_CHECK, "Got SSL certificate %s", self.ssl_cert)
|
||||
else:
|
||||
self.ssl_cipher = None
|
||||
self.ssl_cert = None
|
||||
|
||||
def construct_auth (self):
|
||||
"""Construct HTTP Basic authentication credentials if there
|
||||
|
|
@ -208,9 +221,17 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
"""Return content MIME type or empty string."""
|
||||
self.content_type = headers.get_content_type(self.headers)
|
||||
|
||||
def is_redirect(self):
|
||||
"""Check if current response is a redirect."""
|
||||
return ('location' in self.headers and
|
||||
self.url_connection.status_code in REDIRECT_STATI)
|
||||
|
||||
def follow_redirections(self, request):
|
||||
"""Follow all redirections of http response."""
|
||||
log.debug(LOG_CHECK, "follow all redirections")
|
||||
if self.is_redirect():
|
||||
# run plugins for old connection
|
||||
self.aggregate.plugin_manager.run_connection_plugins(self)
|
||||
kwargs = dict(
|
||||
stream=True,
|
||||
)
|
||||
|
|
@ -220,14 +241,17 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
log.debug(LOG_CHECK, "Redirected to %r", newurl)
|
||||
self.aliases.append(newurl)
|
||||
self.add_info(_("Redirected to `%(url)s'.") % {'url': newurl})
|
||||
urlparts = strformat.url_unicode_split(newurl)
|
||||
if response is not None:
|
||||
self.urlparts = urlparts
|
||||
self.urlparts = strformat.url_unicode_split(newurl)
|
||||
self.build_url_parts()
|
||||
self.url_connection = response
|
||||
self.headers = response.headers
|
||||
self.url = urlutil.urlunsplit(urlparts)
|
||||
self.scheme = urlparts[0].lower()
|
||||
self.url = urlutil.urlunsplit(self.urlparts)
|
||||
self.scheme = self.urlparts[0].lower()
|
||||
self._add_ssl_info()
|
||||
self._add_response_info()
|
||||
if self.is_redirect():
|
||||
# run plugins for old connection
|
||||
self.aggregate.plugin_manager.run_connection_plugins(self)
|
||||
|
||||
def getheader (self, name, default=None):
|
||||
"""Get decoded header value.
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ Handle https links.
|
|||
import time
|
||||
import threading
|
||||
from . import _ConnectionPlugin
|
||||
from .. import log, LOG_PLUGIN, strformat, LinkCheckerError
|
||||
from .. import strformat, LinkCheckerError
|
||||
from ..decorators import synchronized
|
||||
|
||||
_lock = threading.Lock()
|
||||
|
|
@ -61,13 +61,20 @@ class SslCertificateCheck(_ConnectionPlugin):
|
|||
if host in self.checked_hosts:
|
||||
return
|
||||
self.checked_hosts.add(host)
|
||||
cert = url_data.get_ssl_sock().getpeercert()
|
||||
log.debug(LOG_PLUGIN, "Got SSL certificate %s", cert)
|
||||
if 'notAfter' in cert:
|
||||
cert = url_data.ssl_cert
|
||||
cipher_name, ssl_protocol, num_secret_bits = url_data.ssl_cipher
|
||||
msg = _(u"SSL cipher %(cipher)s, %(protocol)s.")
|
||||
attrs = dict(cipher=cipher_name, protocol=ssl_protocol)
|
||||
url_data.add_info(msg % attrs)
|
||||
config = url_data.aggregate.config
|
||||
if cert and 'notAfter' in cert:
|
||||
self.check_ssl_valid_date(url_data, cert)
|
||||
else:
|
||||
elif config['sslverify']:
|
||||
msg = _('certificate did not include "notAfter" information')
|
||||
url_data.add_warning(msg)
|
||||
else:
|
||||
msg = _('SSL verification is disabled; enable the sslverify option')
|
||||
url_data.add_warning(msg)
|
||||
|
||||
def check_ssl_valid_date(self, url_data, cert):
|
||||
"""Check if the certificate is still valid, or if configured check
|
||||
|
|
|
|||
|
|
@ -34,12 +34,12 @@ class TestHttps (LinkCheckTest):
|
|||
u"url %s" % url,
|
||||
u"cache key %s" % url,
|
||||
u"real url %s" % rurl,
|
||||
u"info SSL cipher RC4-SHA, TLSv1/SSLv3.",
|
||||
#u"info SSL cipher RC4-SHA, TLSv1/SSLv3.",
|
||||
u"info Redirected to `%s'." % rurl,
|
||||
u"valid",
|
||||
]
|
||||
confargs = dict(
|
||||
enabledplugins=['SslCertificateCheck'],
|
||||
SslCertificateCheck=dict(sslcertwarndays=10),
|
||||
#enabledplugins=['SslCertificateCheck'],
|
||||
#SslCertificateCheck=dict(sslcertwarndays=10),
|
||||
)
|
||||
self.direct(url, resultlines, recursionlevel=0, confargs=confargs)
|
||||
|
|
|
|||
Loading…
Reference in a new issue