# Source code for aiobtclientrpc._utils

import asyncio
import enum
import inspect
import os
import re

from . import __project_name__, __version__, _errors

import logging  # isort:skip
_log = logging.getLogger(__name__)


def get_aioloop():
    """
    Return :class:`asyncio.AbstractEventLoop` instance

    The running loop is returned if there is one. Otherwise the current event
    loop policy is asked for its loop. If the policy has no loop for the
    calling thread and raises :class:`RuntimeError` (e.g. in a thread other
    than the main thread), a new loop is created, installed as the current
    loop and returned, so that subsequent calls from the same thread get the
    same loop.
    """
    # https://docs.python.org/3.10/library/asyncio-eventloop.html
    try:
        return asyncio.get_running_loop()
    except RuntimeError:
        # "no running event loop"
        pass

    try:
        # We need a loop before the application has started. We can't use
        # get_event_loop(), because that is going to be an alias for
        # get_running_loop() in Python >= 3.10. This is what get_event_loop()
        # does internally in Python 3.6.
        return asyncio.get_event_loop_policy().get_event_loop()
    except RuntimeError:
        # "There is no current event loop in thread ..."
        # The policy did not provide a loop, so we make one ourselves and
        # register it to keep later calls consistent.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        return loop


def clients():
    """
    Collect the client classes exported by the :mod:`aiobtclientrpc` package

    :return: :class:`list` of :class:`~.RPCBase` subclasses (the base class
        itself is excluded), sorted by their ``name`` attribute
    """
    import aiobtclientrpc  # isort:skip

    base = aiobtclientrpc.RPCBase
    found = {
        member
        for _, member in inspect.getmembers(aiobtclientrpc)
        if isinstance(member, type)
        and issubclass(member, base)
        and member is not base
    }
    return sorted(found, key=lambda client_cls: client_cls.name)
def client(name, *args, **kwargs):
    """
    Instantiate the :class:`~.RPCBase` subclass that goes by `name`

    :param str name: :attr:`~.RPCBase.name` of the client
    :param args: Positional arguments to pass to the :class:`~.RPCBase`
        subclass
    :param kwargs: Keyword arguments to pass to the :class:`~.RPCBase`
        subclass

    :raise ValueError: if there is no :class:`~.RPCBase` subclass with a
        matching `name`

    :return: :class:`~.RPCBase` instance
    """
    # First class returned by clients() with the requested name, if any
    client_cls = next(
        (candidate for candidate in clients() if candidate.name == name),
        None,
    )
    if client_cls is None:
        raise _errors.ValueError(f'No such client: {name}')
    return client_cls(*args, **kwargs)
class ConnectionStatus(enum.Enum):
    """
    Current state of the client connection

    Each member's value is identical to its name.
    """

    connecting = 'connecting'
    """A connection attempt is in progress"""

    connected = 'connected'
    """The connection is established"""

    disconnected = 'disconnected'
    """The connection was lost or terminated"""
class _URLMeta(type):
    """
    Metaclass that turns the ``default`` class attribute into an instance

    When a class using this metaclass is created, the value of its
    ``default`` attribute is passed to the class itself and the resulting
    instance replaces the original value.
    """

    def __init__(cls, name, bases, attrs):
        # Cooperate with any other metaclass in the MRO instead of silently
        # skipping its initialization.
        super().__init__(name, bases, attrs)
        # Convert default URL to instance of `cls`
        cls.default = cls(cls.default)
