import abc
import asyncio
import collections
import functools
import weakref
import async_timeout
from . import _errors, _utils
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]
class RPCBase(abc.ABC):
"""Base class for BitTorrent client RPC interfaces"""
# Abstract methods
@abc.abstractmethod
async def _connect(self):
"""Connect to RPC interface"""
@abc.abstractmethod
async def _disconnect(self):
"""
Disconnect from RPC interface
This method should not raise an exception if it fails. For example, if
the service at :attr:`url` is down, we can't logout, which shouldn't be
an error.
"""
@abc.abstractmethod
async def _call(self):
"""Call RPC method"""
# Abstract properties
@property
@abc.abstractmethod
def name(self):
"""Lowercase name of the BitTorrent client"""
@property
@abc.abstractmethod
def label(self):
"""Properly capitalized :attr:`name` of the BitTorrent client"""
def __repr__(self):
url = str(self.url)
if url:
return f'<{type(self).__name__} {url!r}>'
else:
return f'<{type(self).__name__}>'
# Properties
default_timeout = 60.0
"""Default :attr:`timeout`"""
@property
def timeout(self):
"""
Timeout in seconds for RPC calls
If this property is set to a falsy value, :attr:`default_timeout` is
used.
:raise ValueError: if set to something that can't be coerced into a
:class:`float`
"""
return getattr(self, '_timeout', self.default_timeout)
@timeout.setter
def timeout(self, timeout):
try:
self._timeout = float(timeout) if timeout else self.default_timeout
except (TypeError, ValueError):
raise _errors.ValueError('Invalid timeout')
else:
self._invalidate_http_client()
@property
@abc.abstractmethod
def URL(self):
"""
:class:`~.URL` subclass
Setting properties on an instance of this subclass to invalid values
should raise :class:`~.ValueError`. For example, :class:`~.RtorrentURL`
only accepts the schemes ``http``, ``scgi`` or ``file``.
"""
@property
def url(self):
"""
:class:`~.URL` of the RPC interface
Changing any of the properties will re-connect to the new URL on the
next :meth:`~.call`.
If this property is set to a falsy value, :attr:`~.URL.default` is
used.
:raise ValueError: if set to an invalid URL
"""
url = getattr(self, '_url', None)
if not url:
url = self._url = type(self).URL(
url=type(self).URL.default,
on_change=self._invalidate_http_client,
)
return url
@url.setter
def url(self, url):
self._url = type(self).URL(
url=url if url else type(self).URL.default,
on_change=self._invalidate_http_client,
)
self._invalidate_http_client()
@property
def proxy_url(self):
"""
SOCKS5, SOCKS4 or HTTP proxy :class:`~.URL` for tunneling the RPC connection
If this property is set to a falsy value, no proxy is used.
:raise ValueError: if set to an invalid URL
"""
return getattr(self, '_proxy_url', None)
@proxy_url.setter
def proxy_url(self, proxy_url):
if proxy_url:
self._proxy_url = _utils.URL(
url=proxy_url,
on_change=self._invalidate_http_client,
)
else:
self._proxy_url = None
self._invalidate_http_client()
@property
def status(self):
""":class:`~.ConnectionStatus` enum"""
return getattr(self, '_status', _utils.ConnectionStatus.disconnected)
@property
def is_connected(self):
"""
`True` if :attr:`status: is :attr:`~.ConnectionStatus.connected`, `False`
otherwise
"""
return self.status is _utils.ConnectionStatus.connected
# Callbacks
[docs]
def set_connecting_callback(self, callback, *args, **kwargs):
"""
Schedule callback to be called when a connection attempt is made
:param callback: Callable to call
All remaining arguments are passed to `callback` when it is called.
"""
assert callable(callback)
self._connection_callbacks['connecting'].append((callback, args, kwargs))
[docs]
def unset_connecting_callback(self, callback):
"""
Unschedule callback to be called when a connection attempt is made
:param callback: Callable to call
"""
assert callable(callback)
for cb in tuple(self._connection_callbacks['connecting']):
if cb[0] == callback:
self._connection_callbacks['connecting'].remove(cb)
[docs]
def set_connected_callback(self, callback, *args, **kwargs):
"""
Schedule callback to be called when a connection attempt was successful
:param callback: Callable to call
All remaining arguments are passed to `callback` when it is called.
"""
assert callable(callback)
self._connection_callbacks['connected'].append((callback, args, kwargs))
[docs]
def unset_connected_callback(self, callback):
"""
Unschedule callback to be called when a connection attempt was successful
:param callback: Callable to call
"""
assert callable(callback)
for cb in tuple(self._connection_callbacks['connected']):
if cb[0] == callback:
self._connection_callbacks['connected'].remove(cb)
[docs]
def set_disconnected_callback(self, callback, *args, **kwargs):
"""
Schedule callback to be called when the connection was lost or a connection
attempt has failed
:param callback: Callable to call
All remaining arguments are passed to `callback` when it is called.
"""
assert callable(callback)
self._connection_callbacks['disconnected'].append((callback, args, kwargs))
[docs]
def unset_disconnected_callback(self, callback):
"""
Unschedule callback to be called when the connection was lost or a
connection attempt has failed
:param callback: Callable to call
"""
assert callable(callback)
for cb in tuple(self._connection_callbacks['disconnected']):
if cb[0] == callback:
self._connection_callbacks['disconnected'].remove(cb)
@functools.cached_property
def _connection_callbacks(self):
return {
'connecting': [],
'connected': [],
'disconnected': [],
}
def _call_connection_callbacks(self, name):
for callback, args, kwargs in self._connection_callbacks[name]:
callback(*args, **kwargs)
# RPC methods
@functools.cached_property
def _connection_lock(self):
return asyncio.Lock()
[docs]
async def connect(self):
"""
Connect to RPC interface
Do nothing if :attr:`status` indicates we are already connected.
It is safe to call this method multiple times concurrently. The first
call will actually connect while the remaining calls wait for the first
call to finish. If the first call fails, each of the remaining calls
will become the first call, i.e. it will attempt to connect while the
others wait for it.
:raise ConnectionError: if the request failed
:raise AuthenticationError: if authentication failed
:raise TimeoutError: if there is no response after :attr:`timeout` seconds
:raise RPCError: if there is any miscommunication between us and the RPC
interface
"""
_log.debug('%s: connect(): Waiting for connection lock (status=%s)', self.label, self.status)
try:
async with self._connection_lock:
_log.debug('%s: connect(): Acquired connection lock (status=%s)', self.label, self.status)
if self.status is not _utils.ConnectionStatus.connected:
self._status = _utils.ConnectionStatus.connecting
self._call_connection_callbacks('connecting')
try:
async with async_timeout.timeout(self.timeout):
await self._connect()
except Exception as e:
_log.debug('%s: Failed to connect: %r', self.label, e)
await self._disconnect()
self._status = _utils.ConnectionStatus.disconnected
self._call_connection_callbacks('disconnected')
if isinstance(e, asyncio.TimeoutError):
raise _errors.TimeoutError(f'Timeout after {self.timeout} seconds')
else:
raise
else:
_log.debug('%s: Connected', self.label)
self._status = _utils.ConnectionStatus.connected
self._call_connection_callbacks('connected')
finally:
_log.debug('%s: connect(): Freed connection lock (status=%s)', self.label, self.status)
[docs]
async def disconnect(self):
"""
Disconnect from RPC interface
Do nothing if :attr:`status` indicates we are already disconnected.
It is safe to call this method multiple times concurrently. The first
call will actually disconnect while the remaining calls wait for the
first call to finish. If the first call fails, each of the remaining
call will become the first call, i.e. it will attempt to disconnect
while the others wait for it.
:attr:`status` is always :attr:`.ConnectionStatus.disconnected` when
this method returns, regardless of any raised exceptions.
:raise AuthenticationError: if authentication failed
:raise TimeoutError: if there is no response after :attr:`timeout` seconds
:raise RPCError: if there is any miscommunication between us and the RPC
interface
"""
_log.debug('%s: disconnect(): Waiting for connection lock (status=%s)', self.label, self.status)
try:
async with self._connection_lock:
_log.debug('%s: disconnect(): Acquired connection lock (status=%s)', self.label, self.status)
if self.status is not _utils.ConnectionStatus.disconnected:
try:
async with async_timeout.timeout(self.timeout):
await self._disconnect()
except asyncio.TimeoutError:
_log.debug('%s: disconnect(): Timeout', self.label)
raise _errors.TimeoutError(f'Timeout after {self.timeout} seconds')
finally:
self._status = _utils.ConnectionStatus.disconnected
self._call_connection_callbacks('disconnected')
finally:
await self._close_http_client()
_log.debug('%s: disconnect(): Freed connection lock (status=%s)', self.label, self.status)
[docs]
async def call(self, *args, **kwargs):
"""
Call RPC method and return the result
If :attr:`status` is not :attr:`.ConnectionStatus.connected`, call
:meth:`connect` first.
:raise AuthenticationError: if authentication failed
:raise ConnectionError: if the request failed
:raise TimeoutError: if there is no response after :attr:`timeout` seconds
:raise RPCError: if there is any miscommunication between us and the RPC
interface
:return: the return value of the RPC method
This should be decoded bytes, deserialized JSON, etc. Exceptions
from decoding and deserializing should be raised as
:class:`~.RPCError`.
"""
# _log.debug('%s: [%s] Calling: %s, %s', self.label, self.status, args, kwargs)
if self.status is not _utils.ConnectionStatus.connected:
_log.debug('%s: Auto-connecting', self.label)
await self.connect()
try:
async with async_timeout.timeout(self.timeout):
return await self._call(*args, **kwargs)
except asyncio.TimeoutError:
_log.debug('%s: call(): Timeout', self.label)
raise _errors.TimeoutError(f'Timeout after {self.timeout} seconds')
# Events
@functools.cached_property
def _event_handlers(self):
return collections.defaultdict(lambda: [])
[docs]
async def add_event_handler(self, event, handler, autoremove=False):
"""
Call callable when event happens
:param event: Name or other identifier of the event (refer to the client
documentation for valid values)
:param handler: Callable to be called when `event` happens (refer to the
client documentation for the call signature)
:param autoremove: Whether `handler` should be unregistered
automatically after it is garbage collected (see :mod:`weakref`)
If `handler` is already registered for `event`, do nothing.
Multiple handlers can be registered for the same event. They will be
called in the order they are added.
:raise NotImplementedError: if the client doesn't support events
"""
assert callable(handler), handler
if event not in self._event_handlers:
await self._subscribe(event)
if autoremove:
handler_ref = weakref.ref(handler, self._event_handlers[event].remove)
if handler_ref not in self._event_handlers[event]:
self._event_handlers[event].append(handler_ref)
_log.debug('Added handler reference for event %r: %r', event, handler_ref)
else:
if handler not in self._event_handlers[event]:
self._event_handlers[event].append(handler)
_log.debug('Added handler for event %r: %r', event, handler)
[docs]
async def remove_event_handler(self, event, handler):
"""
Stop calling callable when event happens
See :meth:`add_event_handler`.
If `handler` is not registered for `event`, do nothing.
:raise NotImplementedError: if the client doesn't support events
"""
event_handlers = self._event_handlers[event]
# Disconnect `handler` from `event`
if handler in event_handlers:
event_handlers.remove(handler)
# If there aren't any other handlers for `event`, unsubscribe
if not event_handlers:
del self._event_handlers[event]
await self._unsubscribe(event)
_log.debug('Removed handler for event %r: %r', event, handler)
[docs]
async def wait_for_event(self, event):
"""
Block until event happens
:param event: Name or other identifier of the event (refer to the client
documentation for valid values)
:raise NotImplementedError: if the client doesn't support events
"""
blocker = asyncio.Event()
def unblock_blocker(*_, **__):
blocker.set()
await self.add_event_handler(event, unblock_blocker)
try:
_log.debug('Waiting for event: %r', event)
await blocker.wait()
_log.debug('Done waiting for event: %r', event)
finally:
await self.remove_event_handler(event, unblock_blocker)
async def _emit_event(self, event, args=None, kwargs=None):
# This function is called by subclasses when they receive an event
args = args or ()
kwargs = kwargs or {}
for handler in self._event_handlers[event]:
# `handler` is a weakref.ref instance if it was registered with
# `autoremove=True`.
#
# NOTE: If we pass `handler` to _log.debug() here, it will keep
# another strong reference to `handler` and the test that
# covers dead weakrefs will fail.
if isinstance(handler, weakref.ref):
handler = handler()
if handler is None:
continue
# Because this method is likely being called by some background task
# that is waiting for events, any exception raised by `handler` will
# go unnoticed, so we stop the loop if that happens.
# There should be a better way to handle this, but I can't find it.
try:
if asyncio.iscoroutinefunction(handler):
await handler(*args, **kwargs)
else:
handler(*args, **kwargs)
except BaseException as e:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# Loop is already stopped
pass
else:
_log.debug('Stopping loop because event handler for %r '
'threw an exception: %r', event, e)
loop.stop()
# We probably raise this into the void, but not raising
# BaseException is really bad.
raise
async def _subscribe(self, event):
# Tell the client to send us a certain type of event
raise NotImplementedError(f'Events are not supported for {self.label}')
async def _unsubscribe(self, event):
# Tell the client to stop sending us a certain type of event
raise NotImplementedError(f'Events are not supported for {self.label}')
# Context manager
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
_log.debug('%s: Disconnecting at end of context manager', self.label)
await self.disconnect()
# Make extra sure the HTTP client is closed
await self._close_http_client()
# HTTP client
@property
def _http_headers(self):
"""
Headers that are sent on HTTP requests
Subclass instances should modify these headers as they please. They are
automatically combined with any obligatory headers.
"""
headers = getattr(self, '__http_headers', None)
if headers is None:
headers = {}
setattr(self, '__http_headers', headers)
return headers
async def _get_http_client(self):
if getattr(self, '_http_client_is_invalidated', False):
delattr(self, '_http_client_is_invalidated')
_log.debug('%s: HTTP client was invalidated', self.label)
await self._close_http_client()
if not hasattr(self, '_http_client'):
proxy_url = self.proxy_url.with_auth if self.proxy_url else None
self._http_client = _utils.create_http_client(
auth=(self.url.username, self.url.password),
proxy_url=proxy_url,
)
_log.debug('%s: Created new HTTP client: %r', self.label, self._http_client)
return self._http_client
async def _close_http_client(self):
if hasattr(self, '_http_client'):
_log.debug('%s: Closing HTTP client: %r', self.label, self._http_client)
await self._http_client.aclose()
delattr(self, '_http_client')
def _invalidate_http_client(self):
if hasattr(self, '_http_client'):
_log.debug('%s: HTTP client is now invalidated: %r', self.label, self._http_client)
self._http_client_is_invalidated = True
self._status = _utils.ConnectionStatus.disconnected
async def _send_post_request(self, url, data=None, files=None):
# httpx wants dictionaries as `data` and everything else as `content`
if data is not None and not isinstance(data, dict):
content = data
data = None
else:
content = None
client = await self._get_http_client()
return await _utils.catch_connection_exceptions(
client.post(
timeout=float('inf'),
url=url,
headers=self._http_headers,
data=data,
content=content,
files=files,
),
)