diff --git a/tests/__init__.py b/tests/__init__.py index a7de3fd4..cf8a2945 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -46,10 +46,9 @@ def run_checked(cmd, ret_ok=(0,), **kwargs): return retcode - def run_silent(cmd): """Run given command without output.""" - null = open(os.name == 'nt' and ':NUL' or "/dev/null", 'w') + null = open(os.name == "nt" and ":NUL" or "/dev/null", "w") try: return run(cmd, stdout=null, stderr=subprocess.STDOUT) finally: @@ -58,13 +57,16 @@ def run_silent(cmd): def _need_func(testfunc, name): """Decorator skipping test if given testfunc fails.""" + def check_func(func): @wraps(func) def newfunc(*args, **kwargs): if not testfunc(): pytest.skip("%s is not available" % name) return func(*args, **kwargs) + return newfunc + return check_func @@ -80,6 +82,7 @@ def has_network(): pass return False + need_network = _need_func(has_network, "network") @@ -88,6 +91,7 @@ def has_msgfmt(): """Test if msgfmt is available.""" return run_silent(["msgfmt", "-V"]) == 0 + need_msgfmt = _need_func(has_msgfmt, "msgfmt") @@ -96,6 +100,7 @@ def has_posix(): """Test if this is a POSIX system.""" return os.name == "posix" + need_posix = _need_func(has_posix, "POSIX system") @@ -104,6 +109,7 @@ def has_windows(): """Test if this is a Windows system.""" return os.name == "nt" + need_windows = _need_func(has_windows, "Windows system") @@ -112,6 +118,7 @@ def has_linux(): """Test if this is a Linux system.""" return sys.platform.startswith("linux") + need_linux = _need_func(has_linux, "Linux system") @@ -130,6 +137,7 @@ def has_clamav(): pass return False + need_clamav = _need_func(has_clamav, "ClamAV") @@ -144,6 +152,7 @@ def has_proxy(): except Exception: return False + need_proxy = _need_func(has_proxy, "proxy") @@ -152,10 +161,12 @@ def has_pyftpdlib(): """Test if pyftpdlib is available.""" try: import pyftpdlib + return True except ImportError: return False + need_pyftpdlib = _need_func(has_pyftpdlib, "pyftpdlib") @@ -164,16 +175,19 @@ def has_biplist(): """Test if biplist is available.""" try: import biplist + return True except ImportError: return False + need_biplist = _need_func(has_biplist, "biplist") @lru_cache(1) def has_newsserver(server): import nntplib + try: nntp = nntplib.NNTP(server, usenetrc=False) nntp.quit() @@ -184,53 +198,62 @@ def has_newsserver(server): def need_newsserver(server): """Decorator skipping test if newsserver is not available.""" + def check_func(func): def newfunc(*args, **kwargs): if not has_newsserver(server): pytest.skip("Newsserver `%s' is not available" % server) return func(*args, **kwargs) + newfunc.__name__ = func.__name__ return newfunc - return check_func + return check_func @lru_cache(1) def has_x11(): """Test if DISPLAY variable is set.""" - return os.getenv('DISPLAY') is not None + return os.getenv("DISPLAY") is not None -need_x11 = _need_func(has_x11, 'X11') + +need_x11 = _need_func(has_x11, "X11") @lru_cache(1) def has_word(): """Test if Word is available.""" from linkcheck.plugins import parseword + return parseword.has_word() -need_word = _need_func(has_word, 'Word') + +need_word = _need_func(has_word, "Word") @lru_cache(1) def has_pdflib(): from linkcheck.plugins import parsepdf + return parsepdf.has_pdflib -need_pdflib = _need_func(has_pdflib, 'pdflib') + +need_pdflib = _need_func(has_pdflib, "pdflib") @contextmanager def _limit_time(seconds): """Raises LinkCheckerInterrupt if given number of seconds have passed.""" - if os.name == 'posix': + if os.name == "posix": + def signal_handler(signum, frame): raise LinkCheckerInterrupt("timed out") + old_handler = signal.getsignal(signal.SIGALRM) signal.signal(signal.SIGALRM, signal_handler) signal.alarm(seconds) yield - if os.name == 'posix': + if os.name == "posix": signal.alarm(0) if old_handler is not None: signal.signal(signal.SIGALRM, old_handler) @@ -238,6 +261,7 @@ def _limit_time(seconds): def limit_time(seconds, skip=False): """Limit test time to the given number of seconds, else fail or skip.""" + def run_limited(func): def new_func(*args, **kwargs): try: @@ -247,8 +271,10 @@ def limit_time(seconds, skip=False): if skip: pytest.skip("time limit of %d seconds exceeded" % seconds) assert False, msg + new_func.__name__ = func.__name__ return new_func + return run_limited @@ -262,7 +288,7 @@ def get_file(filename=None): return directory -if __name__ == '__main__': +if __name__ == "__main__": print("has clamav", has_clamav()) print("has network", has_network()) print("has msgfmt", has_msgfmt()) diff --git a/tests/cache/test_urlqueue.py b/tests/cache/test_urlqueue.py index 5ec702d0..a4b2ac25 100644 --- a/tests/cache/test_urlqueue.py +++ b/tests/cache/test_urlqueue.py @@ -20,21 +20,18 @@ from collections import namedtuple from linkcheck.cache.results import ResultCache from linkcheck.cache.urlqueue import Empty, NUM_PUTS_CLEANUP, UrlQueue -UrlData = namedtuple('UrlData', 'url cache_url aggregate has_result') -Aggregate = namedtuple('Aggregate', 'result_cache') +UrlData = namedtuple("UrlData", "url cache_url aggregate has_result") +Aggregate = namedtuple("Aggregate", "result_cache") class TestUrlQueue(unittest.TestCase): - def setUp(self): self.result_cache = ResultCache() self.urlqueue = UrlQueue() self.urldata1 = UrlData( url="Foo", cache_url="Foo", - aggregate=Aggregate( - result_cache=self.result_cache, - ), + aggregate=Aggregate(result_cache=self.result_cache,), has_result=True, ) @@ -72,9 +69,7 @@ class TestUrlQueue(unittest.TestCase): the item and it can be get only once """ self.urlqueue.put(self.urldata1) - cached_item = ( - self.result_cache.get_result(self.urldata1) - ) + cached_item = self.result_cache.get_result(self.urldata1) self.assertEqual(cached_item, None) self.assertEqual(self.urlqueue.get(), self.urldata1) with self.assertRaises(Empty): @@ -89,9 +84,7 @@ class TestUrlQueue(unittest.TestCase): urldata = UrlData( url="Bar", cache_url="Bar", - aggregate=Aggregate( - result_cache=self.result_cache, - ), + aggregate=Aggregate(result_cache=self.result_cache,), has_result=False, ) self.urlqueue.put(urldata) @@ -109,9 +102,7 @@ class TestUrlQueue(unittest.TestCase): urldata = UrlData( url="Bar", cache_url="Bar", - aggregate=Aggregate( - result_cache=self.result_cache, - ), + aggregate=Aggregate(result_cache=self.result_cache,), has_result=True, ) self.urlqueue.put(urldata) @@ -129,9 +120,7 @@ class TestUrlQueue(unittest.TestCase): urldata = UrlData( url="Bar", cache_url="Foo", - aggregate=Aggregate( - result_cache=self.result_cache, - ), + aggregate=Aggregate(result_cache=self.result_cache,), has_result=True, ) self.urlqueue.put(urldata) @@ -152,9 +141,7 @@ class TestUrlQueue(unittest.TestCase): UrlData( url="Bar", cache_url="Bar address %s" % i, - aggregate=Aggregate( - result_cache=self.result_cache, - ), + aggregate=Aggregate(result_cache=self.result_cache,), has_result=False, ), ) @@ -162,9 +149,7 @@ class TestUrlQueue(unittest.TestCase): urldata = UrlData( url="Bar", cache_url="Bar address", - aggregate=Aggregate( - result_cache=self.result_cache, - ), + aggregate=Aggregate(result_cache=self.result_cache,), has_result=False, ) self.result_cache.add_result("Bar address 2", "asdf") diff --git a/tests/checker/__init__.py b/tests/checker/__init__.py index 79c063e7..0e647dbe 100644 --- a/tests/checker/__init__.py +++ b/tests/checker/__init__.py @@ -35,21 +35,21 @@ class TestLogger(linkcheck.logger._Logger): """ Output logger for automatic regression tests. """ - + # don't attempt to collect this class because it has an __init__() __test__ = False - LoggerName = 'test' + LoggerName = "test" logparts = [ - 'cachekey', - 'realurl', - 'name', - 'base', - 'info', - 'warning', - 'result', - 'url', + "cachekey", + "realurl", + "name", + "base", + "info", + "warning", + "result", + "url", ] def __init__(self, **kwargs): @@ -58,10 +58,10 @@ class TestLogger(linkcheck.logger._Logger): output lines. """ args = self.get_args(kwargs) - args['parts'] = self.logparts + args["parts"] = self.logparts super(TestLogger, self).__init__(**args) # list of expected output lines - self.expected = args['expected'] + self.expected = args["expected"] # list of real output lines self.result = [] # diff between expected and real output @@ -74,11 +74,19 @@ class TestLogger(linkcheck.logger._Logger): # - split into entries (starting with a URL key) # - sort the entries and join together # - split the entries back into a list - return '\n'.join( - sorted(['url %s' % x.strip() for x in - re.split(r'^url .*?', '\n'.join(result_log), - flags=re.DOTALL | re.MULTILINE) - if x])).splitlines() + return "\n".join( + sorted( + [ + "url %s" % x.strip() + for x in re.split( + r"^url .*?", + "\n".join(result_log), + flags=re.DOTALL | re.MULTILINE, + ) + if x + ] + ) + ).splitlines() def start_output(self): """ @@ -90,42 +98,44 @@ class TestLogger(linkcheck.logger._Logger): """ Append logger output to self.result. """ - if self.has_part('url'): + if self.has_part("url"): url = "url %s" % url_data.base_url self.result.append(url) - if self.has_part('cachekey'): + if self.has_part("cachekey"): cache_key = url_data.cache_url if url_data.cache_url else None self.result.append("cache key %s" % cache_key) - if self.has_part('realurl'): + if self.has_part("realurl"): self.result.append("real url %s" % url_data.url) - if self.has_part('name') and url_data.name: + if self.has_part("name") and url_data.name: self.result.append("name %s" % url_data.name) - if self.has_part('base') and url_data.base_ref: + if self.has_part("base") and url_data.base_ref: self.result.append("baseurl %s" % url_data.base_ref) - if self.has_part('info'): + if self.has_part("info"): for info in url_data.info: - if "Last modified" not in info and \ - "is located in" not in info and \ - "Using proxy" not in info: + if ( + "Last modified" not in info + and "is located in" not in info + and "Using proxy" not in info + ): self.result.append("info %s" % info) - if self.has_part('warning'): + if self.has_part("warning"): for tag, warning in url_data.warnings: self.result.append("warning %s" % warning) - if self.has_part('result'): + if self.has_part("result"): self.result.append("valid" if url_data.valid else "error") - if self.has_part('line'): + if self.has_part("line"): self.result.append("line %s" % url_data.line) - if self.has_part('col'): + if self.has_part("col"): self.result.append("col %s" % url_data.column) - if self.has_part('size'): + if self.has_part("size"): self.result.append("size %s" % url_data.size) - if self.has_part('parent_url'): + if self.has_part("parent_url"): self.result.append("parent_url %s" % url_data.parent_url) - if self.has_part('page'): + if self.has_part("page"): self.result.append("page %s" % url_data.page) - if self.has_part('modified'): + if self.has_part("modified"): self.result.append("modified %s" % url_data.modified) - if self.has_part('content_type'): + if self.has_part("content_type"): self.result.append("content_type %s" % url_data.content_type) # note: do not append url_data.result since this is # platform dependent @@ -136,10 +146,15 @@ class TestLogger(linkcheck.logger._Logger): """ self.expected = self.normalize(self.expected) self.result = self.normalize(self.result) - self.diff = list(difflib.unified_diff(self.expected, self.result, - fromfile="expected", - tofile="result", - lineterm="")) + self.diff = list( + difflib.unified_diff( + self.expected, + self.result, + fromfile="expected", + tofile="result", + lineterm="", + ) + ) def get_file_url(filename): @@ -147,31 +162,31 @@ def get_file_url(filename): def add_fileoutput_config(config): - if os.name == 'posix': - devnull = '/dev/null' - elif os.name == 'nt': - devnull = 'NUL' + if os.name == "posix": + devnull = "/dev/null" + elif os.name == "nt": + devnull = "NUL" else: return for ftype in linkcheck.logger.LoggerNames: - if ftype in ('test', 'blacklist'): + if ftype in ("test", "blacklist"): continue logger = config.logger_new(ftype, fileoutput=1, filename=devnull) - config['fileoutput'].append(logger) + config["fileoutput"].append(logger) def get_test_aggregate(confargs, logargs, logger=TestLogger): """Initialize a test configuration object.""" config = linkcheck.configuration.Configuration() config.logger_add(logger) - config['recursionlevel'] = 1 - config['logger'] = config.logger_new(logger.LoggerName, **logargs) + config["recursionlevel"] = 1 + config["logger"] = config.logger_new(logger.LoggerName, **logargs) add_fileoutput_config(config) # uncomment for debugging - #config.init_logging(None, debug=["all"]) + # config.init_logging(None, debug=["all"]) config["verbose"] = True - config['threads'] = 0 - config['status'] = False + config["threads"] = 0 + config["status"] = False config["checkextern"] = True config.update(confargs) config.sanitize() @@ -182,13 +197,14 @@ class LinkCheckTest(unittest.TestCase): """ Functional test class with ability to test local files. """ + logger = TestLogger def setUp(self): """Ensure the current locale setting is the default. Otherwise, warnings will get translated and will break tests.""" super(LinkCheckTest, self).setUp() - linkcheck.init_i18n(loc='C') + linkcheck.init_i18n(loc="C") def norm(self, url, encoding="utf-8"): """Helper function to norm a url.""" @@ -198,8 +214,8 @@ class LinkCheckTest(unittest.TestCase): """Return current and data directory as dictionary. You can augment the dict with keyword attributes.""" d = { - 'curdir': get_file_url(os.getcwd()), - 'datadir': "tests/checker/data", + "curdir": get_file_url(os.getcwd()), + "datadir": "tests/checker/data", } d.update(kwargs) return d @@ -210,16 +226,20 @@ class LinkCheckTest(unittest.TestCase): ignoring empty lines and lines starting with a hash sign (#). """ resultfile = get_file("%s.result" % filename) - d = {'curdir': get_file_url(os.getcwd()), - 'datadir': get_file_url(get_file()), - } + d = { + "curdir": get_file_url(os.getcwd()), + "datadir": get_file_url(get_file()), + } # the webserver uses the first free port number - if hasattr(self, 'port'): - d['port'] = self.port + if hasattr(self, "port"): + d["port"] = self.port # all result files are encoded in utf-8 with codecs.open(resultfile, "r", "utf-8") as f: - return [line.rstrip('\r\n') % d for line in f - if line.strip() and not line.startswith('#')] + return [ + line.rstrip("\r\n") % d + for line in f + if line.strip() and not line.startswith("#") + ] def get_url(self, filename): """Get URL for given filename.""" @@ -230,38 +250,44 @@ class LinkCheckTest(unittest.TestCase): url = self.get_url(filename) if confargs is None: confargs = {} - logargs = {'expected': self.get_resultlines(filename)} + logargs = {"expected": self.get_resultlines(filename)} aggregate = get_test_aggregate(confargs, logargs, logger=self.logger) url_data = get_url_from(url, 0, aggregate, extern=(0, 0)) aggregate.urlqueue.put(url_data) linkcheck.director.check_urls(aggregate) - logger = aggregate.config['logger'] + logger = aggregate.config["logger"] diff = logger.diff if diff: msg = os.linesep.join([url] + diff) self.fail(msg) if logger.stats.internal_errors: - self.fail("%d internal errors occurred!" - % logger.stats.internal_errors) + self.fail("%d internal errors occurred!" % logger.stats.internal_errors) - def direct(self, url, resultlines, parts=None, recursionlevel=0, - confargs=None, url_encoding=None): + def direct( + self, + url, + resultlines, + parts=None, + recursionlevel=0, + confargs=None, + url_encoding=None, + ): """Check url with expected result.""" assert isinstance(url, str), repr(url) if confargs is None: - confargs = {'recursionlevel': recursionlevel} + confargs = {"recursionlevel": recursionlevel} else: - confargs['recursionlevel'] = recursionlevel - logargs = {'expected': resultlines} + confargs["recursionlevel"] = recursionlevel + logargs = {"expected": resultlines} if parts is not None: - logargs['parts'] = parts + logargs["parts"] = parts aggregate = get_test_aggregate(confargs, logargs) # initial URL has recursion level zero url_reclevel = 0 url_data = get_url_from(url, url_reclevel, aggregate, url_encoding=url_encoding) aggregate.urlqueue.put(url_data) linkcheck.director.check_urls(aggregate) - diff = aggregate.config['logger'].diff + diff = aggregate.config["logger"].diff if diff: l = ["Differences found testing %s" % url] l.extend(x.rstrip() for x in diff[2:]) diff --git a/tests/checker/ftpserver.py b/tests/checker/ftpserver.py index db756be8..6fd192f7 100644 --- a/tests/checker/ftpserver.py +++ b/tests/checker/ftpserver.py @@ -26,13 +26,14 @@ from . import LinkCheckTest TIMEOUT = 5 + class FtpServerTest(LinkCheckTest): """Start/stop an FTP server that can be used for testing.""" - def __init__(self, methodName='runTest'): + def __init__(self, methodName="runTest"): """Init test class and store default ftp server port.""" super(FtpServerTest, self).__init__(methodName=methodName) - self.host = 'localhost' + self.host = "localhost" self.port = None def setUp(self): @@ -62,7 +63,7 @@ def start_server(host, port): pytest.skip("pyftpdlib is not available") return authorizer = DummyAuthorizer() - datadir = os.path.join(os.path.dirname(__file__), 'data') + datadir = os.path.join(os.path.dirname(__file__), "data") authorizer.add_anonymous(datadir) # Instantiate FTP handler class diff --git a/tests/checker/httpserver.py b/tests/checker/httpserver.py index 97d66cf5..f4dcf2f0 100644 --- a/tests/checker/httpserver.py +++ b/tests/checker/httpserver.py @@ -49,10 +49,11 @@ class StoppableHttpRequestHandler(SimpleHTTPRequestHandler): """ pass + # serve .xhtml files as application/xhtml+xml -StoppableHttpRequestHandler.extensions_map.update({ - '.xhtml': 'application/xhtml+xml', -}) +StoppableHttpRequestHandler.extensions_map.update( + {".xhtml": "application/xhtml+xml",} +) class StoppableHttpServer(HTTPServer): @@ -79,15 +80,15 @@ class NoQueryHttpRequestHandler(StoppableHttpRequestHandler): """ Remove everything after a question mark. """ - i = self.path.find('?') + i = self.path.find("?") if i != -1: self.path = self.path[:i] def get_status(self): - dummy, status = self.path.rsplit('/', 1) + dummy, status = self.path.rsplit("/", 1) status = int(status) if status in self.responses: - return status + return status return 500 def do_GET(self): @@ -99,7 +100,7 @@ class NoQueryHttpRequestHandler(StoppableHttpRequestHandler): status = self.get_status() self.send_response(status) self.end_headers() - if status >= 200 and status not in (204, 304): + if status >= 200 and status not in (204, 304): self.wfile.write(b"testcontent") else: super(NoQueryHttpRequestHandler, self).do_GET() @@ -131,9 +132,9 @@ class NoQueryHttpRequestHandler(StoppableHttpRequestHandler): list = ["example1.txt", "example2.html", "example3"] for name in list: displayname = linkname = name - list_item = ( - '