mirror of
https://github.com/Hopiu/linkchecker.git
synced 2026-04-27 17:44:42 +00:00
Merge pull request #442 from cjmayo/unicode_safe
Remove strformat.unicode_safe()
This commit is contained in:
commit
282f8c970d
13 changed files with 68 additions and 79 deletions
|
|
@ -21,6 +21,8 @@ addons:
|
|||
apt:
|
||||
packages:
|
||||
- clamav-daemon
|
||||
- geoip-database
|
||||
- libgeoip-dev
|
||||
install:
|
||||
- pip install tox-travis coveralls
|
||||
before_script:
|
||||
|
|
|
|||
|
|
@ -21,7 +21,7 @@ import os
|
|||
import html
|
||||
import urllib.parse
|
||||
|
||||
from .. import strformat, url as urlutil, log, LOG_CHECK
|
||||
from .. import url as urlutil, log, LOG_CHECK
|
||||
|
||||
MAX_FILESIZE = 1024 * 1024 * 10 # 10MB
|
||||
|
||||
|
|
@ -103,16 +103,10 @@ def get_url_from(
|
|||
@type extern: tuple(int, int) or None
|
||||
"""
|
||||
if base_url is not None:
|
||||
base_url = strformat.unicode_safe(base_url)
|
||||
# left strip for detection of URL scheme
|
||||
base_url_stripped = base_url.lstrip()
|
||||
else:
|
||||
base_url_stripped = base_url
|
||||
if parent_url is not None:
|
||||
parent_url = strformat.unicode_safe(parent_url)
|
||||
if base_ref is not None:
|
||||
base_ref = strformat.unicode_safe(base_ref)
|
||||
name = strformat.unicode_safe(name)
|
||||
url = absolute_url(base_url_stripped, base_ref, parent_url).lower()
|
||||
if ":" in url:
|
||||
scheme = url.split(":", 1)[0].lower()
|
||||
|
|
|
|||
|
|
@ -16,6 +16,9 @@
|
|||
"""
|
||||
Handle http links.
|
||||
"""
|
||||
|
||||
import urllib.parse
|
||||
|
||||
import requests
|
||||
|
||||
# The validity of SSL certs is ignored to be able
|
||||
|
|
@ -34,7 +37,6 @@ import re
|
|||
from .. import (
|
||||
log,
|
||||
LOG_CHECK,
|
||||
strformat,
|
||||
mimeutil,
|
||||
url as urlutil,
|
||||
LinkCheckerError,
|
||||
|
|
@ -50,9 +52,6 @@ from requests.sessions import REDIRECT_STATI
|
|||
HEADER_ENCODING = "iso-8859-1"
|
||||
HTTP_SCHEMAS = ('http://', 'https://')
|
||||
|
||||
# helper alias
|
||||
unicode_safe = strformat.unicode_safe
|
||||
|
||||
# match for robots meta element content attribute
|
||||
nofollow_re = re.compile(r"\bnofollow\b", re.IGNORECASE)
|
||||
|
||||
|
|
@ -111,7 +110,7 @@ class HttpUrl(internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
# Note that content-encoding causes size differences since
|
||||
# the content data is always decoded.
|
||||
try:
|
||||
self.size = int(self.getheader("Content-Length"))
|
||||
self.size = int(self.headers["Content-Length"])
|
||||
except (ValueError, OverflowError):
|
||||
pass
|
||||
else:
|
||||
|
|
@ -273,7 +272,7 @@ class HttpUrl(internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
# Reset extern and recalculate
|
||||
self.extern = None
|
||||
self.set_extern(newurl)
|
||||
self.urlparts = strformat.url_unicode_split(newurl)
|
||||
self.urlparts = list(urllib.parse.urlsplit(newurl))
|
||||
self.build_url_parts()
|
||||
self.url_connection = response
|
||||
self.headers = response.headers
|
||||
|
|
@ -285,17 +284,6 @@ class HttpUrl(internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
# run connection plugins for old connection
|
||||
self.aggregate.plugin_manager.run_connection_plugins(self)
|
||||
|
||||
def getheader(self, name, default=None):
|
||||
"""Get decoded header value.
|
||||
|
||||
@return: decoded header value or default of not found
|
||||
@rtype: unicode or type of default
|
||||
"""
|
||||
value = self.headers.get(name)
|
||||
if value is None:
|
||||
return default
|
||||
return unicode_safe(value, encoding=HEADER_ENCODING)
|
||||
|
||||
def check_response(self):
|
||||
"""Check final result and log it."""
|
||||
if (
|
||||
|
|
@ -315,7 +303,8 @@ class HttpUrl(internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
|
||||
if self.url_connection.status_code == 429:
|
||||
self.add_warning(
|
||||
"Rate limited (Retry-After: %s)" % self.getheader(_("Retry-After")),
|
||||
"Rate limited (Retry-After: %s)"
|
||||
% self.headers.get(_("Retry-After")),
|
||||
tag=WARN_URL_RATE_LIMITED,
|
||||
)
|
||||
|
||||
|
|
@ -353,7 +342,7 @@ class HttpUrl(internpaturl.InternPatternUrl, proxysupport.ProxySupport):
|
|||
value = self.headers['Refresh'].strip()
|
||||
mo = refresh_re.match(value)
|
||||
if mo:
|
||||
url = unicode_safe(mo.group("url"))
|
||||
url = mo.group("url")
|
||||
name = "Refresh: header"
|
||||
self.add_url(url, name=name)
|
||||
if 'Content-Location' in self.headers:
|
||||
|
|
|
|||
|
|
@ -16,15 +16,18 @@
|
|||
"""
|
||||
Intern URL pattern support.
|
||||
"""
|
||||
|
||||
import re
|
||||
import urllib.parse
|
||||
|
||||
from . import urlbase, absolute_url
|
||||
from .. import strformat, url as urlutil
|
||||
from .. import url as urlutil
|
||||
|
||||
|
||||
def get_intern_pattern(url):
|
||||
"""Return intern pattern for given URL. Redirections to the same
|
||||
domain with or without "www." prepended are allowed."""
|
||||
parts = strformat.url_unicode_split(url)
|
||||
parts = urllib.parse.urlsplit(url)
|
||||
scheme = parts[0].lower()
|
||||
domain = parts[1].lower()
|
||||
domain, is_idn = urlutil.idna_encode(domain)
|
||||
|
|
|
|||
|
|
@ -53,9 +53,6 @@ from .const import (
|
|||
)
|
||||
from ..url import url_fix_wayback_query
|
||||
|
||||
# helper alias
|
||||
unicode_safe = strformat.unicode_safe
|
||||
|
||||
# schemes that are invalid with an empty hostname
|
||||
scheme_requires_host = ("ftp", "http", "telnet")
|
||||
|
||||
|
|
@ -389,7 +386,7 @@ class UrlBase:
|
|||
self.build_url()
|
||||
self.check_url_warnings()
|
||||
except tuple(ExcSyntaxList) as msg:
|
||||
self.set_result(unicode_safe(msg), valid=False)
|
||||
self.set_result(str(msg), valid=False)
|
||||
else:
|
||||
self.set_cache_url()
|
||||
|
||||
|
|
@ -440,7 +437,7 @@ class UrlBase:
|
|||
urlparts[2] = url_fix_wayback_query(urlparts[2])
|
||||
self.url = urlutil.urlunsplit(urlparts)
|
||||
# split into (modifiable) list
|
||||
self.urlparts = strformat.url_unicode_split(self.url)
|
||||
self.urlparts = list(urllib.parse.urlsplit(self.url))
|
||||
self.build_url_parts()
|
||||
# and unsplit again
|
||||
self.url = urlutil.urlunsplit(self.urlparts)
|
||||
|
|
@ -529,7 +526,7 @@ class UrlBase:
|
|||
'host': self.host,
|
||||
'msg': value,
|
||||
}
|
||||
self.set_result(unicode_safe(value), valid=False)
|
||||
self.set_result(value, valid=False)
|
||||
|
||||
def check_content(self):
|
||||
"""Check content of URL.
|
||||
|
|
@ -776,7 +773,7 @@ class UrlBase:
|
|||
"""
|
||||
Return serialized url check data as unicode string.
|
||||
"""
|
||||
return unicode_safe(sep).join(
|
||||
return sep.join(
|
||||
[
|
||||
"%s link" % self.scheme,
|
||||
"base_url=%r" % self.base_url,
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ from io import StringIO
|
|||
import os
|
||||
|
||||
from . import _Logger
|
||||
from .. import strformat
|
||||
|
||||
Columns = (
|
||||
"urlname",
|
||||
|
|
@ -134,7 +133,7 @@ class CSVLogger(_Logger):
|
|||
row.append(url_data.level)
|
||||
if self.has_part("modified"):
|
||||
row.append(self.format_modified(url_data.modified))
|
||||
self.writerow(map(strformat.unicode_safe, row))
|
||||
self.writerow(row)
|
||||
self.flush()
|
||||
|
||||
def writerow(self, row):
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ import sys
|
|||
import socket
|
||||
from ..lock import get_lock
|
||||
from ..decorators import synchronized
|
||||
from ..strformat import unicode_safe
|
||||
from .. import log, LOG_PLUGIN
|
||||
|
||||
|
||||
|
|
@ -112,9 +111,9 @@ def get_location(host):
|
|||
return None
|
||||
value = ""
|
||||
if record and record.get("city"):
|
||||
value += unicode_safe(record["city"])
|
||||
value += record["city"]
|
||||
if record and record.get("country_name"):
|
||||
if value:
|
||||
value += ", "
|
||||
value += unicode_safe(record["country_name"])
|
||||
value += record["country_name"]
|
||||
return value
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ except ImportError:
|
|||
has_pdflib = False
|
||||
else:
|
||||
has_pdflib = True
|
||||
from .. import log, LOG_PLUGIN, strformat
|
||||
from .. import log, LOG_PLUGIN
|
||||
|
||||
|
||||
def search_url(obj, url_data, pageno, seen_objs):
|
||||
|
|
@ -44,14 +44,7 @@ def search_url(obj, url_data, pageno, seen_objs):
|
|||
if isinstance(obj, dict):
|
||||
for key, value in obj.items():
|
||||
if key == 'URI':
|
||||
if isinstance(value, str):
|
||||
url = value
|
||||
else:
|
||||
# URIs should be 7bit ASCII encoded, but be safe and encode
|
||||
# to unicode
|
||||
# XXX this does not use an optional specified base URL
|
||||
url = strformat.unicode_safe(value)
|
||||
url_data.add_url(url, page=pageno)
|
||||
url_data.add_url(value.decode("ascii"), page=pageno)
|
||||
else:
|
||||
search_url(value, url_data, pageno, seen_objs)
|
||||
elif isinstance(obj, list):
|
||||
|
|
|
|||
|
|
@ -26,10 +26,8 @@ import re
|
|||
import textwrap
|
||||
import os
|
||||
import time
|
||||
import urllib.parse
|
||||
import locale
|
||||
import pydoc
|
||||
from . import i18n
|
||||
|
||||
# some handy time constants
|
||||
SECONDS_PER_MINUTE = 60
|
||||
|
|
@ -37,25 +35,6 @@ SECONDS_PER_HOUR = 60 * SECONDS_PER_MINUTE
|
|||
SECONDS_PER_DAY = 24 * SECONDS_PER_HOUR
|
||||
|
||||
|
||||
def unicode_safe(s, encoding=i18n.default_encoding, errors='replace'):
|
||||
"""Get unicode string without raising encoding errors. Unknown
|
||||
characters of the given encoding will be ignored.
|
||||
|
||||
@param s: the string to be decoded
|
||||
@type s: any object except None
|
||||
@return: if s is already unicode, return s unchanged; else return
|
||||
decoded unicode string of str(s)
|
||||
@rtype: unicode
|
||||
"""
|
||||
assert s is not None, "argument to unicode_safe was None"
|
||||
if isinstance(s, str):
|
||||
# s is already unicode, nothing to do
|
||||
return s
|
||||
elif isinstance(s, bytes):
|
||||
return s.decode(encoding, errors)
|
||||
return str(s)
|
||||
|
||||
|
||||
def ascii_safe(s):
|
||||
"""Get ASCII string without raising encoding errors. Unknown
|
||||
characters of the given encoding will be ignored.
|
||||
|
|
@ -70,11 +49,6 @@ def ascii_safe(s):
|
|||
return s
|
||||
|
||||
|
||||
def url_unicode_split(url):
|
||||
"""Like urllib.parse.urlsplit(), but always returning unicode parts."""
|
||||
return [unicode_safe(s) for s in urllib.parse.urlsplit(url)]
|
||||
|
||||
|
||||
def unquote(s, matching=False):
|
||||
"""Remove leading and ending single and double quotes.
|
||||
The quotes need to match if matching is True. Only one quote from each
|
||||
|
|
|
|||
|
|
@ -220,6 +220,16 @@ def has_x11():
|
|||
need_x11 = _need_func(has_x11, "X11")
|
||||
|
||||
|
||||
@lru_cache(1)
|
||||
def has_geoip():
|
||||
from linkcheck.plugins import locationinfo
|
||||
|
||||
return locationinfo.geoip is not None
|
||||
|
||||
|
||||
need_geoip = _need_func(has_geoip, "geoip")
|
||||
|
||||
|
||||
@lru_cache(1)
|
||||
def has_word():
|
||||
"""Test if Word is available."""
|
||||
|
|
|
|||
32
tests/test_locationinfo.py
Normal file
32
tests/test_locationinfo.py
Normal file
|
|
@ -0,0 +1,32 @@
|
|||
# Copyright (C) 2020 Chris Mayo
|
||||
#
|
||||
# This program is free software; you can redistribute it and/or modify
|
||||
# it under the terms of the GNU General Public License as published by
|
||||
# the Free Software Foundation; either version 2 of the License, or
|
||||
# (at your option) any later version.
|
||||
#
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU General Public License for more details.
|
||||
#
|
||||
# You should have received a copy of the GNU General Public License along
|
||||
# with this program; if not, write to the Free Software Foundation, Inc.,
|
||||
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
|
||||
"""
|
||||
Test LocationInfo plugin.
|
||||
|
||||
Requires GeoIP (libgeoip-dev to build) and geoip-database
|
||||
"""
|
||||
|
||||
import unittest
|
||||
|
||||
from linkcheck.plugins import locationinfo
|
||||
from tests import need_geoip
|
||||
|
||||
|
||||
class TestLocationInfo(unittest.TestCase):
|
||||
@need_geoip
|
||||
def test_get_location(self):
|
||||
location = locationinfo.get_location("8.8.8.8")
|
||||
self.assertEqual(location, "United States")
|
||||
|
|
@ -126,10 +126,6 @@ class TestStrFormat(unittest.TestCase):
|
|||
duration(60 * 60 * 24 * 365 + 60 * 60 * 24 + 2), "1 year, 1 day"
|
||||
)
|
||||
|
||||
def test_unicode_safe(self):
|
||||
unicode_safe = linkcheck.strformat.unicode_safe
|
||||
self.assertEqual(unicode_safe("a"), "a")
|
||||
|
||||
def test_ascii_safe(self):
|
||||
ascii_safe = linkcheck.strformat.ascii_safe
|
||||
self.assertEqual(ascii_safe("a"), "a")
|
||||
|
|
|
|||
1
tox.ini
1
tox.ini
|
|
@ -11,6 +11,7 @@ deps =
|
|||
pytest-cov
|
||||
miniboa >= 1.0.8
|
||||
biplist
|
||||
GeoIP
|
||||
|
||||
[testenv]
|
||||
usedevelop = true
|
||||
|
|
|
|||
Loading…
Reference in a new issue