Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import functools | |
| import logging | |
| import typing | |
| import warnings | |
| from types import TracebackType | |
| from urllib.parse import urljoin | |
| from ._collections import HTTPHeaderDict, RecentlyUsedContainer | |
| from ._request_methods import RequestMethods | |
| from .connection import ProxyConfig | |
| from .connectionpool import HTTPConnectionPool, HTTPSConnectionPool, port_by_scheme | |
| from .exceptions import ( | |
| LocationValueError, | |
| MaxRetryError, | |
| ProxySchemeUnknown, | |
| URLSchemeUnknown, | |
| ) | |
| from .response import BaseHTTPResponse | |
| from .util.connection import _TYPE_SOCKET_OPTIONS | |
| from .util.proxy import connection_requires_http_tunnel | |
| from .util.retry import Retry | |
| from .util.timeout import Timeout | |
| from .util.url import Url, parse_url | |
| if typing.TYPE_CHECKING: | |
| import ssl | |
| from typing_extensions import Self | |
| __all__ = ["PoolManager", "ProxyManager", "proxy_from_url"] | |
| log = logging.getLogger(__name__) | |
| SSL_KEYWORDS = ( | |
| "key_file", | |
| "cert_file", | |
| "cert_reqs", | |
| "ca_certs", | |
| "ca_cert_data", | |
| "ssl_version", | |
| "ssl_minimum_version", | |
| "ssl_maximum_version", | |
| "ca_cert_dir", | |
| "ssl_context", | |
| "key_password", | |
| "server_hostname", | |
| ) | |
| # Default value for `blocksize` - a new parameter introduced to | |
| # http.client.HTTPConnection & http.client.HTTPSConnection in Python 3.7 | |
| _DEFAULT_BLOCKSIZE = 16384 | |
| class PoolKey(typing.NamedTuple): | |
| """ | |
| All known keyword arguments that could be provided to the pool manager, its | |
| pools, or the underlying connections. | |
| All custom key schemes should include the fields in this key at a minimum. | |
| """ | |
| key_scheme: str | |
| key_host: str | |
| key_port: int | None | |
| key_timeout: Timeout | float | int | None | |
| key_retries: Retry | bool | int | None | |
| key_block: bool | None | |
| key_source_address: tuple[str, int] | None | |
| key_key_file: str | None | |
| key_key_password: str | None | |
| key_cert_file: str | None | |
| key_cert_reqs: str | None | |
| key_ca_certs: str | None | |
| key_ca_cert_data: str | bytes | None | |
| key_ssl_version: int | str | None | |
| key_ssl_minimum_version: ssl.TLSVersion | None | |
| key_ssl_maximum_version: ssl.TLSVersion | None | |
| key_ca_cert_dir: str | None | |
| key_ssl_context: ssl.SSLContext | None | |
| key_maxsize: int | None | |
| key_headers: frozenset[tuple[str, str]] | None | |
| key__proxy: Url | None | |
| key__proxy_headers: frozenset[tuple[str, str]] | None | |
| key__proxy_config: ProxyConfig | None | |
| key_socket_options: _TYPE_SOCKET_OPTIONS | None | |
| key__socks_options: frozenset[tuple[str, str]] | None | |
| key_assert_hostname: bool | str | None | |
| key_assert_fingerprint: str | None | |
| key_server_hostname: str | None | |
| key_blocksize: int | None | |
| def _default_key_normalizer( | |
| key_class: type[PoolKey], request_context: dict[str, typing.Any] | |
| ) -> PoolKey: | |
| """ | |
| Create a pool key out of a request context dictionary. | |
| According to RFC 3986, both the scheme and host are case-insensitive. | |
| Therefore, this function normalizes both before constructing the pool | |
| key for an HTTPS request. If you wish to change this behaviour, provide | |
| alternate callables to ``key_fn_by_scheme``. | |
| :param key_class: | |
| The class to use when constructing the key. This should be a namedtuple | |
| with the ``scheme`` and ``host`` keys at a minimum. | |
| :type key_class: namedtuple | |
| :param request_context: | |
| A dictionary-like object that contain the context for a request. | |
| :type request_context: dict | |
| :return: A namedtuple that can be used as a connection pool key. | |
| :rtype: PoolKey | |
| """ | |
| # Since we mutate the dictionary, make a copy first | |
| context = request_context.copy() | |
| context["scheme"] = context["scheme"].lower() | |
| context["host"] = context["host"].lower() | |
| # These are both dictionaries and need to be transformed into frozensets | |
| for key in ("headers", "_proxy_headers", "_socks_options"): | |
| if key in context and context[key] is not None: | |
| context[key] = frozenset(context[key].items()) | |
| # The socket_options key may be a list and needs to be transformed into a | |
| # tuple. | |
| socket_opts = context.get("socket_options") | |
| if socket_opts is not None: | |
| context["socket_options"] = tuple(socket_opts) | |
| # Map the kwargs to the names in the namedtuple - this is necessary since | |
| # namedtuples can't have fields starting with '_'. | |
| for key in list(context.keys()): | |
| context["key_" + key] = context.pop(key) | |
| # Default to ``None`` for keys missing from the context | |
| for field in key_class._fields: | |
| if field not in context: | |
| context[field] = None | |
| # Default key_blocksize to _DEFAULT_BLOCKSIZE if missing from the context | |
| if context.get("key_blocksize") is None: | |
| context["key_blocksize"] = _DEFAULT_BLOCKSIZE | |
| return key_class(**context) | |
| #: A dictionary that maps a scheme to a callable that creates a pool key. | |
| #: This can be used to alter the way pool keys are constructed, if desired. | |
| #: Each PoolManager makes a copy of this dictionary so they can be configured | |
| #: globally here, or individually on the instance. | |
| key_fn_by_scheme = { | |
| "http": functools.partial(_default_key_normalizer, PoolKey), | |
| "https": functools.partial(_default_key_normalizer, PoolKey), | |
| } | |
| pool_classes_by_scheme = {"http": HTTPConnectionPool, "https": HTTPSConnectionPool} | |
| class PoolManager(RequestMethods): | |
| """ | |
| Allows for arbitrary requests while transparently keeping track of | |
| necessary connection pools for you. | |
| :param num_pools: | |
| Number of connection pools to cache before discarding the least | |
| recently used pool. | |
| :param headers: | |
| Headers to include with all requests, unless other headers are given | |
| explicitly. | |
| :param \\**connection_pool_kw: | |
| Additional parameters are used to create fresh | |
| :class:`urllib3.connectionpool.ConnectionPool` instances. | |
| Example: | |
| .. code-block:: python | |
| import urllib3 | |
| http = urllib3.PoolManager(num_pools=2) | |
| resp1 = http.request("GET", "https://google.com/") | |
| resp2 = http.request("GET", "https://google.com/mail") | |
| resp3 = http.request("GET", "https://yahoo.com/") | |
| print(len(http.pools)) | |
| # 2 | |
| """ | |
| proxy: Url | None = None | |
| proxy_config: ProxyConfig | None = None | |
| def __init__( | |
| self, | |
| num_pools: int = 10, | |
| headers: typing.Mapping[str, str] | None = None, | |
| **connection_pool_kw: typing.Any, | |
| ) -> None: | |
| super().__init__(headers) | |
| self.connection_pool_kw = connection_pool_kw | |
| self.pools: RecentlyUsedContainer[PoolKey, HTTPConnectionPool] | |
| self.pools = RecentlyUsedContainer(num_pools) | |
| # Locally set the pool classes and keys so other PoolManagers can | |
| # override them. | |
| self.pool_classes_by_scheme = pool_classes_by_scheme | |
| self.key_fn_by_scheme = key_fn_by_scheme.copy() | |
| def __enter__(self) -> Self: | |
| return self | |
| def __exit__( | |
| self, | |
| exc_type: type[BaseException] | None, | |
| exc_val: BaseException | None, | |
| exc_tb: TracebackType | None, | |
| ) -> typing.Literal[False]: | |
| self.clear() | |
| # Return False to re-raise any potential exceptions | |
| return False | |
| def _new_pool( | |
| self, | |
| scheme: str, | |
| host: str, | |
| port: int, | |
| request_context: dict[str, typing.Any] | None = None, | |
| ) -> HTTPConnectionPool: | |
| """ | |
| Create a new :class:`urllib3.connectionpool.ConnectionPool` based on host, port, scheme, and | |
| any additional pool keyword arguments. | |
| If ``request_context`` is provided, it is provided as keyword arguments | |
| to the pool class used. This method is used to actually create the | |
| connection pools handed out by :meth:`connection_from_url` and | |
| companion methods. It is intended to be overridden for customization. | |
| """ | |
| pool_cls: type[HTTPConnectionPool] = self.pool_classes_by_scheme[scheme] | |
| if request_context is None: | |
| request_context = self.connection_pool_kw.copy() | |
| # Default blocksize to _DEFAULT_BLOCKSIZE if missing or explicitly | |
| # set to 'None' in the request_context. | |
| if request_context.get("blocksize") is None: | |
| request_context["blocksize"] = _DEFAULT_BLOCKSIZE | |
| # Although the context has everything necessary to create the pool, | |
| # this function has historically only used the scheme, host, and port | |
| # in the positional args. When an API change is acceptable these can | |
| # be removed. | |
| for key in ("scheme", "host", "port"): | |
| request_context.pop(key, None) | |
| if scheme == "http": | |
| for kw in SSL_KEYWORDS: | |
| request_context.pop(kw, None) | |
| return pool_cls(host, port, **request_context) | |
| def clear(self) -> None: | |
| """ | |
| Empty our store of pools and direct them all to close. | |
| This will not affect in-flight connections, but they will not be | |
| re-used after completion. | |
| """ | |
| self.pools.clear() | |
| def connection_from_host( | |
| self, | |
| host: str | None, | |
| port: int | None = None, | |
| scheme: str | None = "http", | |
| pool_kwargs: dict[str, typing.Any] | None = None, | |
| ) -> HTTPConnectionPool: | |
| """ | |
| Get a :class:`urllib3.connectionpool.ConnectionPool` based on the host, port, and scheme. | |
| If ``port`` isn't given, it will be derived from the ``scheme`` using | |
| ``urllib3.connectionpool.port_by_scheme``. If ``pool_kwargs`` is | |
| provided, it is merged with the instance's ``connection_pool_kw`` | |
| variable and used to create the new connection pool, if one is | |
| needed. | |
| """ | |
| if not host: | |
| raise LocationValueError("No host specified.") | |
| request_context = self._merge_pool_kwargs(pool_kwargs) | |
| request_context["scheme"] = scheme or "http" | |
| if not port: | |
| port = port_by_scheme.get(request_context["scheme"].lower(), 80) | |
| request_context["port"] = port | |
| request_context["host"] = host | |
| return self.connection_from_context(request_context) | |
| def connection_from_context( | |
| self, request_context: dict[str, typing.Any] | |
| ) -> HTTPConnectionPool: | |
| """ | |
| Get a :class:`urllib3.connectionpool.ConnectionPool` based on the request context. | |
| ``request_context`` must at least contain the ``scheme`` key and its | |
| value must be a key in ``key_fn_by_scheme`` instance variable. | |
| """ | |
| if "strict" in request_context: | |
| warnings.warn( | |
| "The 'strict' parameter is no longer needed on Python 3+. " | |
| "This will raise an error in urllib3 v2.1.0.", | |
| DeprecationWarning, | |
| ) | |
| request_context.pop("strict") | |
| scheme = request_context["scheme"].lower() | |
| pool_key_constructor = self.key_fn_by_scheme.get(scheme) | |
| if not pool_key_constructor: | |
| raise URLSchemeUnknown(scheme) | |
| pool_key = pool_key_constructor(request_context) | |
| return self.connection_from_pool_key(pool_key, request_context=request_context) | |
| def connection_from_pool_key( | |
| self, pool_key: PoolKey, request_context: dict[str, typing.Any] | |
| ) -> HTTPConnectionPool: | |
| """ | |
| Get a :class:`urllib3.connectionpool.ConnectionPool` based on the provided pool key. | |
| ``pool_key`` should be a namedtuple that only contains immutable | |
| objects. At a minimum it must have the ``scheme``, ``host``, and | |
| ``port`` fields. | |
| """ | |
| with self.pools.lock: | |
| # If the scheme, host, or port doesn't match existing open | |
| # connections, open a new ConnectionPool. | |
| pool = self.pools.get(pool_key) | |
| if pool: | |
| return pool | |
| # Make a fresh ConnectionPool of the desired type | |
| scheme = request_context["scheme"] | |
| host = request_context["host"] | |
| port = request_context["port"] | |
| pool = self._new_pool(scheme, host, port, request_context=request_context) | |
| self.pools[pool_key] = pool | |
| return pool | |
| def connection_from_url( | |
| self, url: str, pool_kwargs: dict[str, typing.Any] | None = None | |
| ) -> HTTPConnectionPool: | |
| """ | |
| Similar to :func:`urllib3.connectionpool.connection_from_url`. | |
| If ``pool_kwargs`` is not provided and a new pool needs to be | |
| constructed, ``self.connection_pool_kw`` is used to initialize | |
| the :class:`urllib3.connectionpool.ConnectionPool`. If ``pool_kwargs`` | |
| is provided, it is used instead. Note that if a new pool does not | |
| need to be created for the request, the provided ``pool_kwargs`` are | |
| not used. | |
| """ | |
| u = parse_url(url) | |
| return self.connection_from_host( | |
| u.host, port=u.port, scheme=u.scheme, pool_kwargs=pool_kwargs | |
| ) | |
| def _merge_pool_kwargs( | |
| self, override: dict[str, typing.Any] | None | |
| ) -> dict[str, typing.Any]: | |
| """ | |
| Merge a dictionary of override values for self.connection_pool_kw. | |
| This does not modify self.connection_pool_kw and returns a new dict. | |
| Any keys in the override dictionary with a value of ``None`` are | |
| removed from the merged dictionary. | |
| """ | |
| base_pool_kwargs = self.connection_pool_kw.copy() | |
| if override: | |
| for key, value in override.items(): | |
| if value is None: | |
| try: | |
| del base_pool_kwargs[key] | |
| except KeyError: | |
| pass | |
| else: | |
| base_pool_kwargs[key] = value | |
| return base_pool_kwargs | |
| def _proxy_requires_url_absolute_form(self, parsed_url: Url) -> bool: | |
| """ | |
| Indicates if the proxy requires the complete destination URL in the | |
| request. Normally this is only needed when not using an HTTP CONNECT | |
| tunnel. | |
| """ | |
| if self.proxy is None: | |
| return False | |
| return not connection_requires_http_tunnel( | |
| self.proxy, self.proxy_config, parsed_url.scheme | |
| ) | |
| def urlopen( # type: ignore[override] | |
| self, method: str, url: str, redirect: bool = True, **kw: typing.Any | |
| ) -> BaseHTTPResponse: | |
| """ | |
| Same as :meth:`urllib3.HTTPConnectionPool.urlopen` | |
| with custom cross-host redirect logic and only sends the request-uri | |
| portion of the ``url``. | |
| The given ``url`` parameter must be absolute, such that an appropriate | |
| :class:`urllib3.connectionpool.ConnectionPool` can be chosen for it. | |
| """ | |
| u = parse_url(url) | |
| if u.scheme is None: | |
| warnings.warn( | |
| "URLs without a scheme (ie 'https://') are deprecated and will raise an error " | |
| "in a future version of urllib3. To avoid this DeprecationWarning ensure all URLs " | |
| "start with 'https://' or 'http://'. Read more in this issue: " | |
| "https://github.com/urllib3/urllib3/issues/2920", | |
| category=DeprecationWarning, | |
| stacklevel=2, | |
| ) | |
| conn = self.connection_from_host(u.host, port=u.port, scheme=u.scheme) | |
| kw["assert_same_host"] = False | |
| kw["redirect"] = False | |
| if "headers" not in kw: | |
| kw["headers"] = self.headers | |
| if self._proxy_requires_url_absolute_form(u): | |
| response = conn.urlopen(method, url, **kw) | |
| else: | |
| response = conn.urlopen(method, u.request_uri, **kw) | |
| redirect_location = redirect and response.get_redirect_location() | |
| if not redirect_location: | |
| return response | |
| # Support relative URLs for redirecting. | |
| redirect_location = urljoin(url, redirect_location) | |
| if response.status == 303: | |
| # Change the method according to RFC 9110, Section 15.4.4. | |
| method = "GET" | |
| # And lose the body not to transfer anything sensitive. | |
| kw["body"] = None | |
| kw["headers"] = HTTPHeaderDict(kw["headers"])._prepare_for_method_change() | |
| retries = kw.get("retries") | |
| if not isinstance(retries, Retry): | |
| retries = Retry.from_int(retries, redirect=redirect) | |
| # Strip headers marked as unsafe to forward to the redirected location. | |
| # Check remove_headers_on_redirect to avoid a potential network call within | |
| # conn.is_same_host() which may use socket.gethostbyname() in the future. | |
| if retries.remove_headers_on_redirect and not conn.is_same_host( | |
| redirect_location | |
| ): | |
| new_headers = kw["headers"].copy() | |
| for header in kw["headers"]: | |
| if header.lower() in retries.remove_headers_on_redirect: | |
| new_headers.pop(header, None) | |
| kw["headers"] = new_headers | |
| try: | |
| retries = retries.increment(method, url, response=response, _pool=conn) | |
| except MaxRetryError: | |
| if retries.raise_on_redirect: | |
| response.drain_conn() | |
| raise | |
| return response | |
| kw["retries"] = retries | |
| kw["redirect"] = redirect | |
| log.info("Redirecting %s -> %s", url, redirect_location) | |
| response.drain_conn() | |
| return self.urlopen(method, redirect_location, **kw) | |
| class ProxyManager(PoolManager): | |
| """ | |
| Behaves just like :class:`PoolManager`, but sends all requests through | |
| the defined proxy, using the CONNECT method for HTTPS URLs. | |
| :param proxy_url: | |
| The URL of the proxy to be used. | |
| :param proxy_headers: | |
| A dictionary containing headers that will be sent to the proxy. In case | |
| of HTTP they are being sent with each request, while in the | |
| HTTPS/CONNECT case they are sent only once. Could be used for proxy | |
| authentication. | |
| :param proxy_ssl_context: | |
| The proxy SSL context is used to establish the TLS connection to the | |
| proxy when using HTTPS proxies. | |
| :param use_forwarding_for_https: | |
| (Defaults to False) If set to True will forward requests to the HTTPS | |
| proxy to be made on behalf of the client instead of creating a TLS | |
| tunnel via the CONNECT method. **Enabling this flag means that request | |
| and response headers and content will be visible from the HTTPS proxy** | |
| whereas tunneling keeps request and response headers and content | |
| private. IP address, target hostname, SNI, and port are always visible | |
| to an HTTPS proxy even when this flag is disabled. | |
| :param proxy_assert_hostname: | |
| The hostname of the certificate to verify against. | |
| :param proxy_assert_fingerprint: | |
| The fingerprint of the certificate to verify against. | |
| Example: | |
| .. code-block:: python | |
| import urllib3 | |
| proxy = urllib3.ProxyManager("https://localhost:3128/") | |
| resp1 = proxy.request("GET", "https://google.com/") | |
| resp2 = proxy.request("GET", "https://httpbin.org/") | |
| print(len(proxy.pools)) | |
| # 1 | |
| resp3 = proxy.request("GET", "https://httpbin.org/") | |
| resp4 = proxy.request("GET", "https://twitter.com/") | |
| print(len(proxy.pools)) | |
| # 3 | |
| """ | |
| def __init__( | |
| self, | |
| proxy_url: str, | |
| num_pools: int = 10, | |
| headers: typing.Mapping[str, str] | None = None, | |
| proxy_headers: typing.Mapping[str, str] | None = None, | |
| proxy_ssl_context: ssl.SSLContext | None = None, | |
| use_forwarding_for_https: bool = False, | |
| proxy_assert_hostname: None | str | typing.Literal[False] = None, | |
| proxy_assert_fingerprint: str | None = None, | |
| **connection_pool_kw: typing.Any, | |
| ) -> None: | |
| if isinstance(proxy_url, HTTPConnectionPool): | |
| str_proxy_url = f"{proxy_url.scheme}://{proxy_url.host}:{proxy_url.port}" | |
| else: | |
| str_proxy_url = proxy_url | |
| proxy = parse_url(str_proxy_url) | |
| if proxy.scheme not in ("http", "https"): | |
| raise ProxySchemeUnknown(proxy.scheme) | |
| if not proxy.port: | |
| port = port_by_scheme.get(proxy.scheme, 80) | |
| proxy = proxy._replace(port=port) | |
| self.proxy = proxy | |
| self.proxy_headers = proxy_headers or {} | |
| self.proxy_ssl_context = proxy_ssl_context | |
| self.proxy_config = ProxyConfig( | |
| proxy_ssl_context, | |
| use_forwarding_for_https, | |
| proxy_assert_hostname, | |
| proxy_assert_fingerprint, | |
| ) | |
| connection_pool_kw["_proxy"] = self.proxy | |
| connection_pool_kw["_proxy_headers"] = self.proxy_headers | |
| connection_pool_kw["_proxy_config"] = self.proxy_config | |
| super().__init__(num_pools, headers, **connection_pool_kw) | |
| def connection_from_host( | |
| self, | |
| host: str | None, | |
| port: int | None = None, | |
| scheme: str | None = "http", | |
| pool_kwargs: dict[str, typing.Any] | None = None, | |
| ) -> HTTPConnectionPool: | |
| if scheme == "https": | |
| return super().connection_from_host( | |
| host, port, scheme, pool_kwargs=pool_kwargs | |
| ) | |
| return super().connection_from_host( | |
| self.proxy.host, self.proxy.port, self.proxy.scheme, pool_kwargs=pool_kwargs # type: ignore[union-attr] | |
| ) | |
| def _set_proxy_headers( | |
| self, url: str, headers: typing.Mapping[str, str] | None = None | |
| ) -> typing.Mapping[str, str]: | |
| """ | |
| Sets headers needed by proxies: specifically, the Accept and Host | |
| headers. Only sets headers not provided by the user. | |
| """ | |
| headers_ = {"Accept": "*/*"} | |
| netloc = parse_url(url).netloc | |
| if netloc: | |
| headers_["Host"] = netloc | |
| if headers: | |
| headers_.update(headers) | |
| return headers_ | |
| def urlopen( # type: ignore[override] | |
| self, method: str, url: str, redirect: bool = True, **kw: typing.Any | |
| ) -> BaseHTTPResponse: | |
| "Same as HTTP(S)ConnectionPool.urlopen, ``url`` must be absolute." | |
| u = parse_url(url) | |
| if not connection_requires_http_tunnel(self.proxy, self.proxy_config, u.scheme): | |
| # For connections using HTTP CONNECT, httplib sets the necessary | |
| # headers on the CONNECT to the proxy. If we're not using CONNECT, | |
| # we'll definitely need to set 'Host' at the very least. | |
| headers = kw.get("headers", self.headers) | |
| kw["headers"] = self._set_proxy_headers(url, headers) | |
| return super().urlopen(method, url, redirect=redirect, **kw) | |
| def proxy_from_url(url: str, **kw: typing.Any) -> ProxyManager: | |
| return ProxyManager(proxy_url=url, **kw) | |