linkchecker/linkcheck/cache/connection.py
2012-10-13 13:36:28 +02:00

232 lines
8.6 KiB
Python

# -*- coding: iso-8859-1 -*-
# Copyright (C) 2005-2012 Bastian Kleineidam
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Store and retrieve open connections.
"""
import time
from .. import log, LOG_CACHE
from ..decorators import synchronized
from ..lock import get_lock, get_semaphore
from ..containers import enum
_lock = get_lock("connection")
_wait_lock = get_lock("connwait")
ConnectionTypes = ("ftp", "http", "https")
ConnectionState = enum("available", "busy")
DefaultLimits = dict(
http=4,
https=4,
ftp=2,
)
def get_connection_id(connection):
"""Return unique id for connection object."""
return id(connection)
def is_expired(curtime, conn_data):
"""Test if connection is expired."""
return (curtime+5.0) >= conn_data[2]
class ConnectionPool (object):
"""Thread-safe cache, storing a set of connections for URL retrieval."""
def __init__ (self, wait=0, limits=None):
"""
Initialize an empty connection dictionary which will have the form:
{(type, host, port) -> (lock, {id -> [connection, state, expiration time]})}
Connection can be any open connection object (HTTP, FTP, ...).
State is of type ConnectionState (either 'available' or 'busy').
Expiration time is the point of time in seconds when this
connection will be timed out.
The type is the connection type and an either 'ftp' or 'http'.
The host is the hostname as string, port the port number as an integer.
For each type, the maximum number of connection can be defined. The default
is 4 for http/1.0, 2 for http/1.1 and 2 for ftp.
"""
# open connections
self.connections = {}
# {host -> due time}
self.times = {}
# {host -> wait}
self.host_waits = {}
if wait < 0:
raise ValueError("negative wait value %d" % wait)
self.wait = wait
if limits is None:
self.limits = DefaultLimits
else:
self.limits = {}
for type in ConnectionTypes:
self.limits[type] = limits.get(type, DefaultLimits[type])
@synchronized(_wait_lock)
def host_wait (self, host, wait):
"""Set a host specific time to wait between requests."""
if wait < 0:
raise ValueError("negative wait value %d" % wait)
self.host_waits[host] = wait
@synchronized(_wait_lock)
def wait_for_host (self, host):
"""Honor wait time for given host."""
t = time.time()
if host in self.times:
due_time = self.times[host]
if due_time > t:
wait = due_time - t
log.debug(LOG_CACHE,
"waiting for %.01f seconds on connection to %s", wait, host)
time.sleep(wait)
t = time.time()
self.times[host] = t + self.host_waits.get(host, self.wait)
def _add (self, type, host, port, create_connection):
"""Add connection to the pool with given parameters.
@param type: the connection scheme (eg. http)
@ptype type: string
@param host: the hostname
@ptype host: string
@param port: the port number
@ptype port: int
@param create_connection: function to create a new connection object
@ptype create_connection: callable
@return: newly created connection
@rtype: HTTP(S)Connection or FTPConnection
"""
self.wait_for_host(host)
connection = create_connection(type, host, port)
cid = get_connection_id(connection)
expiration = None
conn_data = [connection, 'busy', expiration]
key = (type, host, port)
if key in self.connections:
lock, entries = self.connections[key]
entries[cid] = conn_data
else:
lock = get_semaphore("%s:%d" % (host, port), self.limits[type])
lock.acquire()
log.debug(LOG_CACHE, "Acquired lock for %s://%s:%d" % key)
entries = {cid: conn_data}
self.connections[key] = (lock, entries)
return connection
@synchronized(_lock)
def get (self, type, host, port, create_connection):
"""Get open connection if available or create a new one.
@param type: connection type
@ptype type: ConnectionType
@param host: hostname
@ptype host: string
@param port: port number
@ptype port: int
@return: Open connection object or None if none is available.
@rtype None or FTPConnection or HTTP(S)Connection
"""
assert type in ConnectionTypes, 'invalid type %r' % type
# 65536 == 2**16
assert 0 < port < 65536, 'invalid port number %r' % port
key = (type, host, port)
if key not in self.connections:
return self._add(type, host, port, create_connection)
lock, entries = self.connections[key]
if not lock.acquire(False):
log.debug(LOG_CACHE, "wait for %s connection to %s:%d",
type, host, port)
return lock
log.debug(LOG_CACHE, "Acquired lock for %s://%s:%d" % key)
# either a connection is available or a new one can be created
t = time.time()
delete_entries = []
try:
for id, conn_data in entries.items():
if conn_data[1] == ConnectionState.available:
if is_expired(t, conn_data):
delete_entries.append(id)
else:
conn_data[1] = ConnectionState.busy
log.debug(LOG_CACHE,
"reusing connection %s timing out in %.01f seconds",
key, (conn_data[2] - t))
return conn_data[0]
finally:
for id in delete_entries:
del entries[id]
# make a new connection
return self._add(type, host, port, create_connection)
@synchronized(_lock)
def release (self, type, host, port, connection, expiration=None):
"""Release a used connection."""
key = (type, host, port)
if key in self.connections:
lock, entries = self.connections[key]
id = get_connection_id(connection)
if id in entries:
log.debug(LOG_CACHE, "Release lock for %s://%s:%d and expiration %s", type, host, port, expiration)
# if the connection is reusable, set it to available, else delete it
if expiration is None:
del entries[id]
else:
entries[id][1] = ConnectionState.available
entries[id][2] = expiration
lock.release()
else:
log.warn(LOG_CACHE, "Release unknown connection %s://%s:%d from entries %s", type, host, port, entries.keys())
else:
log.warn(LOG_CACHE, "Release unknown connection %s://%s:%d", type, host, port)
@synchronized(_lock)
def remove_expired (self):
"""Remove expired or soon to be expired connections from this pool."""
t = time.time()
for lock, entries in self.connections.values():
delete_entries = []
for id, conn_data in entries.items():
if conn_data[1] == 'available' and (t+5.0) >= conn_data[2]:
try_close(conn_data[0])
delete_entries.add(id)
for id in delete_entries:
del entries[id]
lock.release()
log.debug(LOG_CACHE, "released lock for id %s", id)
@synchronized(_lock)
def clear (self):
"""Remove all connections from this cache, even if busy."""
for lock, entries in self.connections.values():
for conn_data in entries.values():
try_close(conn_data[0])
self.connections.clear()
def try_close (connection):
"""Close and remove a connection (not thread-safe, internal use only)."""
try:
connection.close()
except Exception:
# ignore close errors
pass