recurse into ftp links

git-svn-id: https://linkchecker.svn.sourceforge.net/svnroot/linkchecker/trunk/linkchecker@1839 e7d03fd6-7b0d-0410-9947-9c21f3af8025
This commit is contained in:
calvin 2004-09-20 18:30:44 +00:00
parent ee71260815
commit 5201de5cb6
3 changed files with 116 additions and 43 deletions

View file

@ -17,6 +17,11 @@
Type: documentation
Changed: linkchecker, linkchecker.1
* Recursion into FTP directories and parseable files has been
implemented.
Type: feature
Changed: linkcheck/checker/ftpurl.py
1.13.4 "Shaun of the dead" (released 17.9.2004)
* Catch HTTP cookie errors and add a warning.
Type: bugfix

2
TODO
View file

@ -2,8 +2,6 @@ Next releases:
- rethink intern/extern stuff
- recursion into FTP links
- when a URL is found in the cache and it has a broken anchor, the broken
anchor name is not displayed as a warning

View file

@ -17,18 +17,30 @@
# Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
import ftplib
import time
import urllib
import cStringIO as StringIO
import linkcheck
import urlbase
import proxysupport
import httpurl
# XXX downloading of files is missing!
class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport):
"""Url link with ftp scheme."""
def __init__ (self, base_url, recursion_level, consumer,
              parent_url=None, base_ref=None, line=0, column=0, name=""):
    """Initialize FTP url checker data."""
    super(FtpUrl, self).__init__(base_url, recursion_level, consumer,
                                 parent_url=parent_url, base_ref=base_ref,
                                 line=line, column=column, name=name)
    # filenames collected from a directory listing, used for recursion
    self.files = []
    # last path component of the URL; filled in later during checking
    self.filename = None
def check_connection (self):
# proxy support (we support only http)
self.set_proxy(self.consumer.config["proxy"].get(self.scheme))
@ -50,26 +62,13 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport):
else:
_user, _password = self.get_user_password()
self.login(_user, _password)
filename = self.cwd()
if filename:
self.listfile(filename)
return None
def is_html (self):
    """Return True if the URL filename matches the HTML pattern."""
    match = linkcheck.checker.extensions['html'].search(self.url)
    return match is not None
def is_parseable (self):
    """Return True if any registered extension pattern matches the URL."""
    for pattern in linkcheck.checker.extensions.values():
        if pattern.search(self.url) is not None:
            return True
    return False
def parse_url (self):
for key, ro in linkcheck.checker.extensions.items():
if ro.search(self.url):
return getattr(self, "parse_"+key)()
self.filename = self.cwd()
self.listfile(self.filename)
if self.is_directory():
self.url_connection.cwd(self.filename)
self.files = self.get_files()
else:
self.files = []
return None
def login (self, _user, _password):
@ -97,35 +96,106 @@ class FtpUrl (urlbase.UrlBase, proxysupport.ProxySupport):
#self.add_info(info)
def cwd (self):
    """Change to URL parent directory. Return filename of last path
    component.
    """
    # NOTE(review): removed interleaved diff residue — the old docstring,
    # the "# leeched from webcheck" comment and a dead
    # `dirs = self.urlparts[2].split('/')` that was immediately overwritten.
    dirname = self.urlparts[2].strip('/')
    dirs = dirname.split('/')
    filename = dirs.pop()
    # defensive: drop a leading empty component (should not occur after
    # strip('/'), kept for safety)
    if len(dirs) and not dirs[0]:
        del dirs[0]
    # walk down from the FTP root to the parent directory
    self.url_connection.cwd('/')
    for d in dirs:
        self.url_connection.cwd(d)
    return filename
def listfile (self, filename):
    """Check that filename occurs in the current FTP directory listing;
    raise ftplib.error_perm if it does not.

    NOTE(review): removed interleaved diff residue — the old body's
    trailing-slash CWD probe (that logic now lives in is_directory())
    and the superseded per-file debug call.
    """
    files = self.url_connection.nlst()
    linkcheck.log.debug(linkcheck.LOG_CHECK, "FTP files %s", str(files))
    if filename and filename not in files:
        # mimic the error an FTP server gives for a missing file
        # (raise-as-call form works on both Python 2 and 3)
        raise ftplib.error_perm("550 File not found")
