import asyncio
import struct
from . import _base, _errors, _utils
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]
class DelugeURL(_utils.URL):
"""Deluge RPC URL"""
default = 'localhost:58846'
@property
def scheme(self):
"""Valid schemes: ``None``"""
return super().scheme
@scheme.setter
def scheme(self, scheme):
if scheme:
raise _errors.ValueError("Deluge URLs don't have a scheme")
else:
_utils.URL.scheme.fset(self, scheme)
@property
def path(self):
"""Always `None`"""
return super().path
@path.setter
def path(self, path):
if path:
raise _errors.ValueError("Deluge URLs don't have a path")
else:
_utils.URL.path.fset(self, path)
[docs]
class DelugeRPC(_base.RPCBase):
"""
RPC client for Deluge
URL format: ``[USERNAME:PASSWORD@]HOST[:PORT]``
References:
https://deluge.readthedocs.io/en/latest/reference/index.html
https://www.rasterbar.com/products/libtorrent/manual-ref.html
https://git.deluge-torrent.org/deluge/tree/
**RPC methods**
RPC methods are only documented as Deluge code. Look for funtions decorated
with @export. The RPC method is the module name and the function name
concatenated with ".".
For example, the RPC method name of Daemon.get_method_list() in
deluge/core/daemon.py would be "daemon.get_method_list".
Arguments for RPC methods must be positional/keyword as specified in the
function's call signature.
**Events**
Like RPC methods, events are not properly documented. You can find event
names by grepping for the class name ``DelugeEvent``. The names of
subclasses of ``DelugeEvent`` are also event names.
.. warning:: The Deluge daemon does not complain about invalid event names
and silently accepts subscribtions to anything. Check your event names
carefully!
:raise ValueError: if any argument is invalid
"""
name = 'deluge'
label = 'Deluge'
URL = DelugeURL
def __init__(
self,
url=None,
*,
host=None,
port=None,
username=None,
password=None,
timeout=None,
proxy_url=None,
):
# Set custom or default URL
self.url = url
# Update URL
if host:
self.url.host = host
if port:
self.url.port = port
if username:
self.url.username = username
if password:
self.url.password = password
self.timeout = timeout
self.proxy_url = proxy_url
async def _connect(self):
# Close old client
await self._disconnect()
# Create new client
self._client = _DelugeRPCClient(
host=self.url.host,
port=self.url.port,
on_connection_lost=self._on_connection_lost,
proxy_url=self.proxy_url,
event_handler=self._emit_event,
)
await _utils.catch_connection_exceptions(
self._client.login(
username=self.url.username,
password=self.url.password,
),
)
# Subscribe to events again
for event_name in self._event_handlers:
# IMPORTANT: We can't use _subscribe() because it uses
# RPCBase.call(), which will see we're not connected yet and acquire
# the connection lock to connect. But the connection lock is already
# locked until this method returns, resulting in a deadlock.
await self._call('daemon.set_event_interest', [event_name])
def _on_connection_lost(self):
self._status = _utils.ConnectionStatus.disconnected
self._call_connection_callbacks('disconnected')
async def _disconnect(self):
if hasattr(self, '_client'):
await _utils.catch_connection_exceptions(
self._client.logout()
)
delattr(self, '_client')
async def _call(self, method, *args, **kwargs):
return await _utils.catch_connection_exceptions(
self._client.call(method, *args, **kwargs),
)
async def _subscribe(self, event_name):
_log.debug('Setting interest for event: %r', event_name)
await self.call('daemon.set_event_interest', [event_name])
async def _unsubscribe(self, event_name):
# This is not supported in Deluge, but we can just ignore the event by
# not having any callbacks. This is handled in RPCBase.
pass
class _DelugeRPCClient:
def __init__(self, host, port, proxy_url=None, on_connection_lost=None, event_handler=None):
self._host = str(host)
self._port = int(port)
self._on_connection_lost = on_connection_lost
self._proxy_url = proxy_url
self._loop = _utils.get_aioloop()
self._protocol = None
self._event_handler = event_handler
def _connection_lost(self):
self._protocol = None
if self._on_connection_lost:
self._on_connection_lost()
def _event_received(self, event_name, args):
if asyncio.iscoroutinefunction(self._event_handler):
coro = self._event_handler(event_name, args)
self._loop.create_task(coro)
else:
self._loop.call_soon(self._event_handler, event_name, args)
def _protocol_factory(self):
return _DelugeRPCProtocol(
on_connection_lost=self._connection_lost,
on_event_received=self._event_received,
)
def _create_ssl_context(self):
import ssl # isort:skip
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
return ctx
async def login(self, username, password):
if not self._protocol:
if self._proxy_url:
try:
import python_socks.async_.asyncio # isort:skip
proxy = python_socks.async_.asyncio.Proxy.from_url(self._proxy_url.with_auth)
except ValueError as e:
raise _errors.ValueError(e)
try:
sock = await proxy.connect(
dest_host=self._host,
dest_port=self._port,
# Timeouts are handled with async_timeout in RPCBase
timeout=float('inf'),
)
except python_socks.ProxyError as e:
raise _errors.ConnectionError(e)
create_connection_kwargs = {
'sock': sock,
'server_hostname': self._host,
'protocol_factory': self._protocol_factory,
'ssl': self._create_ssl_context(),
# Timeouts are handled with async_timeout in RPCBase
'ssl_handshake_timeout': float('inf'),
}
else:
create_connection_kwargs = {
'host': self._host,
'port': self._port,
'protocol_factory': self._protocol_factory,
'ssl': self._create_ssl_context(),
# Timeouts are handled with async_timeout in RPCBase
'ssl_handshake_timeout': float('inf'),
}
transport_, self._protocol = await _utils.catch_connection_exceptions(
self._loop.create_connection(**create_connection_kwargs)
)
# The server expects a client_version kwarg when calling
# daemon.login. We pretend to be the official client in case the
# server attempts to derive capabilities from the client_version.
daemon_version = await self.call('daemon.info')
_log.debug('Logging in as %r', username)
auth_level_ = await self.call(
'daemon.login',
username,
password,
client_version=daemon_version,
)
_log.debug('auth level: %r', auth_level_)
async def logout(self):
if self._protocol:
_log.debug('Logging out')
self._protocol.close()
async def call(self, method, *args, **kwargs):
request = _DelugeRPCRequest(
method=method,
args=args,
kwargs=kwargs,
)
return await self._protocol.send_request(request)
class _DelugeRPCProtocol(asyncio.Protocol):
PROTOCOL_VERSION = 1
# See https://docs.python.org/3/library/struct.html#format-strings
MESSAGE_HEADER_FORMAT = '!BI'
MESSAGE_HEADER_SIZE = struct.calcsize(MESSAGE_HEADER_FORMAT)
RPC_RESPONSE = 1
RPC_ERROR = 2
RPC_EVENT = 3
def __init__(
self,
on_connection_made=None,
on_connection_lost=None,
on_event_received=None,
):
self._on_connection_made = on_connection_made
self._on_connection_lost = on_connection_lost
self._on_event_received = on_event_received
self._reset_internal_state()
def _reset_internal_state(self):
self._transport = None
self._buffer = b''
self._message_length = 0
self._requests = {}
_DelugeRPCRequest.reset_id()
def connection_made(self, transport):
_log.debug('Made connection: %r', transport)
self._transport = transport
if self._on_connection_made:
self._on_connection_made()
def connection_lost(self, exception):
_log.debug('Lost connection: %r', exception)
# Don't leave any ongoing requests hanging
if self._requests:
request_exception = exception or _errors.ConnectionError('Connection lost')
_log.debug('Reporting exception to %d ongoing requests: %r',
len(self._requests), request_exception)
for request in self._requests.values():
if not request.future.done():
request.future.set_exception(request_exception)
self._requests.clear()
self.close()
self._reset_internal_state()
if self._on_connection_lost:
self._on_connection_lost()
def close(self):
if self._transport:
self._transport.close()
def data_received(self, data):
self._buffer += data
while len(self._buffer) >= self.MESSAGE_HEADER_SIZE:
if self._message_length == 0:
self._handle_new_message()
if len(self._buffer) >= self._message_length:
self._handle_complete_message()
else:
break
def _handle_new_message(self):
# Read header bytes
header = self._buffer[:self.MESSAGE_HEADER_SIZE]
# Remove the header from the buffer
self._buffer = self._buffer[self.MESSAGE_HEADER_SIZE:]
# Unpack header bytes into usable objects
protocol_version, self._message_length = struct.unpack(self.MESSAGE_HEADER_FORMAT, header)
if protocol_version != self.PROTOCOL_VERSION:
raise RuntimeError(f'Unsupported protocol version: {protocol_version}')
def _handle_complete_message(self):
# Consume message from buffer
data = self._buffer[:self._message_length]
self._buffer = self._buffer[self._message_length:]
self._message_length = 0
# Decode message
import rencode, zlib # noqa:E401 isort:skip
msg = rencode.loads(zlib.decompress(data), decode_utf8=True)
msg_type = msg[0]
# Handle message
if msg_type == self.RPC_EVENT:
event = msg[1]
args = msg[2]
_log.debug('RPC_EVENT: %s(%s)', event, ', '.join(repr(arg) for arg in args))
if self._on_event_received:
self._on_event_received(event, args)
elif msg_type == self.RPC_ERROR:
_log.debug('RPC_ERROR: %r', msg)
request_id, exc_clsname, exc_posargs, exc_kwargs, traceback = msg[1:6]
exc = self._create_rpc_error(exc_clsname, exc_posargs, exc_kwargs, traceback)
self._set_response(request_id, exc)
elif msg_type == self.RPC_RESPONSE:
_log.debug('RPC_RESPONSE: %r', msg)
request_id, response = msg[1:]
self._set_response(request_id, response)
else:
raise RuntimeError(f'Unknown RPC message type: {msg_type!r}')
def _create_rpc_error(self, clsname, posargs, kwargs, traceback):
if clsname == 'BadLoginError':
exc = _errors.AuthenticationError('Authentication failed')
elif clsname == 'NotAuthorizedError':
msg = 'Not authorized'
if len(posargs) >= 2:
auth_level, auth_level_required = posargs[:2]
msg += f': Your authorization level is {auth_level}, but you need {auth_level_required}'
exc = _errors.AuthenticationError(msg)
elif posargs and posargs[0]:
exc = _errors.RPCError(posargs[0])
elif clsname:
exc = _errors.RPCError(clsname)
else:
exc = _errors.RPCError('Unknown error')
_log.debug('Server traceback for %r:\n%s', exc, traceback)
return exc
def _set_response(self, request_id, response):
_log.debug('Setting response for request #%s: %r', request_id, response)
try:
request = self._requests[request_id]
except KeyError:
raise RuntimeError(f'Got response to unknown request #{request_id}: {response!r}')
try:
if request.future.done():
raise RuntimeError(f'Request #{request_id} already has a response')
else:
if isinstance(response, BaseException):
request.future.set_exception(response)
else:
request.future.set_result(response)
finally:
del self._requests[request_id]
def _transfer_messages(self, *data):
import rencode, zlib # noqa:E401 isort:skip
body = zlib.compress(rencode.dumps(_utils.convert_to_basic_type(data)))
body_len = len(body)
packed = struct.pack(
f'{self.MESSAGE_HEADER_FORMAT}{body_len}s',
self.PROTOCOL_VERSION,
body_len,
body,
)
self._transport.write(packed)
def send_request(self, request):
_log.debug('Sending request #%d: %r', request.id, request)
self._requests[request.id] = request
self._transfer_messages(request.format_message())
return request.future
class _DelugeRPCRequest:
_next_id = 0
def __init__(self, *, method, args, kwargs):
self.method = str(method)
self.args = tuple(args)
self.kwargs = dict(kwargs)
self.future = asyncio.Future()
self.id = type(self)._next_id
type(self)._next_id += 1
@classmethod
def reset_id(cls):
"""Reset automatically incremented :attr:`id` to ``0``"""
cls._next_id = 0
def format_message(self):
"""Return properly formatted request"""
return (self.id, self.method, self.args, self.kwargs)
def __repr__(self):
text = f'{self.method}('
posargs = ', '.join((repr(arg) for arg in self.args))
kwargs = ', '.join((f'{k}={v!r}' for k, v in self.kwargs.items()))
args = ', '.join((x for x in (posargs, kwargs) if x))
if args:
text += args
text += ')'
return text
def __eq__(self, other):
return (
type(other) is type(self)
and self.method == other.method
and self.args == other.args
and self.kwargs == other.kwargs
)
def __ne__(self, other):
return not self.__eq__(other)