mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-03-16 22:10:26 +00:00
Fix handling of user/password info in URLs.
This commit is contained in:
parent
838095cbd5
commit
f107092a8a
8 changed files with 72 additions and 58 deletions
|
|
@ -4,6 +4,8 @@ Fixes:
|
|||
- checking: Catch any errors initializing the MIME database.
|
||||
Closes: SF bug #3528450
|
||||
- checking: Fix writing temporary files.
|
||||
- checking: Properly handle URLs with user/password information.
|
||||
Closes: SF bug #3529812
|
||||
|
||||
Changes:
|
||||
- checking: Ignore URLs from local PHP files with execution
|
||||
|
|
@ -17,7 +19,8 @@ Changes:
|
|||
Features:
|
||||
- installation: Support RPM building with cx_Freeze.
|
||||
- installation: Added .desktop files for POSIX systems.
|
||||
- checking: Allow writing of a memory dump file.
|
||||
- checking: Allow writing of a memory dump file to debug memory
|
||||
problems.
|
||||
|
||||
|
||||
7.8 "Gangster Exchange" (released 12.5.2012)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
# -*- coding: iso-8859-1 -*-
|
||||
# Copyright (C) 2000-2011 Bastian Kleineidam
|
||||
# Copyright (C) 2000-2012 Bastian Kleineidam
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
|
|
@ -19,7 +19,6 @@ Handle FTP links.
|
|||
"""
|
||||
|
||||
import ftplib
|
||||
import urllib
|
||||
from cStringIO import StringIO
|
||||
|
||||
from .. import log, LOG_CHECK, LinkCheckerError, fileutil
|
||||
|
|
@ -71,34 +70,24 @@ class FtpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
self.files = []
|
||||
return None
|
||||
|
||||
def get_user_password (self):
|
||||
"""
|
||||
Get credentials to use for login.
|
||||
"""
|
||||
if self.userinfo:
|
||||
return urllib.splitpasswd(self.userinfo)
|
||||
return super(FtpUrl, self).get_user_password()
|
||||
|
||||
def login (self):
|
||||
"""
|
||||
Log into ftp server and check the welcome message.
|
||||
"""
|
||||
# ready to connect
|
||||
_user, _password = self.get_user_password()
|
||||
host = self.urlparts[1]
|
||||
key = ("ftp", host, _user, _password)
|
||||
key = ("ftp", self.host, self.port, _user, _password)
|
||||
conn = self.aggregate.connections.get(key)
|
||||
if conn is not None and conn.sock is not None:
|
||||
# reuse cached FTP connection
|
||||
self.url_connection = conn
|
||||
return
|
||||
self.aggregate.connections.wait_for_host(host)
|
||||
self.aggregate.connections.wait_for_host(self.host)
|
||||
try:
|
||||
self.url_connection = ftplib.FTP()
|
||||
if log.is_debug(LOG_CHECK):
|
||||
self.url_connection.set_debuglevel(1)
|
||||
host, port = proxysupport.parse_host_port(host)
|
||||
self.url_connection.connect(host, port)
|
||||
self.url_connection.connect(self.host, self.port)
|
||||
if _user is None:
|
||||
self.url_connection.login()
|
||||
elif _password is None:
|
||||
|
|
|
|||
|
|
@ -132,6 +132,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
"""
|
||||
# set the proxy, so a 407 status after this is an error
|
||||
self.set_proxy(self.aggregate.config["proxy"].get(self.scheme))
|
||||
self.construct_auth()
|
||||
# check robots.txt
|
||||
if not self.allows_robots(self.url):
|
||||
# remove all previously stored results
|
||||
|
|
@ -249,11 +250,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
tag=WARN_HTTP_AUTH_UNKNOWN)
|
||||
return
|
||||
if not self.auth:
|
||||
_user, _password = self.get_user_password()
|
||||
self.auth = "Basic " + \
|
||||
httputil.encode_base64("%s:%s" % (_user, _password))
|
||||
log.debug(LOG_CHECK,
|
||||
"Authentication %s/%s", _user, _password)
|
||||
self.construct_auth()
|
||||
continue
|
||||
if (self.headers and self.method == "HEAD" and
|
||||
self.method_get_allowed):
|
||||
|
|
@ -275,6 +272,18 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
self.aliases = []
|
||||
self.fallback_get = True
|
||||
|
||||
def construct_auth (self):
|
||||
"""Construct HTTP Basic authentication credentials if there
|
||||
is user/password information available. Does not overwrite if
|
||||
credentials have already been constructed."""
|
||||
if self.auth:
|
||||
return
|
||||
_user, _password = self.get_user_password()
|
||||
if _user is not None and _password is not None:
|
||||
credentials = httputil.encode_base64("%s:%s" % (_user, _password))
|
||||
self.auth = "Basic " + credentials
|
||||
log.debug(LOG_CHECK, "Using basic authentication")
|
||||
|
||||
def get_content_type (self):
|
||||
"""Return content MIME type or empty string."""
|
||||
if self.content_type is None:
|
||||
|
|
@ -308,6 +317,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
self.add_info(_("Redirected to `%(url)s'.") % {'url': newurl})
|
||||
# norm base url - can raise UnicodeError from url.idna_encode()
|
||||
redirected, is_idn = urlbase.url_norm(newurl)
|
||||
# XXX recalculate authentication information when available
|
||||
log.debug(LOG_CHECK, "Norm redirected to %r", redirected)
|
||||
urlparts = strformat.url_unicode_split(redirected)
|
||||
if not self.check_redirection_scheme(redirected, urlparts, set_result):
|
||||
|
|
@ -354,6 +364,7 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
|
||||
def check_redirection_domain (self, redirected, urlparts, set_result, response):
|
||||
"""Return True if redirection domain is ok, else False."""
|
||||
# XXX does not support user:pass@netloc format
|
||||
if urlparts[1] != self.urlparts[1]:
|
||||
# URL domain changed
|
||||
if self.recursion_level == 0 and urlparts[0] in ('http', 'https'):
|
||||
|
|
@ -524,11 +535,8 @@ class HttpUrl (internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
skip_accept_encoding=True)
|
||||
# be sure to use the original host as header even for proxies
|
||||
self.url_connection.putheader("Host", self.urlparts[1])
|
||||
# userinfo is from http://user:pass@host/
|
||||
if self.userinfo:
|
||||
self.url_connection.putheader("Authorization", self.userinfo)
|
||||
# auth is the -u and -p configuration options
|
||||
elif self.auth:
|
||||
if self.auth:
|
||||
# HTTP authorization
|
||||
self.url_connection.putheader("Authorization", self.auth)
|
||||
if self.proxyauth:
|
||||
self.url_connection.putheader("Proxy-Authorization",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
# -*- coding: iso-8859-1 -*-
|
||||
# Copyright (C) 2000-2009 Bastian Kleineidam
|
||||
# Copyright (C) 2000-2012 Bastian Kleineidam
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
|
|
@ -19,7 +19,6 @@ Handle telnet: links.
|
|||
"""
|
||||
|
||||
import telnetlib
|
||||
import urllib
|
||||
from . import urlbase
|
||||
from .. import log, LOG_CHECK
|
||||
|
||||
|
|
@ -38,11 +37,8 @@ class TelnetUrl (urlbase.UrlBase):
|
|||
# default port
|
||||
if self.port is None:
|
||||
self.port = 23
|
||||
# split user/pass
|
||||
if self.userinfo:
|
||||
self.user, self.password = urllib.splitpasswd(self.userinfo)
|
||||
else:
|
||||
self.user, self.password = self.get_user_password()
|
||||
# set user/pass
|
||||
self.user, self.password = self.get_user_password()
|
||||
|
||||
def local_check (self):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -929,6 +929,9 @@ class UrlBase (object):
|
|||
"""Get tuple (user, password) from configured authentication.
|
||||
Both user and password can be None.
|
||||
"""
|
||||
if self.userinfo:
|
||||
# URL itself has authentication info
|
||||
return urllib.splitpasswd(self.userinfo)
|
||||
return self.aggregate.config.get_user_password(self.url)
|
||||
|
||||
def parse_html (self):
|
||||
|
|
|
|||
|
|
@ -233,42 +233,47 @@ def idna_encode (host):
|
|||
|
||||
def url_fix_host (urlparts):
|
||||
"""Unquote and fix hostname. Returns is_idn."""
|
||||
urlparts[1], is_idn = idna_encode(urllib.unquote(urlparts[1]).lower())
|
||||
if not urlparts[1]:
|
||||
urlparts[2] = urllib.unquote(urlparts[2])
|
||||
return False
|
||||
userpass, netloc = urllib.splituser(urlparts[1])
|
||||
if userpass:
|
||||
userpass = urllib.unquote(userpass)
|
||||
netloc, is_idn = idna_encode(urllib.unquote(netloc).lower())
|
||||
# a leading backslash in path causes urlsplit() to add the
|
||||
# path components up to the first slash to host
|
||||
# try to find this case...
|
||||
i = urlparts[1].find("\\")
|
||||
i = netloc.find("\\")
|
||||
if i != -1:
|
||||
# ...and fix it by prepending the misplaced components to the path
|
||||
comps = urlparts[1][i:] # note: still has leading backslash
|
||||
comps = netloc[i:] # note: still has leading backslash
|
||||
if not urlparts[2] or urlparts[2] == '/':
|
||||
urlparts[2] = comps
|
||||
else:
|
||||
urlparts[2] = "%s%s" % (comps, urllib.unquote(urlparts[2]))
|
||||
urlparts[1] = urlparts[1][:i]
|
||||
netloc = netloc[:i]
|
||||
else:
|
||||
# a leading ? in path causes urlsplit() to add the query to the
|
||||
# host name
|
||||
i = urlparts[1].find("?")
|
||||
i = netloc.find("?")
|
||||
if i != -1:
|
||||
urlparts[1], urlparts[3] = urlparts[1].split('?', 1)
|
||||
netloc, urlparts[3] = netloc.split('?', 1)
|
||||
# path
|
||||
urlparts[2] = urllib.unquote(urlparts[2])
|
||||
if urlparts[1]:
|
||||
userpass, host = urllib.splituser(urlparts[1])
|
||||
if userpass:
|
||||
# append AT for easy concatenation
|
||||
userpass += "@"
|
||||
else:
|
||||
userpass = ""
|
||||
if urlparts[0] in default_ports:
|
||||
dport = default_ports[urlparts[0]]
|
||||
host, port = splitport(host, port=dport)
|
||||
if host.endswith("."):
|
||||
host = host[:-1]
|
||||
if port != dport:
|
||||
host = "%s:%d" % (host, port)
|
||||
urlparts[1] = userpass+host
|
||||
if userpass:
|
||||
# append AT for easy concatenation
|
||||
userpass += "@"
|
||||
else:
|
||||
userpass = ""
|
||||
if urlparts[0] in default_ports:
|
||||
dport = default_ports[urlparts[0]]
|
||||
host, port = splitport(netloc, port=dport)
|
||||
if host.endswith("."):
|
||||
host = host[:-1]
|
||||
if port != dport:
|
||||
host = "%s:%d" % (host, port)
|
||||
netloc = host
|
||||
urlparts[1] = userpass+netloc
|
||||
return is_idn
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
# -*- coding: iso-8859-1 -*-
|
||||
# Copyright (C) 2004-2010 Bastian Kleineidam
|
||||
# Copyright (C) 2004-2012 Bastian Kleineidam
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
|
|
@ -36,6 +36,16 @@ class TestFtp (FtpServerTest):
|
|||
u"real url %s" % url,
|
||||
u"valid",
|
||||
]
|
||||
# ftp user/password
|
||||
user = "anonymous"
|
||||
passwd = "Ftp"
|
||||
url = u"ftp://%s:%s@%s:%d/" % (user, passwd, self.host, self.port)
|
||||
resultlines = [
|
||||
u"url %s" % url,
|
||||
u"cache key %s" % url,
|
||||
u"real url %s" % url,
|
||||
u"valid",
|
||||
]
|
||||
self.direct(url, resultlines)
|
||||
# ftp one slash
|
||||
url = u"ftp:/%s:%d/" % (self.host, self.port)
|
||||
|
|
|
|||
|
|
@ -372,13 +372,13 @@ class TestUrl (unittest.TestCase):
|
|||
|
||||
def test_norm_with_auth (self):
|
||||
# Test norming of URLs with authentication tokens.
|
||||
url = "telnet://user@www.example.org"
|
||||
url = "telnet://User@www.example.org"
|
||||
nurl = url
|
||||
self.urlnormtest(url, nurl)
|
||||
url = "telnet://user:pass@www.example.org"
|
||||
url = "telnet://User:Pass@www.example.org"
|
||||
nurl = url
|
||||
self.urlnormtest(url, nurl)
|
||||
url = "http://user:pass@www.example.org/"
|
||||
url = "http://User:Pass@www.example.org/"
|
||||
nurl = url
|
||||
self.urlnormtest(url, nurl)
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue