Redirection refactoring part 2 of 2.

This commit is contained in:
Bastian Kleineidam 2011-04-27 13:33:01 +02:00
parent dd53c78096
commit 1f9cd2f67f

View file

@ -303,8 +303,8 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
# NOTE(review): this span is a unified-diff fragment with the +/- markers
# stripped: the two OLD (removed) lines and their two NEW (added)
# replacements appear back to back. Annotated only, not rewritten.
num, response = self.follow_redirection(response, set_result, redirected)
# -1 signals a terminal condition (disallowed scheme/domain/robots or a
# recursive redirect) -- propagate it unchanged to the caller.
if num == -1:
return num, response
# OLD (pre-refactor) pair -- replaced by the two lines below it:
redirected = urlparse.urljoin(self.urlparts)
tries += 1
# NEW pair: rebuild the redirected URL from the updated split parts and
# advance the try counter by the number of redirections consumed.
redirected = urlparse.urlunsplit(self.urlparts)
tries += num
return tries, response
def follow_redirection (self, response, set_result, redirected):
# NOTE(review): everything below is a unified-diff fragment with the +/-
# markers stripped away. Removed (OLD) code and its added (NEW)
# replacement appear back to back, and hunk headers are interleaved.
# The lines are annotated, not rewritten, since this is not a coherent
# single version of the method.
@ -319,78 +319,20 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
# Normalize the redirection target and split it into URL parts.
redirected, is_idn = urlbase.url_norm(newurl)
log.debug(LOG_CHECK, "Norm redirected to %r", redirected)
urlparts = strformat.url_unicode_split(redirected)
# OLD: inline scheme check, extracted into check_redirection_scheme().
if urlparts[0] not in ('ftp', 'http', 'https'):
# in case of changed scheme make new URL object
# For security reasons do not allow redirects to protocols
# other than HTTP, HTTPS or FTP.
if set_result:
self.add_warning(
_("Redirection to url `%(newurl)s' is not allowed.") %
{'newurl': redirected})
self.set_result(u"syntax OK")
# NEW: delegating call that replaces the inline scheme check above.
if not self.check_redirection_scheme(redirected, urlparts, set_result):
return -1, response
# OLD: inline domain-change and robots.txt handling, extracted into
# check_redirection_domain() and check_redirection_robots().
if urlparts[1] != self.urlparts[1]:
# the URL domain changed
if self.recursion_level == 0 and urlparts[0] in ('http', 'https'):
# Add intern patterns for redirection of URLs given by the
# user for HTTP schemes
pat = internpaturl.get_intern_pattern(redirected)
log.debug(LOG_CHECK, "Add intern pattern %r", pat)
self.aggregate.config['internlinks'].append(get_link_pat(pat))
else:
# check extern filter again
self.set_extern(redirected)
if self.extern[0] and self.extern[1]:
if set_result:
self.check301status(response)
self.add_info(
_("The redirected URL is outside of the domain "
"filter, checked only syntax."))
self.set_result(u"filtered")
return -1, response
# check robots.txt allowance again
if not self.allows_robots(redirected):
if set_result:
self.add_warning(
_("Access to redirected URL denied by robots.txt, "
"checked only syntax."),
tag=WARN_HTTP_ROBOTS_DENIED)
self.set_result(u"syntax OK")
# NEW: delegating call replacing the inline domain handling above.
if not self.check_redirection_domain(redirected, urlparts,
set_result, response):
return -1, response
# OLD: inline recursive-redirect detection, extracted into
# check_redirection_recursion().
# see about recursive redirect
all_seen = [self.cache_url_key] + self.aliases
if redirected in all_seen:
if self.method == "HEAD" and self.method_get_allowed:
# Microsoft servers tend to recurse HEAD requests
# fall back to the original url and use GET
return self.max_redirects, response
recursion = all_seen + [redirected]
if set_result:
self.set_result(
_("recursive redirection encountered:\n %(urls)s") %
{"urls": "\n => ".join(recursion)}, valid=False)
# NEW: delegating call replacing the inline robots.txt check above.
if not self.check_redirection_robots(redirected, set_result):
return -1, response
# OLD: inline scheme-change (FTP hand-off) handling, extracted into
# check_redirection_newscheme().
if urlparts[0] in ('http', 'https'):
# remember redirected url as alias
self.aliases.append(redirected)
else:
# ftp scheme
assert urlparts[0] == 'ftp', 'Invalid redirection %r' % redirected
newobj = get_url_from(
redirected, self.recursion_level, self.aggregate,
parent_url=self.parent_url, base_ref=self.base_ref,
line=self.line, column=self.column, name=self.name)
if set_result:
self.add_warning(
_("Redirection to URL `%(newurl)s' with different scheme"
" found; the original URL was `%(url)s'.") %
{"url": self.url, "newurl": newobj.url},
tag=WARN_HTTP_WRONG_REDIRECT)
self.set_result(u"syntax OK")
# append new object to queue
self.aggregate.urlqueue.put(newobj)
# pretend to be finished and logged
# NEW: delegating calls replacing the inline recursion and
# scheme-change handling above.
num = self.check_redirection_recursion(redirected, set_result)
if num != 0:
return num, response
if not self.check_redirection_newscheme (redirected, urlparts, set_result):
return -1, response
# remember redirected url as alias
self.aliases.append(redirected)
# note: urlparts has to be a list
self.urlparts = urlparts
if set_result:
@ -403,6 +345,95 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
# Re-fetch the redirected location and report one consumed redirection.
response = self._try_http_response()
return 1, response
def check_redirection_scheme (self, redirected, urlparts, set_result):
    """Check that the redirection target uses an allowed scheme.

    For security reasons only HTTP, HTTPS and FTP redirection
    targets are accepted. Return True if the scheme is ok, else
    False.
    """
    scheme = urlparts[0]
    if scheme not in ('ftp', 'http', 'https'):
        # Disallowed scheme: record a warning but keep the original
        # URL itself marked as syntactically valid.
        if set_result:
            self.add_warning(
                _("Redirection to url `%(newurl)s' is not allowed.") %
                {'newurl': redirected})
            self.set_result(u"syntax OK")
        return False
    return True