def get_files (self):
    """Get list of filenames in directory. Subdirectories have an
    ending slash.
    """
    entries = []
    def add_entry (line):
        # Rudimentary LIST output parsing. An entry is assumed to have
        # the following form:
        #   drwxr-xr-x 8 root wheel 1024 Jan 3 1994 foo
        # Symbolic links are assumed to have the following form:
        #   drwxr-xr-x 8 root wheel 1024 Jan 3 1994 foo -> bar
        linkcheck.log.debug(linkcheck.LOG_CHECK, "Directory entry %r",
                            line)
        parts = line.split()
        if len(parts) < 8:
            return
        if parts[-2] == "->":
            # symbolic link: name precedes the "-> target" suffix
            name = parts[-3]
        else:
            name = parts[-1]
        if name in (".", ".."):
            return
        if line.startswith("d"):
            # mark directories with a trailing slash
            name += "/"
        entries.append(name)
    self.url_connection.dir(add_entry)
    return entries
def is_html (self):
    """Return True iff the URL matches the registered HTML filename
    pattern."""
    return linkcheck.checker.extensions['html'].search(self.url) is not None
def is_parseable (self):
    """Directories and all registered extension types are parseable."""
    if self.is_directory():
        return True
    for ext_pattern in linkcheck.checker.extensions.values():
        if ext_pattern.search(self.url):
            return True
    return False
def is_directory (self):
    """Check whether this URL names an FTP directory whose trailing
    slash was omitted; if so, add a warning, fix self.url and change
    back into the (now corrected) parent directory.

    @return: True if the URL (possibly after correction) ends with '/'
    """
    if self.filename is not None and not self.url.endswith('/'):
        # it could be a directory if the trailing slash was forgotten
        try:
            self.url_connection.cwd(self.filename)
            self.add_warning(_("Missing trailing directory slash in ftp url"))
            self.url += '/'
            self.cwd()
        except ftplib.error_perm:
            # not a directory after all; the unused `msg` binding and the
            # Python-2-only `except X, msg` clause were dropped
            pass
    return self.url.endswith('/')
def parse_url (self):
    """Dispatch to the matching parse_* handler; directory listings are
    parsed as HTML. Return None when no parser matches.
    """
    if self.is_directory():
        return self.parse_html()
    for ext, pattern in linkcheck.checker.extensions.items():
        if pattern.search(self.url):
            return getattr(self, "parse_%s" % ext)()
    return None
def get_content (self):
    """Return and cache URL content: a generated HTML index page for
    directories, else the file downloaded in binary mode. Also records
    self.dltime and self.dlsize.
    """
    if not self.valid:
        return ""
    if self.has_content:
        # already downloaded: serve from cache
        return self.data
    t = time.time()
    if self.is_directory():
        self.data = linkcheck.checker.get_index_html(self.files)
    else:
        # download file in BINARY mode
        # NOTE(review): `ftpcmd` was referenced but never defined here
        # (NameError at runtime); build the RETR command from the stored
        # filename.
        ftpcmd = "RETR %s" % self.filename
        buf = StringIO.StringIO()
        def stor_data (s):
            buf.write(s)
        self.url_connection.retrbinary(ftpcmd, stor_data)
        self.data = buf.getvalue()
        buf.close()
    self.dltime = time.time() - t
    self.dlsize = len(self.data)
    self.has_content = True
    return self.data
def close_connection (self):
    """Release the FTP connection, ignoring protocol errors on close,
    and drop the reference.

    NOTE(review): removed a second unconditional close() left over from
    the interleaved diff — it double-closed the connection and, had it
    raised, would have skipped resetting url_connection.
    """
    try:
        self.url_connection.close()
    except ftplib.Error:
        # best-effort close; server may already have dropped us
        pass
    self.url_connection = None