Run pyupgrade --py37-plus --keep-percent-format

This commit is contained in:
Chris Mayo 2022-11-08 19:21:29 +00:00
parent ea2cb6c597
commit 0bb1576887
19 changed files with 34 additions and 44 deletions

View file

@ -22,15 +22,13 @@ def parse_bookmark_data(data):
Return iterator for bookmarks of the form (url, name).
Bookmarks are not sorted.
"""
for url, name in parse_bookmark_json(json.loads(data)):
yield url, name
yield from parse_bookmark_json(json.loads(data))
def parse_bookmark_json(data):
"""Parse complete JSON data for Chromium Bookmarks."""
for entry in data["roots"].values():
for url, name in parse_bookmark_node(entry):
yield url, name
yield from parse_bookmark_node(entry)
def parse_bookmark_node(node):
@ -39,5 +37,4 @@ def parse_bookmark_node(node):
yield node["url"], node["name"]
elif node["type"] == "folder":
for child in node["children"]:
for entry in parse_bookmark_node(child):
yield entry
yield from parse_bookmark_node(child)

View file

@ -48,8 +48,7 @@ def parse_plist(entry):
yield (url, title)
elif has_children(entry):
for child in entry[KEY_CHILDREN]:
for item in parse_plist(child):
yield item
yield from parse_plist(child)
def is_leaf(entry):

View file

@ -23,7 +23,6 @@ from urllib.request import urlopen
import time
import errno
import socket
import select
from io import BytesIO
from . import absolute_url, get_url_from
@ -538,7 +537,7 @@ class UrlBase:
trace.trace_on()
try:
self.local_check()
except (socket.error, select.error):
except OSError:
# on Unix, ctrl-c can raise
# error: (4, 'Interrupted system call')
etype, value = sys.exc_info()[:2]

View file

@ -119,7 +119,7 @@ def init():
global _default_foreground, _default_background, _default_style
try:
attrs = GetConsoleScreenBufferInfo().wAttributes
except (ArgumentError, WindowsError):
except (ArgumentError, OSError):
_default_foreground = GREY
_default_background = BLACK
_default_style = NORMAL

View file

@ -25,7 +25,7 @@ import sys
import codecs
# more supported languages are added in init()
supported_languages = set(['en'])
supported_languages = {'en'}
default_language = default_encoding = None
default_directory = None
default_domain = None

View file

@ -60,8 +60,7 @@ def application(environ, start_response):
status = '200 OK'
start_response(status, get_response_headers())
for output in checklink(form=form, env=environ):
yield output
yield from checklink(form=form, env=environ)
_supported_langs = ('de', 'C')
@ -113,7 +112,7 @@ class ThreadsafeIO:
"""Write given unicode data to buffer."""
assert isinstance(data, str)
if self.closed:
raise IOError("Write on closed I/O object")
raise OSError("Write on closed I/O object")
if data:
self.buf.append(data)

View file

@ -85,8 +85,7 @@ def get_plugins(modules, classes):
@rtype: iterator of class objects
"""
for module in modules:
for plugin in get_module_plugins(module, classes):
yield plugin
yield from get_module_plugins(module, classes)
def get_module_plugins(module, classes):

View file

