import json
from . import _base, _errors, _utils
import logging # isort:skip
_log = logging.getLogger(__name__)
[docs]
class TransmissionURL(_utils.URL):
"""Transmission RPC URL"""
default = 'http://localhost:9091/transmission/rpc'
@property
def scheme(self):
"""Valid schemes: ``http``, ``https``"""
return super().scheme
@scheme.setter
def scheme(self, scheme):
if scheme and scheme.lower() not in ('http', 'https'):
raise _errors.ValueError('Scheme must be "http" or "https"')
else:
_utils.URL.scheme.fset(self, scheme)
[docs]
class TransmissionRPC(_base.RPCBase):
"""
RPC client for Transmission
URL format: ``[http[s]://][USERNAME:PASSWORD@]HOST[:PORT][/PATH]``
Reference: https://github.com/transmission/transmission/blob/main/docs/rpc-spec.md
**Calling RPC methods**
RPC methods can be called like python functions:
>>> client.call("torrent-add", filename="path/to.torrent", paused=True)
If you need to pass argument names that contain characters that are illegal
for keyword arguments (e.g. "-"), provide a dictionary:
>>> client.call(
"torrent-add",
{"filename": "file.torrent", "download-dir": "/some/path"},
)
If you provide keyword arguments and a dictionary, values from the
dictionary overload keyword arguments.
:raise ValueError: if any argument is invalid
"""
name = 'transmission'
label = 'Transmission'
URL = TransmissionURL
def __init__(
self,
url=None,
*,
scheme=None,
host=None,
port=None,
path=None,
username=None,
password=None,
timeout=None,
proxy_url=None,
):
# Set custom or default URL
self.url = url
# Update URL
if scheme:
self.url.scheme = scheme
if host:
self.url.host = host
if port:
self.url.port = port
if path:
self.url.path = path
if username:
self.url.username = username
if password:
self.url.password = password
self.timeout = timeout
self.proxy_url = proxy_url
_auth_error_code = 401
_csrf_error_code = 409
_csrf_header = 'X-Transmission-Session-Id'
async def _request(self, method, tag=None, **parameters):
data = {'method': str(method)}
if parameters:
data['arguments'] = parameters
if tag:
try:
data['tag'] = int(float(tag))
except (TypeError, ValueError):
raise _errors.ValueError(f'Tag must be a number: {tag!r}')
try:
data_json = json.dumps(data)
except Exception:
raise _errors.ValueError(f'Failed to serialize to JSON: {data}')
response = await self._send_post_request(str(self.url), data=data_json)
if response.status_code == self._csrf_error_code and self._csrf_header in response.headers:
# Try again with CSRF header
_log.debug('Setting CSRF header: %s = %s', self._csrf_header, response.headers[self._csrf_header])
self._http_headers[self._csrf_header] = response.headers[self._csrf_header]
response = await self._send_post_request(str(self.url), data=data_json)
if response.status_code == self._auth_error_code:
raise _errors.AuthenticationError('Authentication failed')
elif response.status_code != 200:
raise RuntimeError(f'Unexpected response: {response!r}')
return response
async def _connect(self):
# There is no special login procedure, we just make a valid request to
# see if it basically works
await self._request('session-stats')
async def _disconnect(self):
# Forget CSRF header
self._http_headers.clear()
async def _call(self, method, args=None, **kwargs):
# Combine keyword arguments and dictionary (`args`)
if args:
kwargs.update(args)
response = await self._request(method, **kwargs)
try:
result = response.json()
except Exception:
raise _errors.RPCError(f'Unexpected response: {response.text}')
else:
message = result.get('result')
if message == 'success':
return result
else:
# Transmission errors are all lower case
raise _errors.RPCError(
message.capitalize(),
info=result.get('arguments', None),
)