class URL(metaclass=_URLMeta):
    """
    URL of an RPC interface

    URLs are parsed in an intuitive way rather than according to any spec.
    For example ``"localhost:1234"`` is interpreted as ``host=localhost,
    port=1234`` instead of ``scheme=localhost, path=1234``.

    :param str url: URL string
    :param str default: Fallback URL when `url` is falsy
    :param callable on_change: Callback that is called with no arguments when
        any property is modified

    :raise ValueError: if `url` is invalid
    """

    default = ''
    """URL to use when ``url`` and ``default`` arguments are both falsy"""

    _parts = (
        'scheme',
        'username',
        'password',
        'host',
        'port',
        'path',
    )

    @classmethod
    def _dict_from_string(cls, string):
        """Split `string` into a dictionary with one key per URL part"""

        def strip_prefix(pattern, text):
            # Return the match of `pattern` (or `None`) together with `text`
            # minus the matched prefix
            regex = re.compile(pattern)
            found = regex.search(text)
            if found:
                return found, regex.sub('', text)
            return None, text

        remaining = str(string).strip()
        parts = dict.fromkeys(cls._parts)

        # Scheme
        found, remaining = strip_prefix(r'^(.*?)://', remaining)
        if found:
            parts['scheme'] = found.group(1) or None
        elif remaining.startswith(os.sep):
            # Assume file system path if URL starts with path separator
            parts['scheme'] = 'file'

        # File system paths don't have authentication, port, etc
        if parts['scheme'] == 'file':
            parts['path'] = remaining or None
            return parts

        # Authentication
        found, remaining = strip_prefix(r'^(.*?):(.*?)@', remaining)
        if found:
            parts['username'] = found.group(1) or None
            parts['password'] = found.group(2) or None

        # Host
        found, remaining = strip_prefix(r'^(.*?)(?=/|:|$)', remaining)
        if found:
            parts['host'] = found.group(1) or None

        # Port
        found, remaining = strip_prefix(r'^:(.*?)(?=/|$)', remaining)
        if found:
            parts['port'] = found.group(1) or None

        # Path
        parts['path'] = remaining or None
        return parts

    @classmethod
    def _dict_from_any(cls, thing):
        """Return URL parts from `None`, a string or a URL-like object"""
        if thing is None:
            return cls._dict_from_string('')
        if isinstance(thing, str):
            return cls._dict_from_string(thing)
        # URL class is not in the global namespace yet, so we can't do
        # `isinstance(thing, URL)`. Instead, we do duck-typing and rely on
        # `with_auth` method providing what we expect.
        if hasattr(thing, 'with_auth'):
            return cls._dict_from_string(thing.with_auth)
        raise TypeError(f'Unsupported type: {type(thing).__name__}: {thing!r}')

    def __new__(cls, url=None, default=None, on_change=None):
        # Get instance
        self = super().__new__(cls)

        # Don't trigger any changes until we finished initialization
        self._on_change = None

        default_url = cls._dict_from_any(default or cls.default)
        custom_url = cls._dict_from_any(url) if url else default_url

        for name in ('scheme', 'username', 'password', 'host', 'port', 'path'):
            value = custom_url[name]
            if value:
                setattr(self, name, value)
                continue
            # Part is missing from `url`: fall back to the default, and to
            # `None` if the default is rejected by the property setter
            try:
                setattr(self, name, default_url[name])
            except _errors.ValueError:
                setattr(self, name, None)

        # Initialization is complete - enable on_change callback
        self._on_change = on_change
        return self

    @property
    def scheme(self):
        """Scheme (e.g. ``"http"`` or ``"file"``)"""
        return self._scheme

    @scheme.setter
    def scheme(self, scheme):
        self._scheme = str(scheme).lower() if scheme else None
        if self._on_change:
            self._on_change()

    @property
    def host(self):
        """Host name or IP address or `None`"""
        return self._host

    @host.setter
    def host(self, host):
        self._host = str(host) if host else None
        if self._on_change:
            self._on_change()

    @property
    def port(self):
        """Port number or `None`"""
        return self._port

    @port.setter
    def port(self, port):
        if port:
            try:
                number = int(port)
            except (ValueError, TypeError):
                raise _errors.ValueError('Invalid port')
            if number < 1 or number > 65535:
                raise _errors.ValueError('Invalid port')
            # The port is stored as a string
            normalized = str(number)
        else:
            normalized = None
        self._port = normalized
        if self._on_change:
            self._on_change()

    @property
    def path(self):
        """File system path or request path or `None`"""
        return self._path

    @path.setter
    def path(self, path):
        if path:
            self._path = str(path)
        else:
            self._path = None
        if self._on_change:
            self._on_change()

    @property
    def username(self):
        """Username for authentication"""
        return self._username

    @username.setter
    def username(self, username):
        self._username = str(username) if username else None
        if self._on_change:
            self._on_change()

    @property
    def password(self):
        """Password for authentication"""
        return self._password

    @password.setter
    def password(self, password):
        self._password = str(password) if password else None
        if self._on_change:
            self._on_change()

    @property
    def without_auth(self):
        """URL string without :attr:`username` and :attr:`password`"""
        return self._as_string(with_auth=False)

    @property
    def with_auth(self):
        """URL string with :attr:`username` and :attr:`password`"""
        return self._as_string(with_auth=True)

    def _as_string(self, with_auth=False):
        """Join all parts that are set into a single URL string"""
        pieces = []
        if self.scheme:
            pieces.append(f'{self.scheme}://')
        if with_auth and (self.username or self.password):
            pieces.append(f'{self.username or ""}:{self.password or ""}@')
        if self.host:
            pieces.append(self.host)
        if self.port:
            pieces.append(f':{self.port}')
        if self.path:
            pieces.append(self.path)
        return ''.join(pieces)

    def __str__(self):
        return self.without_auth

    def __eq__(self, other):
        if not isinstance(other, type(self)):
            return NotImplemented
        return self.with_auth == other.with_auth

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented
        return not result

    def __repr__(self):
        arguments = f'{str(self)!r}'
        if self._on_change:
            arguments += f', on_change={self._on_change!r}'
        return f'{type(self).__name__}({arguments})'
def create_http_client(*, auth=(None, None), proxy_url=None):
    """
    Build an :class:`httpx.AsyncClient` instance

    :param auth: Basic auth credentials as `(username, password)` tuple; if
        either value is falsy, don't do authentication
    :param proxy_url: URL of a SOCKS4, SOCKS5 or HTTP proxy

    :return: :class:`httpx.AsyncClient` instance
    """
    import httpx  # isort:skip

    user_agent = f'{__project_name__} {__version__}'
    client_options = {
        # Timeouts are handled with async_timeout in RPCBase
        'timeout': float('inf'),
        'headers': {'User-Agent': user_agent},
    }

    # Basic auth
    username, password = auth
    if username and password:
        client_options['auth'] = httpx.BasicAuth(username, password)

    # SOCKS[4|5] or HTTP proxy
    if proxy_url:
        import httpx_socks  # isort:skip
        transport = httpx_socks.AsyncProxyTransport.from_url(proxy_url)
        client_options['transport'] = transport

    return httpx.AsyncClient(**client_options)


async def catch_connection_exceptions(coro):
    """
    Await `coro` and translate network errors into :class:`~.ConnectionError`

    Proxy exceptions are translated as well. The error message is meant to be
    user-friendly.

    :param coro: Awaitable that performs a network request

    :return: return value of `coro`

    :raise ConnectionError: if any relevant exception is raised
    """
    import httpx, httpx_socks  # noqa:E401 isort:skip

    def prettify_msg(e):
        text = getattr(e, 'strerror', None) or str(e)
        if not text:
            return 'Unknown error'
        # Extract the actual error from messages like these:
        #   [Errno 111] Connect call failed ('::1', 5001, 0, 0)
        #   Multiple exceptions: [Errno 111] Connect call failed ('::1', 5001, 0, 0),
        #   [Errno 111] Connect call failed ('127.0.0.1', 5001)
        found = re.search(r'\[Errno -?\d+\]\s*(.*?)\s*(?:\[|\(|$)', text)
        return found.group(1) if found else text

    try:
        result = await coro
    except (httpx.HTTPError, httpx_socks.ProxyError) as e:
        raise _errors.ConnectionError(prettify_msg(e))
    except ConnectionAbortedError:
        raise _errors.ConnectionError('Connection aborted')
    except ConnectionRefusedError:
        raise _errors.ConnectionError('Connection refused')
    except ConnectionResetError:
        raise _errors.ConnectionError('Connection reset')
    except OSError as e:
        # Any low-level exceptions and httpx_socks.ProxyConnectionError, which
        # is a subclass of OSError.
        raise _errors.ConnectionError(prettify_msg(e))
    else:
        return result


_basic_types = {
    bool,
    bytearray,
    bytes,
    dict,
    float,
    int,
    list,
    str,
    tuple,
    type(None),
}


def convert_to_basic_type(data):
    """
    Reduce instances of subclasses of basic types (:class:`str`,
    :class:`list`, etc) to the basic type itself

    Lists, tuples and dictionaries (keys and values) are converted
    recursively.

    :raise TypeError: if `data` is not an instance of any basic type
    """
    kind = type(data)

    if kind is list:
        return [convert_to_basic_type(element) for element in data]

    if kind is tuple:
        return tuple(convert_to_basic_type(element) for element in data)

    if kind is dict:
        return {
            convert_to_basic_type(key): convert_to_basic_type(value)
            for key, value in data.items()
        }

    if kind in _basic_types:
        return data

    # `data` is not exactly a basic type: find the basic type it derives from
    parent = next(
        (basic for basic in _basic_types if isinstance(data, basic)),
        None,
    )
    if parent is None:
        raise TypeError(f'Unsupported basic type: {type(data).__name__}: {data!r}')
    return convert_to_basic_type(parent(data))