@ -226,7 +226,7 @@ class _Logger(abc.ABC):
os.makedirs(path)
self.fd = self.create_fd()
self.close_fd = True
except IOError:
except OSError:
msg = sys.exc_info()[1]
log.warn(
LOG_CHECK,
@ -255,13 +255,13 @@ class _Logger(abc.ABC):
if self.fd is not None:
try:
self.flush()
except IOError:
except OSError:
# ignore flush errors
pass
if self.close_fd:
try:
self.fd.close()
except IOError:
except OSError:
# ignore close errors
pass
self.fd = None
@ -308,7 +308,7 @@ class _Logger(abc.ABC):
else:
try:
self.fd.write(s, **args)
except IOError:
except OSError:
msg = sys.exc_info()[1]
log.warn(
LOG_CHECK,
@ -436,7 +436,7 @@ class _Logger(abc.ABC):
if hasattr(self, "fd"):
try:
self.fd.flush()
except (IOError, AttributeError):
except (OSError, AttributeError):
pass
def log_internal_error(self):
@ -453,7 +453,7 @@ class _Logger(abc.ABC):
@rtype: unicode
"""
if modified is not None:
return modified.strftime("%Y-%m-%d{0}%H:%M:%S.%fZ".format(sep))
return modified.strftime(f"%Y-%m-%d{sep}%H:%M:%S.%fZ")
return ""

View file

@ -85,7 +85,7 @@ class FailuresLogger(_Logger):
"""
Read a previously stored failures from file fd.
"""
with open(self.filename, 'r', encoding=self.output_encoding,
with open(self.filename, encoding=self.output_encoding,
errors=self.codec_errors) as fd:
for line in fd:
line = line.rstrip()

View file

@ -112,8 +112,8 @@ class SQLLogger(_Logger):
% {
'table': self.dbname,
'base_url': sqlify(url_data.base_url),
'url_parent': sqlify((url_data.parent_url)),
'base_ref': sqlify((url_data.base_ref)),
'url_parent': sqlify(url_data.parent_url),
'base_ref': sqlify(url_data.base_ref),
'valid': intify(url_data.valid),
'result': sqlify(url_data.result),
'warning': sqlify(os.linesep.join(x[1] for x in url_data.warnings)),

View file

@ -47,7 +47,7 @@ def resolve_host(host):
# canonical name, socket address)
# add first ip of socket address
ips.append(res[4][0])
except socket.error:
except OSError:
log.info(LOG_CHECK, "Ignored invalid host %r", host)
return ips

View file

@ -65,10 +65,8 @@ class _ParserPlugin(_PluginBase):
def get_plugin_modules(folders):
"""Get plugin modules for given folders."""
for folder in folders:
for module in loader.get_folder_modules(folder, 'linkcheck.dummy'):
yield module
for module in loader.get_package_modules('plugins', __path__):
yield module
yield from loader.get_folder_modules(folder, 'linkcheck.dummy')
yield from loader.get_package_modules('plugins', __path__)
def get_plugin_classes(modules):

View file

@ -61,7 +61,7 @@ class UrlAnchorCheck:
if any(x for x in self.anchors if x[0] == decoded_anchor):
return
if self.anchors:
anchornames = sorted(set("`%s'" % x[0] for x in self.anchors))
anchornames = sorted({"`%s'" % x[0] for x in self.anchors})
anchors = ", ".join(anchornames)
else:
anchors = "-"

View file

@ -19,7 +19,6 @@ Store and retrieve country names for IPs.
from . import _ConnectionPlugin
import os
import sys
import socket
from ..lock import get_lock
from ..decorators import synchronized
from .. import log, LOG_PLUGIN
@ -105,7 +104,7 @@ def get_location(host):
return None
try:
record = get_geoip_record(host)
except (geoip_error, socket.error):
except (geoip_error, OSError):
log.debug(LOG_PLUGIN, "Geoip error for %r", host, exception=True)
# ignore lookup errors
return None

View file

@ -64,7 +64,7 @@ def has_word():
key = winreg.OpenKey(winreg.HKEY_CLASSES_ROOT, "Word.Application")
winreg.CloseKey(key)
return True
except (EnvironmentError, ImportError):
except (OSError, ImportError):
pass
return False

View file

@ -94,7 +94,7 @@ class ClamdScanner:
if i != -1:
port = int(data[i + 5:])
break
except socket.error:
except OSError:
self.sock.close()
raise
if port is None:
@ -103,7 +103,7 @@ class ClamdScanner:
wsock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
try:
wsock.connect(sockinfo[0][4])
except socket.error:
except OSError:
wsock.close()
raise
return wsock
@ -199,7 +199,7 @@ class ClamavConfig(dict):
addr = self['LocalSocket']
try:
sock.connect(addr)
except socket.error:
except OSError:
sock.close()
raise
return sock
@ -211,7 +211,7 @@ class ClamavConfig(dict):
sock = create_socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(sockinfo[0][4])
except socket.error:
except OSError:
sock.close()
raise
return sock
@ -225,7 +225,7 @@ def scan(data, clamconf):
"""
try:
scanner = ClamdScanner(clamconf)
except socket.error:
except OSError:
errmsg = _("Could not connect to ClamAV daemon.")
return ([], [errmsg])
try:

View file

@ -25,7 +25,7 @@ if socket.has_ipv6:
try:
socket.socket(socket.AF_INET6, socket.SOCK_STREAM).close()
has_ipv6 = True
except socket.error as msg:
except OSError as msg:
# only catch these one:
# socket.error: (97, 'Address family not supported by protocol')
# socket.error: (10047, 'Address family not supported by protocol')

View file

@ -224,7 +224,7 @@ class LinkCheckTest(TestBase):
if hasattr(self, "port"):
d["port"] = self.port
# all result files are encoded in utf-8
with open(resultfile, "r", encoding="utf-8") as f:
with open(resultfile, encoding="utf-8") as f:
return [
line.rstrip("\r\n") % d
for line in f

View file

@ -43,8 +43,8 @@ class TestLFUCache(unittest.TestCase):
self.assertTrue(not self.d)
self.d["a"] = 1
self.d["b"] = 2
self.assertEqual(set([1, 2]), set(self.d.values()))
self.assertEqual(set([1, 2]), set(self.d.itervalues()))
self.assertEqual({1, 2}, set(self.d.values()))
self.assertEqual({1, 2}, set(self.d.itervalues()))
def test_popitem(self):
self.assertTrue(not self.d)