Fix handling of user/password info in URLs.

This commit is contained in:
Bastian Kleineidam 2012-06-10 22:07:42 +02:00
parent 838095cbd5
commit f107092a8a
8 changed files with 72 additions and 58 deletions

View file

@ -4,6 +4,8 @@ Fixes:
- checking: Catch any errors initializing the MIME database.
Closes: SF bug #3528450
- checking: Fix writing temporary files.
- checking: Properly handle URLs with user/password information.
Closes: SF bug #3529812
Changes:
- checking: Ignore URLs from local PHP files with execution
@ -17,7 +19,8 @@ Changes:
Features:
- installation: Support RPM building with cx_Freeze.
- installation: Added .desktop files for POSIX systems.
- checking: Allow writing of a memory dump file.
- checking: Allow writing of a memory dump file to debug memory
problems.
7.8 "Gangster Exchange" (released 12.5.2012)

View file

@ -1,5 +1,5 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2011 Bastian Kleineidam
# Copyright (C) 2000-2012 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -19,7 +19,6 @@ Handle FTP links.
"""
import ftplib
import urllib
from cStringIO import StringIO
from .. import log, LOG_CHECK, LinkCheckerError, fileutil
@ -71,34 +70,24 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.files = []
return None
def get_user_password (self):
    """
    Return the (user, password) pair to use for the FTP login.

    When the URL carries userinfo it is split with
    urllib.splitpasswd(); otherwise the lookup is delegated to the
    parent class.
    """
    if not self.userinfo:
        # nothing given in the URL itself, ask the parent class
        return super(FtpUrl, self).get_user_password()
    return urllib.splitpasswd(self.userinfo)
def login (self):
"""
Log into ftp server and check the welcome message.
"""
# ready to connect
_user, _password = self.get_user_password()
host = self.urlparts[1]
key = ("ftp", host, _user, _password)
key = ("ftp", self.host, self.port, _user, _password)
conn = self.aggregate.connections.get(key)
if conn is not None and conn.sock is not None:
# reuse cached FTP connection
self.url_connection = conn
return
self.aggregate.connections.wait_for_host(host)
self.aggregate.connections.wait_for_host(self.host)
try:
self.url_connection = ftplib.FTP()
if log.is_debug(LOG_CHECK):
self.url_connection.set_debuglevel(1)
host, port = proxysupport.parse_host_port(host)
self.url_connection.connect(host, port)
self.url_connection.connect(self.host, self.port)
if _user is None:
self.url_connection.login()
elif _password is None:

View file

@ -132,6 +132,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
"""
# set the proxy, so a 407 status after this is an error
self.set_proxy(self.aggregate.config["proxy"].get(self.scheme))
self.construct_auth()
# check robots.txt
if not self.allows_robots(self.url):
# remove all previously stored results
@ -249,11 +250,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
tag=WARN_HTTP_AUTH_UNKNOWN)
return
if not self.auth:
_user, _password = self.get_user_password()
self.auth = "Basic " + \
httputil.encode_base64("%s:%s" % (_user, _password))
log.debug(LOG_CHECK,
"Authentication %s/%s", _user, _password)
self.construct_auth()
continue
if (self.headers and self.method == "HEAD" and
self.method_get_allowed):
@ -275,6 +272,18 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.aliases = []
self.fallback_get = True
def construct_auth (self):
    """Build the HTTP Basic authentication value from the available
    user/password information and store it in self.auth.

    An already constructed self.auth is kept untouched, and nothing
    is stored unless both user and password are known."""
    if self.auth:
        # credentials were constructed earlier, keep them
        return
    username, secret = self.get_user_password()
    if username is None or secret is None:
        # incomplete credentials, leave self.auth unset
        return
    pair = "%s:%s" % (username, secret)
    self.auth = "Basic " + httputil.encode_base64(pair)
    log.debug(LOG_CHECK, "Using basic authentication")
