mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-04-26 17:14:42 +00:00
Redirection refactoring part 1.
This commit is contained in:
parent
f566f98fe5
commit
dd53c78096
1 changed files with 101 additions and 96 deletions
|
|
@ -294,110 +294,115 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
return self.content_type
|
||||
|
||||
def follow_redirections (self, response, set_result=True):
    """Follow all redirections of http response.

    @param response: response object of the initial HTTP request
    @param set_result: if True, store warnings/results on this URL object
    @return: tuple (tries, response); tries is the number of followed
      redirections, or -1 when following was aborted by
      follow_redirection() (disallowed scheme, extern filter,
      robots.txt denial, recursive redirect, scheme change, ...)
    """
    log.debug(LOG_CHECK, "follow all redirections")
    redirected = self.url
    tries = 0
    while response.status in [301, 302] and self.headers and \
          tries < self.max_redirects:
        num, response = self.follow_redirection(response, set_result, redirected)
        if num == -1:
            # redirection handling already stored a result; stop here
            return num, response
        # BUG FIX: the original called urlparse.urljoin(self.urlparts),
        # but urljoin() requires two arguments (base, url) and would
        # raise TypeError.  self.urlparts is the split-URL list stored
        # by follow_redirection(), so the correct inverse operation is
        # urlunsplit().
        redirected = urlparse.urlunsplit(self.urlparts)
        tries += 1
    return tries, response
|
||||
|
||||
def follow_redirection (self, response, set_result, redirected):
    """Follow one redirection of http response.

    Reads the redirect target from the Location (or Uri) header,
    normalizes it, re-applies the scheme/extern/robots/recursion
    checks, and fetches the new response.

    @param response: current HTTP response object
    @param set_result: if True, store warnings/results on this URL object
    @param redirected: the URL the previous request was made to; used as
      base for resolving a relative Location header
    @return: tuple (status, response) where status is
      -1 (stop following; a result has been stored),
      self.max_redirects (HEAD request loop; caller should retry with GET),
      or 1 (one redirection followed; response is the new response).
    Can raise UnicodeError from url normalization (IDNA encoding).
    """
    newurl = self.headers.getheader("Location",
                 self.headers.getheader("Uri", ""))
    # make new url absolute and unicode
    newurl = urlparse.urljoin(redirected, unicode_safe(newurl))
    log.debug(LOG_CHECK, "Redirected to %r", newurl)
    self.add_info(_("Redirected to `%(url)s'.") % {'url': newurl})
    # norm base url - can raise UnicodeError from url.idna_encode()
    redirected, is_idn = urlbase.url_norm(newurl)
    log.debug(LOG_CHECK, "Norm redirected to %r", redirected)
    urlparts = strformat.url_unicode_split(redirected)
    if urlparts[0] not in ('ftp', 'http', 'https'):
        # in case of changed scheme make new URL object
        # For security reasons do not allow redirects to protocols
        # other than HTTP, HTTPS or FTP.
        if set_result:
            self.add_warning(
                _("Redirection to url `%(newurl)s' is not allowed.") %
                {'newurl': redirected})
            self.set_result(u"syntax OK")
        return -1, response
    if urlparts[1] != self.urlparts[1]:
        # the URL domain changed
        if self.recursion_level == 0 and urlparts[0] in ('http', 'https'):
            # Add intern patterns for redirection of URLs given by the
            # user for HTTP schemes
            pat = internpaturl.get_intern_pattern(redirected)
            log.debug(LOG_CHECK, "Add intern pattern %r", pat)
            self.aggregate.config['internlinks'].append(get_link_pat(pat))
        else:
            # check extern filter again
            self.set_extern(redirected)
            if self.extern[0] and self.extern[1]:
                # redirect target is extern and strict: syntax-check only
                if set_result:
                    self.check301status(response)
                    self.add_info(
                        _("The redirected URL is outside of the domain "
                          "filter, checked only syntax."))
                    self.set_result(u"filtered")
                return -1, response
    # check robots.txt allowance again
    if not self.allows_robots(redirected):
        if set_result:
            self.add_warning(
                _("Access to redirected URL denied by robots.txt, "
                  "checked only syntax."),
                tag=WARN_HTTP_ROBOTS_DENIED)
            self.set_result(u"syntax OK")
        return -1, response
    # see about recursive redirect
    all_seen = [self.cache_url_key] + self.aliases
    if redirected in all_seen:
        if self.method == "HEAD" and self.method_get_allowed:
            # Microsoft servers tend to recurse HEAD requests
            # fall back to the original url and use GET
            return self.max_redirects, response
        recursion = all_seen + [redirected]
        if set_result:
            self.set_result(
                _("recursive redirection encountered:\n %(urls)s") %
                {"urls": "\n => ".join(recursion)}, valid=False)
        return -1, response
    if urlparts[0] in ('http', 'https'):
        # remember redirected url as alias
        self.aliases.append(redirected)
    else:
        # ftp scheme
        assert urlparts[0] == 'ftp', 'Invalid redirection %r' % redirected
        # scheme changed from http(s) to ftp: hand a fresh URL object to
        # the queue instead of checking it here
        newobj = get_url_from(
            redirected, self.recursion_level, self.aggregate,
            parent_url=self.parent_url, base_ref=self.base_ref,
            line=self.line, column=self.column, name=self.name)
        if set_result:
            self.add_warning(
                _("Redirection to URL `%(newurl)s' with different scheme"
                  " found; the original URL was `%(url)s'.") %
                {"url": self.url, "newurl": newobj.url},
                tag=WARN_HTTP_WRONG_REDIRECT)
            self.set_result(u"syntax OK")
        # append new object to queue
        self.aggregate.urlqueue.put(newobj)
        # pretend to be finished and logged
        return -1, response
    # note: urlparts has to be a list
    self.urlparts = urlparts
    if set_result:
        self.check301status(response)
    # check cache again on the changed URL
    if self.aggregate.urlqueue.checked_redirect(redirected, self):
        # already checked elsewhere; nothing more to do
        return -1, response
    # new response data
    response.close()
    response = self._try_http_response()
    return 1, response
|
||||
|
||||
def check301status (self, response):
|
||||
"""If response page has been permanently moved add a warning."""
|
||||
if response.status == 301 and not self.has301status:
|
||||
|
|
|
|||
Loading…
Reference in a new issue