Python HTTP Connection Pooling

HTTP runs on top of TCP. A single HTTP request goes through the TCP three-way handshake, sends the request, receives the response data, and finally tears the TCP connection down. If we issue many HTTP requests and pay for a handshake, a request, and a teardown every single time, we waste resources. If instead, once an HTTP(S) connection is established, we can reuse it to carry further requests, we save the cost of repeatedly creating connections. This is where HTTP(S) connection pools come in.
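
As a minimal sketch of that saving, using only the standard library and a throwaway local server (so no real site is involved), two requests can travel over one TCP connection thanks to HTTP keep-alive:

```python
import threading
from http.server import BaseHTTPRequestHandler, HTTPServer
from http.client import HTTPConnection


class Handler(BaseHTTPRequestHandler):
    # HTTP/1.1 keeps the connection alive between requests by default.
    protocol_version = "HTTP/1.1"

    def do_GET(self):
        body = b"ok"
        self.send_response(200)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, *args):  # silence request logging
        pass


server = HTTPServer(("127.0.0.1", 0), Handler)  # port 0 = pick a free port
threading.Thread(target=server.serve_forever, daemon=True).start()

conn = HTTPConnection("127.0.0.1", server.server_port)  # one TCP connection
statuses = []
for _ in range(2):  # both requests reuse the same socket
    conn.request("GET", "/")
    resp = conn.getresponse()
    resp.read()  # drain the body so the connection can be reused
    statuses.append(resp.status)
conn.close()
server.shutdown()
print(statuses)  # [200, 200]
```

Only one TCP handshake is paid for the two requests; a connection pool automates exactly this kind of reuse.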

The most commonly used HTTP library in Python is requests, which is built on top of urllib3. Below we look at how the HTTP connection pool is used and implemented, based on urllib3.

requests.Session

requests is extremely convenient: three lines of code get the job done. So what exactly is requests doing behind that simplicity?

Basic Usage::

  >>> import requests
  >>> s = requests.Session()
  >>> s.get('https://httpbin.org/get')
  <Response [200]>

For a get request, the call chain is: get() -> request() -> send() -> adapter.send(). So inside requests.Session, every request ultimately goes out through an HTTPAdapter.

    def send(self, request, **kwargs):
        """Send a given PreparedRequest.

        :rtype: requests.Response
        """
        # Set defaults that the hooks can utilize to ensure they always have
        # the correct parameters to reproduce the previous request.
        kwargs.setdefault('stream', self.stream)
        kwargs.setdefault('verify', self.verify)
        kwargs.setdefault('cert', self.cert)
        kwargs.setdefault('proxies', self.proxies)

        # It's possible that users might accidentally send a Request object.
        # Guard against that specific failure case.
        if isinstance(request, Request):
            raise ValueError('You can only send PreparedRequests.')

        # Set up variables needed for resolve_redirects and dispatching of hooks
        allow_redirects = kwargs.pop('allow_redirects', True)
        stream = kwargs.get('stream')
        hooks = request.hooks

        # Get the appropriate adapter to use
        adapter = self.get_adapter(url=request.url)

        # Start time (approximately) of the request
        start = preferred_clock()

        # Send the request
        r = adapter.send(request, **kwargs)
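
The adapter lookup itself involves no network traffic, so it is easy to verify (a small sketch assuming requests is installed):

```python
import requests

s = requests.Session()
# A fresh Session mounts two default HTTPAdapters, keyed by URL prefix.
print(sorted(s.adapters))  # ['http://', 'https://']

# get_adapter() picks the adapter whose prefix matches the URL.
adapter = s.get_adapter(url='https://httpbin.org/get')
print(type(adapter).__name__)  # HTTPAdapter
```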

requests.Adapter

requests.Adapter is in fact a wrapper around urllib3. If we want to, say, put a proxy in front of one particular domain, we can mount a custom Adapter for it.

class HTTPAdapter(BaseAdapter):
    """The built-in HTTP Adapter for urllib3.

    Provides a general-case interface for Requests sessions to contact HTTP and
    HTTPS urls by implementing the Transport Adapter interface. This class will
    usually be created by the :class:`Session <Session>` class under the
    covers.

    :param pool_connections: The number of urllib3 connection pools to cache.
    :param pool_maxsize: The maximum number of connections to save in the pool.
    :param max_retries: The maximum number of retries each connection
        should attempt. Note, this applies only to failed DNS lookups, socket
        connections and connection timeouts, never to requests where data has
        made it to the server. By default, Requests does not retry failed
        connections. If you need granular control over the conditions under
        which we retry a request, import urllib3's ``Retry`` class and pass
        that instead.
    :param pool_block: Whether the connection pool should block for connections.

    Usage::

      >>> import requests
      >>> s = requests.Session()
      >>> a = requests.adapters.HTTPAdapter(max_retries=3)
      >>> s.mount('http://', a)
    """
    __attrs__ = ['max_retries', 'config', '_pool_connections', '_pool_maxsize',
                 '_pool_block']

    def __init__(self, pool_connections=DEFAULT_POOLSIZE,
                 pool_maxsize=DEFAULT_POOLSIZE, max_retries=DEFAULT_RETRIES,
                 pool_block=DEFAULT_POOLBLOCK):
        if max_retries == DEFAULT_RETRIES:
            self.max_retries = Retry(0, read=False)
        else:
            self.max_retries = Retry.from_int(max_retries)
        self.config = {}
        self.proxy_manager = {}

        super(HTTPAdapter, self).__init__()

        self._pool_connections = pool_connections
        self._pool_maxsize = pool_maxsize
        self._pool_block = pool_block

        self.init_poolmanager(pool_connections, pool_maxsize, block=pool_block)
  • pool_connections: passed to PoolManager to control the number of pools.
  • pool_maxsize: the number of connections each HTTPConnectionPool caches.
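
A sketch of where those two arguments land when mounting a custom adapter (the sizes 4 and 8 are arbitrary; `_pool_connections` and `_pool_maxsize` are private attributes, read here only to show that the constructor stores them before handing them to init_poolmanager):

```python
import requests
from requests.adapters import HTTPAdapter

s = requests.Session()
# Cache up to 4 per-host pools, each holding up to 8 connections.
adapter = HTTPAdapter(pool_connections=4, pool_maxsize=8)
s.mount('https://', adapter)

print(adapter._pool_connections, adapter._pool_maxsize)  # 4 8
```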

urllib3.HTTPConnectionPool

HTTPConnectionPool is the connection-pool class; it inherits from ConnectionPool and RequestMethods (HTTPSConnectionPool is its HTTPS counterpart). What the pool caches are TCP connections, and a TCP connection exists between one client and one server; put simply, it is tied to a single host (IP). A pool therefore has to be told at construction time which host it caches connections for, and talking to multiple domains requires multiple ConnectionPools.

class HTTPConnectionPool(ConnectionPool, RequestMethods):
    """
    :param maxsize:
        Number of connections to save that can be reused. More than 1 is useful
        in multithreaded situations. If ``block`` is set to False, more
        connections will be created but they will not be saved once they've
        been used.

    :param block:
        If set to True, no more than ``maxsize`` connections will be used at
        a time. When no free connections are available, the call will block
        until a connection has been released. This is a useful side effect for
        particular multithreaded situations where one does not want to use more
        than maxsize connections per host to prevent flooding.
    """
    def __init__(self, host, port=None, strict=False,
                timeout=Timeout.DEFAULT_TIMEOUT, maxsize=1, block=False,
                headers=None, retries=None,
                _proxy=None, _proxy_headers=None,
                **conn_kw):
        pass

Parameters worth noting:

  • maxsize: the number of connections each pool keeps for reuse.
  • block: if set to True, no more than maxsize connections will ever be created; when no free connection is available, the call blocks until one is released. If False (the default), extra connections are created as needed, but they are not saved back to the pool once used.

HTTPConnectionPool example

# coding: utf-8
import threading
from urllib3 import HTTPConnectionPool

pool = HTTPConnectionPool('baidu.com', maxsize=2, block=True)


def http_get():
    for _ in range(3):
        r = pool.request('GET', '/', redirect=False)
        print("Request Status: {}; Connections: {}; Requests: {}; Thread Name: {}".format(
            r.status, pool.num_connections, pool.num_requests, threading.current_thread().name,
        ))


# Three threads share one connection pool
t1 = threading.Thread(target=http_get, name='t1')
t2 = threading.Thread(target=http_get, name='t2')
t3 = threading.Thread(target=http_get, name='t3')

t1.start()
t2.start()
t3.start()

Output:

Request Status: 200; Connections: 2; Requests: 2; Thread Name: t1
Request Status: 200; Connections: 2; Requests: 3; Thread Name: t2
Request Status: 200; Connections: 2; Requests: 4; Thread Name: t3
Request Status: 200; Connections: 2; Requests: 5; Thread Name: t2
Request Status: 200; Connections: 2; Requests: 6; Thread Name: t3
Request Status: 200; Connections: 2; Requests: 7; Thread Name: t2
Request Status: 200; Connections: 2; Requests: 8; Thread Name: t1
Request Status: 200; Connections: 2; Requests: 9; Thread Name: t1
Request Status: 200; Connections: 2; Requests: 9; Thread Name: t3

If block is changed to False with maxsize=2, the number of connections actually created can reach 3:

Request Status: 200; Connections: 3; Requests: 3; Thread Name: t3
Request Status: 200; Connections: 3; Requests: 4; Thread Name: t2
Request Status: 200; Connections: 3; Requests: 5; Thread Name: t1
Request Status: 200; Connections: 3; Requests: 6; Thread Name: t3
Request Status: 200; Connections: 3; Requests: 7; Thread Name: t2
Request Status: 200; Connections: 3; Requests: 8; Thread Name: t1
Request Status: 200; Connections: 3; Requests: 9; Thread Name: t3
Request Status: 200; Connections: 3; Requests: 9; Thread Name: t1
Request Status: 200; Connections: 3; Requests: 10; Thread Name: t2

In a multithreaded program, caching a few more connections can bring a performance gain; a common rule of thumb is one connection per thread, so that every thread has a cached connection available. If the program uses coroutines instead, how should the connection count be chosen? (TODO)

urllib3.PoolManager

To send requests to different domains and cache connections for each of them, we need multiple pools. urllib3 provides PoolManager to manage its ConnectionPools.

class PoolManager(RequestMethods):
    """
    :param num_pools:
        Number of connection pools to cache before discarding the least
        recently used pool.
    """
    def __init__(self, num_pools=10, headers=None, **connection_pool_kw):
        RequestMethods.__init__(self, headers)
        self.connection_pool_kw = connection_pool_kw
        self.pools = RecentlyUsedContainer(num_pools, dispose_func=lambda p: p.close())

        # Locally set the pool classes and keys so other PoolManagers can
        # override them.
        self.pool_classes_by_scheme = pool_classes_by_scheme
        self.key_fn_by_scheme = key_fn_by_scheme.copy()
  • num_pools: the number of connection pools to cache; the remaining keyword arguments are passed through to each Pool's initializer.
  • pools: a RecentlyUsedContainer.

Getting a pool from a PoolManager is really just a get on pools keyed by pool_key; on a miss, a new ConnectionPool is created.

    def connection_from_pool_key(self, pool_key, request_context=None):
        """
        Get a :class:`ConnectionPool` based on the provided pool key.

        ``pool_key`` should be a namedtuple that only contains immutable
        objects. At a minimum it must have the ``scheme``, ``host``, and
        ``port`` fields.
        """
        with self.pools.lock:
            # If the scheme, host, or port doesn't match existing open
            # connections, open a new ConnectionPool.
            pool = self.pools.get(pool_key)
            if pool:
                return pool

            # Make a fresh ConnectionPool of the desired type
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
            pool = self._new_pool(scheme, host, port, request_context=request_context)
            self.pools[pool_key] = pool

        return pool
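
Because the pool key is derived from scheme, host, and port, asking for the same host twice returns the same pool object. Building a pool opens no sockets, so the lookup can be demonstrated offline:

```python
from urllib3 import PoolManager

manager = PoolManager(num_pools=2)

# connection_from_host() resolves (scheme, host, port) to a pool,
# creating it on a miss and caching it in manager.pools.
p1 = manager.connection_from_host('example.com', 80, scheme='http')
p2 = manager.connection_from_host('example.com', 80, scheme='http')
p3 = manager.connection_from_host('example.org', 80, scheme='http')

print(p1 is p2)            # True  (same pool key -> cached pool)
print(p1 is p3)            # False (different host -> different pool)
print(len(manager.pools))  # 2
```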

PoolManager example

from urllib3 import PoolManager

manager = PoolManager(3)

r = manager.request('GET', 'http://baidu.com/')
print(r.status)

RecentlyUsedContainer

The RecentlyUsedContainer code is very elegant: a thread-safe dict-like container that implements least-recently-used eviction on top of OrderedDict (a dict ordered by insertion). RecentlyUsedContainer declares OrderedDict as a class attribute named ContainerCls.

Concretely, on get the corresponding pool is popped out of ContainerCls and re-inserted, which moves it to the end of the eviction line; on set, if maxsize was initialized to 10, then creating an 11th connection pool evicts and disposes of the least recently used one.

class RecentlyUsedContainer(MutableMapping):
    """
    Provides a thread-safe dict-like container which maintains up to
    ``maxsize`` keys while throwing away the least-recently-used keys beyond
    ``maxsize``.

    :param maxsize:
        Maximum number of recent elements to retain.

    :param dispose_func:
        Every time an item is evicted from the container,
        ``dispose_func(value)`` is called.
    """

    ContainerCls = OrderedDict

    def __init__(self, maxsize=10, dispose_func=None):
        self._maxsize = maxsize
        self.dispose_func = dispose_func

        self._container = self.ContainerCls()
        self.lock = RLock()

    def __getitem__(self, key):
        # Re-insert the item, moving it to the end of the eviction line.
        with self.lock:
            item = self._container.pop(key)
            self._container[key] = item
            return item

    def __setitem__(self, key, value):
        evicted_value = _Null
        with self.lock:
            # Possibly evict the existing value of 'key'
            evicted_value = self._container.get(key, _Null)
            self._container[key] = value

            # If we didn't evict an existing value, we might have to evict the
            # least recently used item from the beginning of the container.
            if len(self._container) > self._maxsize:
                _key, evicted_value = self._container.popitem(last=False)

        if self.dispose_func and evicted_value is not _Null:
            self.dispose_func(evicted_value)

    def __delitem__(self, key):
        with self.lock:
            value = self._container.pop(key)

        if self.dispose_func:
            self.dispose_func(value)

    def __len__(self):
        with self.lock:
            return len(self._container)

    def __iter__(self):
        raise NotImplementedError('Iteration over this class is unlikely to be threadsafe.')

    def clear(self):
        with self.lock:
            # Copy pointers to all values, then wipe the mapping
            values = list(itervalues(self._container))
            self._container.clear()

        if self.dispose_func:
            for value in values:
                self.dispose_func(value)

    def keys(self):
        with self.lock:
            return list(iterkeys(self._container))
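
The eviction can be observed directly. Note that RecentlyUsedContainer lives in urllib3._collections, an internal module whose path may change between urllib3 versions:

```python
from urllib3._collections import RecentlyUsedContainer

disposed = []
# maxsize=2: inserting a third key evicts the least recently used one.
c = RecentlyUsedContainer(maxsize=2, dispose_func=disposed.append)

c['a'] = 1
c['b'] = 2
_ = c['a']   # touching 'a' makes 'b' the least recently used entry
c['c'] = 3   # third key: 'b' is evicted and dispose_func(2) runs

print(sorted(c.keys()))  # ['a', 'c']
print(disposed)          # [2]
```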

requests.Session wrapper example

In practice, we can subclass requests.Session to bake in our own defaults.

Note in particular that pool_connections is not the number of connections but the number of connection pools; the default of 10 is usually enough. It is pool_maxsize that sets the maximum number of connections per pool.

import uuid

from requests import adapters
from requests.sessions import Session

DEFAULT_TIMEOUT = 30
SESSION_DEFAULT_NUM_POOLS = 10
SESSION_DEFAULT_POOL_MAXSIZE = 10


class MySession(Session):
    X_REQUEST_ID_HEADER = 'X-Request-ID'

    def __init__(self, num_pools=SESSION_DEFAULT_NUM_POOLS, pool_maxsize=SESSION_DEFAULT_POOL_MAXSIZE):
        super(MySession, self).__init__()
        self.mount("http://", adapters.HTTPAdapter(pool_connections=num_pools, pool_maxsize=pool_maxsize))
        self.mount("https://", adapters.HTTPAdapter(pool_connections=num_pools, pool_maxsize=pool_maxsize))

    def request(self, method, url, headers=None, timeout=None, **kwargs):
        timeout = timeout or DEFAULT_TIMEOUT
        headers = headers or {}
        if not headers.get(self.X_REQUEST_ID_HEADER):
            headers[self.X_REQUEST_ID_HEADER] = uuid.uuid4().hex

        return super(MySession, self).request(method, url, headers=headers, timeout=timeout, **kwargs)