def get_content_type (self):
"""Return content MIME type or empty string."""
if self.content_type is None:
@ -308,6 +317,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
self.add_info(_("Redirected to `%(url)s'.") % {'url': newurl})
# norm base url - can raise UnicodeError from url.idna_encode()
redirected, is_idn = urlbase.url_norm(newurl)
# XXX recalculate authentication information when available
log.debug(LOG_CHECK, "Norm redirected to %r", redirected)
urlparts = strformat.url_unicode_split(redirected)
if not self.check_redirection_scheme(redirected, urlparts, set_result):
@ -354,6 +364,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
def check_redirection_domain (self, redirected, urlparts, set_result, response):
"""Return True if redirection domain is ok, else False."""
# XXX does not support user:pass@netloc format
if urlparts[1] != self.urlparts[1]:
# URL domain changed
if self.recursion_level == 0 and urlparts[0] in ('http', 'https'):
@ -524,11 +535,8 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
skip_accept_encoding=True)
# be sure to use the original host as header even for proxies
self.url_connection.putheader("Host", self.urlparts[1])
# userinfo is from http://user:pass@host/
if self.userinfo:
self.url_connection.putheader("Authorization", self.userinfo)
# auth is the -u and -p configuration options
elif self.auth:
if self.auth:
# HTTP authorization
self.url_connection.putheader("Authorization", self.auth)
if self.proxyauth:
self.url_connection.putheader("Proxy-Authorization",

View file

@ -1,5 +1,5 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2000-2009 Bastian Kleineidam
# Copyright (C) 2000-2012 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -19,7 +19,6 @@ Handle telnet: links.
"""
import telnetlib
import urllib
from . import urlbase
from .. import log, LOG_CHECK
@ -38,11 +37,8 @@ class TelnetUrl (urlbase.UrlBase):
# default port
if self.port is None:
self.port = 23
# split user/pass
if self.userinfo:
self.user, self.password = urllib.splitpasswd(self.userinfo)
else:
self.user, self.password = self.get_user_password()
# set user/pass
self.user, self.password = self.get_user_password()
def local_check (self):
"""

View file

@ -929,6 +929,9 @@ class UrlBase (object):
"""Get tuple (user, password) from configured authentication.
Credentials given in the URL itself take precedence over configured ones.
Both user and password can be None.
"""
if self.userinfo:
# URL itself has authentication info
return urllib.splitpasswd(self.userinfo)
return self.aggregate.config.get_user_password(self.url)
def parse_html (self):

View file

@ -233,42 +233,47 @@ def idna_encode (host):
def url_fix_host (urlparts):
"""Unquote and fix hostname. Returns is_idn."""
urlparts[1], is_idn = idna_encode(urllib.unquote(urlparts[1]).lower())
if not urlparts[1]:
urlparts[2] = urllib.unquote(urlparts[2])
return False
userpass, netloc = urllib.splituser(urlparts[1])
if userpass:
userpass = urllib.unquote(userpass)
netloc, is_idn = idna_encode(urllib.unquote(netloc).lower())
# a leading backslash in path causes urlsplit() to add the
# path components up to the first slash to host
# try to find this case...
i = urlparts[1].find("\\")
i = netloc.find("\\")
if i != -1:
# ...and fix it by prepending the misplaced components to the path
comps = urlparts[1][i:] # note: still has leading backslash
comps = netloc[i:] # note: still has leading backslash
if not urlparts[2] or urlparts[2] == '/':
urlparts[2] = comps
else:
urlparts[2] = "%s%s" % (comps, urllib.unquote(urlparts[2]))
urlparts[1] = urlparts[1][:i]
netloc = netloc[:i]
else:
# a leading ? in path causes urlsplit() to add the query to the
# host name
i = urlparts[1].find("?")
i = netloc.find("?")
if i != -1:
urlparts[1], urlparts[3] = urlparts[1].split('?', 1)
netloc, urlparts[3] = netloc.split('?', 1)
# path
urlparts[2] = urllib.unquote(urlparts[2])
if urlparts[1]:
userpass, host = urllib.splituser(urlparts[1])
if userpass:
# append AT for easy concatenation
userpass += "@"
else:
userpass = ""
if urlparts[0] in default_ports:
dport = default_ports[urlparts[0]]
host, port = splitport(host, port=dport)
if host.endswith("."):
host = host[:-1]
if port != dport:
host = "%s:%d" % (host, port)
urlparts[1] = userpass+host
if userpass:
# append AT for easy concatenation
userpass += "@"
else:
userpass = ""
if urlparts[0] in default_ports:
dport = default_ports[urlparts[0]]
host, port = splitport(netloc, port=dport)
if host.endswith("."):
host = host[:-1]
if port != dport:
host = "%s:%d" % (host, port)
netloc = host
urlparts[1] = userpass+netloc
return is_idn

View file

@ -1,5 +1,5 @@
# -*- coding: iso-8859-1 -*-
# Copyright (C) 2004-2010 Bastian Kleineidam
# Copyright (C) 2004-2012 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
@ -36,6 +36,16 @@ class TestFtp (FtpServerTest):
u"real url %s" % url,
u"valid",
]
# ftp user/password
user = "anonymous"
passwd = "Ftp"
url = u"ftp://%s:%s@%s:%d/" % (user, passwd, self.host, self.port)
resultlines = [
u"url %s" % url,
u"cache key %s" % url,
u"real url %s" % url,
u"valid",
]
self.direct(url, resultlines)
# ftp one slash
url = u"ftp:/%s:%d/" % (self.host, self.port)

View file

@ -372,13 +372,13 @@ class TestUrl (unittest.TestCase):
def test_norm_with_auth (self):
# Test norming of URLs with authentication tokens.
url = "telnet://user@www.example.org"
url = "telnet://User@www.example.org"
nurl = url
self.urlnormtest(url, nurl)
url = "telnet://user:pass@www.example.org"
url = "telnet://User:Pass@www.example.org"
nurl = url
self.urlnormtest(url, nurl)
url = "http://user:pass@www.example.org/"
url = "http://User:Pass@www.example.org/"
nurl = url
self.urlnormtest(url, nurl)