def check_redirection_domain (self, redirected, urlparts, set_result, response):
    """Check the domain part of the redirection target.

    Return True if the (possibly changed) domain may be followed,
    else False.
    """
    if urlparts[1] == self.urlparts[1]:
        # URL domain did not change
        return True
    # URL domain changed
    if self.recursion_level == 0 and urlparts[0] in ('http', 'https'):
        # Add intern patterns for redirection of URLs given by the
        # user for HTTP schemes.
        pat = internpaturl.get_intern_pattern(redirected)
        log.debug(LOG_CHECK, "Add intern pattern %r", pat)
        self.aggregate.config['internlinks'].append(get_link_pat(pat))
        return True
    # check extern filter again
    self.set_extern(redirected)
    is_extern, is_strict = self.extern[0], self.extern[1]
    if not (is_extern and is_strict):
        return True
    # Redirection target is filtered out; only note its status.
    if set_result:
        self.check301status(response)
        self.add_info(_("The redirected URL is outside of the domain "
            "filter, checked only syntax."))
        self.set_result(u"filtered")
    return False
def check_redirection_robots (self, redirected, set_result):
    """Check robots.txt allowance for the redirection target.

    Return True if access is allowed, else False.
    """
    if not self.allows_robots(redirected):
        # Denied by robots.txt: warn, but the URL itself stays
        # syntactically valid.
        if set_result:
            self.add_warning(
                _("Access to redirected URL denied by robots.txt, "
                "checked only syntax."), tag=WARN_HTTP_ROBOTS_DENIED)
            self.set_result(u"syntax OK")
        return False
    return True
def check_redirection_recursion (self, redirected, set_result):
    """Detect redirection loops.

    Return 0 when the target has not been seen before,
    max_redirects when a loop is hit during a HEAD request (so the
    caller falls back to GET), and -1 for any other loop.
    """
    seen = [self.cache_url_key]
    seen.extend(self.aliases)
    if redirected in seen:
        if self.method == "HEAD" and self.method_get_allowed:
            # Microsoft servers tend to recurse HEAD requests
            # fall back to the original url and use GET
            return self.max_redirects
        if set_result:
            urls = "\n => ".join(seen + [redirected])
            self.set_result(_("recursive redirection encountered:\n %(urls)s") %
                {"urls": urls}, valid=False)
        return -1
    return 0
def check_redirection_newscheme (self, redirected, urlparts, set_result):
    """Handle a possible scheme change of the redirection target.

    Return True for HTTP(S) targets, which this object keeps
    checking itself; an FTP target is handed off to the check queue
    as a new URL object and False is returned.
    """
    scheme = urlparts[0]
    if scheme == 'http' or scheme == 'https':
        return True
    # ftp scheme
    assert scheme == 'ftp', 'Invalid redirection %r' % redirected
    newobj = get_url_from(
        redirected, self.recursion_level, self.aggregate,
        parent_url=self.parent_url, base_ref=self.base_ref,
        line=self.line, column=self.column, name=self.name)
    if set_result:
        self.add_warning(
            _("Redirection to URL `%(newurl)s' with different scheme"
            " found; the original URL was `%(url)s'.") %
            {"url": self.url, "newurl": newobj.url},
            tag=WARN_HTTP_WRONG_REDIRECT)
        self.set_result(u"syntax OK")
    # append new object to queue
    self.aggregate.urlqueue.put(newobj)
    return False
def check301status (self, response):
"""If response page has been permanently moved add a warning."""
if response.status == 301 and not self.has301status: