Source code for aiobtclientrpc._rtorrent

import abc
import asyncio
import xmlrpc.client

from . import _base, _errors, _utils

import logging  # isort:skip
_log = logging.getLogger(__name__)


[docs] class RtorrentURL(_utils.URL): """rTorrent RPC URL""" default = 'scgi://127.0.0.1:5000' @property def scheme(self): """Valid schemes: ``file``, ``scgi``, ``http``, ``https``""" return super().scheme @scheme.setter def scheme(self, scheme): if not scheme or str(scheme).lower() in ('file', 'scgi', 'http', 'https'): _utils.URL.scheme.fset(self, scheme) else: raise _errors.ValueError('Scheme must be "file", "scgi", "http" or "https"') @_utils.URL.host.setter def host(self, host): if host and self.scheme == 'file': raise _errors.ValueError(f"{self.scheme} URLs don't have a host") else: _utils.URL.host.fset(self, host) @_utils.URL.port.setter def port(self, port): if port and self.scheme == 'file': raise _errors.ValueError(f"{self.scheme} URLs don't have a port") else: _utils.URL.port.fset(self, port) @_utils.URL.username.setter def username(self, username): if username and self.scheme in ('file', 'scgi'): raise _errors.ValueError(f"{self.scheme} URLs don't have a username") else: _utils.URL.username.fset(self, username) @_utils.URL.password.setter def password(self, password): if password and self.scheme in ('file', 'scgi'): raise _errors.ValueError(f"{self.scheme} URLs don't have a password") else: _utils.URL.password.fset(self, password) @_utils.URL.path.setter def path(self, path): if path and self.scheme == 'scgi': raise _errors.ValueError(f"{self.scheme} URLs don't have a path") else: _utils.URL.path.fset(self, path)
[docs] class RtorrentRPC(_base.RPCBase): """ RPC client for rTorrent URL formats: * ``[scgi://]HOST[:PORT]`` * ``[file://]SOCKET_PATH`` * ``http[s]://[USERNAME:PASSWORD@]HOST[:PORT][/PATH]`` References: * https://github.com/rakshasa/rtorrent/wiki/rTorrent-0.9-Comprehensive-Command-list-(WIP) * https://docs.python.org/3/library/xmlrpc.client.html * https://github.com/rakshasa/rtorrent/wiki/RPC-Setup-XMLRPC :raise ValueError: if any argument is invalid """ name = 'rtorrent' label = 'rTorrent' URL = RtorrentURL def __init__( self, url=None, *, scheme=None, host=None, port=None, username=None, password=None, timeout=None, proxy_url=None, ): # Set custom or default URL self.url = url # Update URL if scheme is not None: self.url.scheme = scheme if host is not None: self.url.host = host if port is not None: self.url.port = port if username is not None: self.url.username = username if password is not None: self.url.password = password self.timeout = timeout self.proxy_url = proxy_url async def _connect(self): # Close old proxy await self._disconnect() # Create new XMLRPC proxy self._xmlrpc = _AsyncServerProxy( url=self.url, proxy_url=self.proxy_url, ) # Maybe raise connection/authentication error await self._call('system.pid') async def _disconnect(self): if hasattr(self, '_xmlrpc'): await self._xmlrpc.close() delattr(self, '_xmlrpc') async def _call(self, method, *args): try: return await _utils.catch_connection_exceptions( self._xmlrpc.call(method, *args), ) except xmlrpc.client.ProtocolError as e: if e.errcode == 401: raise _errors.AuthenticationError('Authentication failed') else: msg = e.errmsg if e.errmsg else str(e) raise _errors.RPCError(msg) except xmlrpc.client.Fault as e: raise _errors.RPCError(e.faultString) async def _multicall_rt(self, *calls, raise_errors=True, as_dict=False): """ Make ``system.multicall`` RPC request :param calls: ``(method, parameter1, parameter2, ...)`` tuples :param bool raise_errors: Whether to raise the first error response as :class:`~.RPCError` or return it like a normal response :param bool as_dict: Whether to map method names to return values .. note:: Every method can only be called once if this is enabled. :raise RPCError: if `raise_errors` is `True` and any response contains a dictionary with a ``faultString`` key :return: :class:`list` or :class:`dict` of return values """ if as_dict: # Because method names are dictionary keys, every method can only be # called once. methods = [call[0] for call in calls] indexes_map = { method: [i for i in range(len(methods)) if methods[i] == method] for method in methods } for method, indexes in indexes_map.items(): if len(indexes) > 1: def pretty_call(i): method = calls[i][0] params = ', '.join(f'{param!r}' for param in calls[i][1:]) return f'{method}({params})' calls = ', '.join(pretty_call(i) for i in indexes) raise RuntimeError(f'Multiple {method} calls: {calls}') responses = await self.call('system.multicall', [ {'methodName': method, 'params': params} for method, *params in calls ]) return_values = [] for i in range(len(responses)): response = responses[i] if isinstance(response, dict) and 'faultString' in response: exc = _errors.RPCError(str(response['faultString'])) if raise_errors: raise exc else: return_values.append(exc) elif isinstance(response, list) and len(response) == 1: return_values.append(response[0]) else: raise RuntimeError(f'Unexpected response: {response!r}') assert len(return_values) == len(calls) if as_dict: return { method: return_value for (method, *params), return_value in zip(calls, return_values) } else: return return_values _supported_methods = ()
[docs] async def get_supported_method(self, *candidates): """ Get first supported method from multiple candidates :param candidates: Sequence of method names :raise ValueError: if no method name in `candidates` is supported """ if not self._supported_methods: type(self)._supported_methods = await self.call('system.listMethods') for method in candidates: if method in self._supported_methods: return method raise _errors.ValueError('Unsupported method(s): ' + ', '.join(f'{c!r}' for c in candidates))
class _AsyncServerProxy: def __init__(self, url, proxy_url=None): if url.scheme in ('http', 'https'): self._transport = _HttpTransport( url=url, proxy_url=proxy_url, ) elif url.scheme == 'scgi': self._transport = _ScgiHostTransport( url=url, proxy_url=proxy_url, ) elif url.scheme == 'file' and url.path: if proxy_url: raise _errors.ValueError(f'You cannot use a proxy to connect to {url}') else: self._transport = _ScgiSocketTransport(url=url) else: raise _errors.ValueError(f'Unsupported protocol: {url}') async def call(self, method_name, *params): request_data = xmlrpc.client.dumps( _utils.convert_to_basic_type(params), method_name, encoding='utf-8', allow_none=False, ).encode('utf-8', 'xmlcharrefreplace') # Return asynchronous iterator over chunks of bytes chunks = self._transport.request(request_data) return await self._parse_response(chunks) async def _parse_response(self, chunks): p, u = xmlrpc.client.getparser() async for chunk in chunks: p.feed(chunk) p.close() return_value = u.close() if len(return_value) == 1: return return_value[0] else: return return_value async def close(self): await self._transport.close() class TransportBase(abc.ABC): @abc.abstractmethod async def request(self, data): """ Send request and return asynchronous iterator over chunks of bytes """ @abc.abstractmethod async def close(self): """Close any existing connections""" class _HttpTransport(TransportBase): """Connect via reverse HTTP proxy""" def __init__(self, url, proxy_url=None): if url.scheme not in ('http', 'https'): raise _errors.ValueError(f'Unsupported protocol: {url.scheme}') else: if not url.path: url.path = '/RPC2' # Username and password are stored in self._http_client self._url = url.without_auth self._request_lock = asyncio.Lock() self._http_client = _utils.create_http_client( auth=(url.username, url.password), proxy_url=proxy_url.with_auth if proxy_url else None, ) async def close(self): async with self._request_lock: await self._http_client.aclose() async def request(self, data): async with self._request_lock: aiterator = self._request(data) async for chunk in aiterator: yield chunk async def _request(self, data): async with self._http_client.stream('POST', self._url, content=data) as response: if response.status_code != 200: raise xmlrpc.client.ProtocolError( url=self._url, errcode=response.status_code, errmsg=response.reason_phrase, headers=response.headers, ) else: aiterator = response.aiter_bytes() async for chunk in aiterator: yield chunk class _ScgiTransportBase(TransportBase, abc.ABC): """Base class for SCGI transports (network.scgi.*)""" async def close(self): pass async def request(self, data): reader, writer = await self._get_reader_writer() await self._send(writer, data) async for chunk in self._read(reader, writer, 1024): yield chunk @abc.abstractmethod async def _get_reader_writer(self): """Return `(:class:`StreamReader`, :class:`StreamWriter`)` tuple""" async def _read(self, reader, writer, chunk_size): try: headers_delim = b'\r\n\r\n' headers_done = False combined_headers = b'' while True: chunk = await reader.read(chunk_size) if not chunk: break elif headers_done: # Headers are already fully read yield chunk else: combined_headers += chunk if headers_delim in combined_headers: # Find and remove HTTP headers payload_start = combined_headers.index(headers_delim) + len(headers_delim) first_payload_chunk = combined_headers[payload_start:] if first_payload_chunk: yield first_payload_chunk headers_done = True combined_headers = b'' finally: writer.close() await writer.wait_closed() async def _send(self, writer, data): data_encoded = self._encode_request(data) writer.write(data_encoded) await writer.drain() def _encode_request(self, data): def encode_header(key, value): return key + b'\x00' + value + b'\x00' headers = ( encode_header(b'CONTENT_LENGTH', str(len(data)).encode('utf-8')) + encode_header(b'SCGI', b'1') + encode_header(b'REQUEST_METHOD', b'POST') + encode_header(b'REQUEST_URI', self._path) ) request = ( str(len(headers)).encode('utf-8') + b':' + headers + b',' + data ) return request class _ScgiHostTransport(_ScgiTransportBase): """Connect directly to rTorrent (network.scgi.open_port)""" def __init__(self, url, proxy_url=None): if url.scheme != 'scgi': raise _errors.ValueError(f'Unsupported protocol: {url.scheme}') self._host = url.host if not url.port: raise _errors.ValueError('No port specified') else: self._port = int(url.port) self._path = (url.path or '/RPC2').encode('utf-8') self._proxy_url = proxy_url async def _get_reader_writer(self): if self._proxy_url: import python_socks.async_.asyncio # isort:skip try: proxy = python_socks.async_.asyncio.Proxy.from_url(self._proxy_url.with_auth) except ValueError as e: raise _errors.ValueError(e) sock = await proxy.connect( dest_host=self._host, dest_port=self._port, # Timeouts are handled with async_timeout in RPCBase timeout=float('inf'), ) open_connection_kwargs = { 'sock': sock, } else: open_connection_kwargs = { 'host': self._host, 'port': self._port, } reader, writer = await asyncio.open_connection(**open_connection_kwargs) return reader, writer class _ScgiSocketTransport(_ScgiTransportBase): """Connect directly to rTorrent (network.scgi.open_local)""" def __init__(self, url): if url.scheme != 'file': raise _errors.ValueError(f'Unsupported protocol: {url.scheme}') self._socket_path = url.path self._path = b'/RPC2' async def _get_reader_writer(self): reader, writer = await asyncio.open_unix_connection(path=self._socket_path) return reader, writer