Custom Python Code¶
You can write your own code to extend pyrocore with custom features, by adding fields, your own command line scripts, or pyrotorque jobs.
You will need a solid grasp of Python for this.
Defining Custom Fields¶
Introduction¶
As mentioned in the Configuration Guide, the config.py
script can be used to add
custom logic to your setup. The most common use for this file is adding
custom fields.
To add user-defined fields, put code describing them into your ~/.pyroscope/config.py file.
You can then use your custom fields just like any built-in ones, e.g. in a command like
rtcontrol --from-view incomplete \* -qco partial_done,name
(see the examples below). They are also listed when you call rtcontrol --help-fields.
Basic Custom Field Code¶
The following is the framework you need to add before putting in your field definitions:
def _custom_fields():
    """ Yield custom field definitions.
    """
    # Import some commonly needed modules
    from pyrocore.torrent import engine, matching
    from pyrocore.util import fmt, os

    # PUT CUSTOM FIELD CODE HERE

# Register our factory with the system
custom_field_factories.append(_custom_fields)
In place of the # PUT CUSTOM FIELD CODE HERE comment you can add any
combination of the examples below, or your own code.
Be sure to do so at the correct indent level: the example snippets
are left-aligned and need to be indented by 4 spaces.
Custom Field Examples¶
Adding rTorrent fields not supported by default¶
# Add rTorrent attributes not available by default
def get_tracker_field(obj, name, aggregator=sum):
    "Get an aggregated tracker field."
    return aggregator(obj._engine._rpc.t.multicall(obj._fields["hash"], 0, "t.%s=" % name)[0])

yield engine.OnDemandField(int, "peers_connected", "number of connected peers", matcher=matching.FloatFilter)
yield engine.DynamicField(int, "downloaders", "number of completed downloads", matcher=matching.FloatFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_downloaded"))
yield engine.DynamicField(int, "seeds", "number of seeds", matcher=matching.FloatFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_complete"))
yield engine.DynamicField(int, "leeches", "number of leeches", matcher=matching.FloatFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_incomplete"))
yield engine.DynamicField(engine.untyped, "lastscraped", "time of last scrape", matcher=matching.TimeFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_time_last", max),
    formatter=lambda dt: fmt.human_duration(float(dt), precision=2, short=True))

# Add peer attributes not available by default
def get_peer_data(obj, name, aggregator=None):
    "Get some peer data via a multicall."
    aggregator = aggregator or (lambda _: _)
    result = obj._engine._rpc.p.multicall(obj._fields["hash"], 0, "p.%s=" % name)
    return aggregator([i[0] for i in result])

yield engine.DynamicField(set, "peers_ip", "list of IP addresses for connected peers",
    matcher=matching.TaggedAsFilter, formatter=", ".join,
    accessor=lambda o: set(get_peer_data(o, "address")))
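The aggregation pattern above can be tried in isolation; the sketch below mocks the multicall result as a list of one-element rows (the data is invented, no live rTorrent connection is involved):

```python
def aggregate_tracker_field(rows, aggregator=sum):
    """Flatten one-element multicall rows and combine them with 'aggregator'."""
    return aggregator(value for (value,) in rows)

# Hypothetical scrape data: one row per tracker, one value per row
scrape_complete = [[12], [7], [12]]
print(aggregate_tracker_field(scrape_complete))        # sum over all trackers
print(aggregate_tracker_field(scrape_complete, max))   # best single tracker
```

Passing max instead of the default sum mirrors how lastscraped picks the most recent scrape time across trackers.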
Checking that certain files are present¶
# Add file checkers
def has_nfo(obj):
    "Check for .NFO file."
    pathname = obj.path
    if pathname and os.path.isdir(pathname):
        return any(i.lower().endswith(".nfo") for i in os.listdir(pathname))
    else:
        return False if pathname else None

def has_thumb(obj):
    "Check for folder.jpg file."
    pathname = obj.path
    if pathname and os.path.isdir(pathname):
        return any(i.lower() == "folder.jpg" for i in os.listdir(pathname))
    else:
        return False if pathname else None

yield engine.DynamicField(engine.untyped, "has_nfo", "does download have a .NFO file?",
    matcher=matching.BoolFilter, accessor=has_nfo,
    formatter=lambda val: "NFO" if val else "!DTA" if val is None else "----")
yield engine.DynamicField(engine.untyped, "has_thumb", "does download have a folder.jpg file?",
    matcher=matching.BoolFilter, accessor=has_thumb,
    formatter=lambda val: "THMB" if val else "!DTA" if val is None else "----")
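The three-valued convention used by these checkers (True/False when the item has a path, None meaning “no data”) can be sketched standalone; has_file and fmt_check are hypothetical names for this illustration:

```python
import os

def has_file(pathname, predicate):
    "Return True/False for an existing directory, None when the path is unset."
    if pathname and os.path.isdir(pathname):
        return any(predicate(name.lower()) for name in os.listdir(pathname))
    return False if pathname else None

# Same conditional chain as the formatters above: None maps to "!DTA"
fmt_check = lambda val: "NFO" if val else "!DTA" if val is None else "----"
print(fmt_check(True), fmt_check(None), fmt_check(False))
```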
Calculating information about partial downloads¶
Note that the partial_done value can be a little lower than it should be
when chunks shared by different files are not yet complete;
it will eventually reach 100 once all selected chunks are downloaded in full.
# Fields for partial downloads
def partial_info(obj, name):
    "Helper for partial download info"
    try:
        return obj._fields[name]
    except KeyError:
        f_attr = ["get_completed_chunks", "get_size_chunks", "get_range_first", "get_range_second"]
        chunk_size = obj.fetch("chunk_size")
        prev_chunk = -1
        size, completed, chunks = 0, 0, 0
        for f in obj._get_files(f_attr):
            if f.prio:  # selected?
                shared = int(f.range_first == prev_chunk)
                size += f.size
                completed += f.completed_chunks - shared
                chunks += f.size_chunks - shared
                prev_chunk = f.range_second - 1

        obj._fields["partial_size"] = size
        obj._fields["partial_missing"] = (chunks - completed) * chunk_size
        obj._fields["partial_done"] = 100.0 * completed / chunks if chunks else 0.0

        return obj._fields[name]

yield engine.DynamicField(int, "partial_size", "bytes selected for download",
    matcher=matching.ByteSizeFilter,
    accessor=lambda o: partial_info(o, "partial_size"))
yield engine.DynamicField(int, "partial_missing", "bytes missing from selected chunks",
    matcher=matching.ByteSizeFilter,
    accessor=lambda o: partial_info(o, "partial_missing"))
yield engine.DynamicField(float, "partial_done", "percent complete of selected chunks",
    matcher=matching.FloatFilter,
    accessor=lambda o: partial_info(o, "partial_done"))
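The shared-chunk adjustment described above can be illustrated with plain data; the File tuple and its values are made up for this sketch:

```python
# When a selected file starts in the chunk where the previous one ended,
# that chunk is counted only once (the "shared" deduction).
from collections import namedtuple

File = namedtuple("File", "prio size completed_chunks size_chunks range_first range_second")

def partial_done(files):
    "Percent complete over the chunks of selected (prio > 0) files."
    prev_chunk = -1
    completed = chunks = 0
    for f in files:
        if f.prio:  # selected?
            shared = int(f.range_first == prev_chunk)
            completed += f.completed_chunks - shared
            chunks += f.size_chunks - shared
            prev_chunk = f.range_second - 1
    return 100.0 * completed / chunks if chunks else 0.0

files = [
    File(1, 900, 4, 4, 0, 4),    # chunks 0..3, fully done
    File(1, 900, 1, 4, 3, 7),    # shares chunk 3 with the file above
    File(0, 900, 0, 4, 7, 11),   # not selected, ignored
]
print(partial_done(files))  # 4 completed of 7 distinct chunks
```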
Extract TV data from item name¶
This defines the tv_series and tv_episode fields, which are
non-empty when the item name follows the “usual” naming conventions. Try
it using something like
rtcontrol loaded=-2w traits=tv -co tv_series,tv_episode,name.
# Map name field to TV series name, if applicable, else an empty string
from pyrocore.util import traits

def tv_mapper(obj, name, templ):
    "Helper for TV name mapping"
    try:
        return obj._fields[name]
    except KeyError:
        itemname = obj.name
        result = ""

        kind, info = traits.name_trait(itemname, add_info=True)
        if kind == "tv":
            try:
                info["show"] = ' '.join([i.capitalize() for i in info["show"].replace('.', ' ').replace('_', ' ').split()])
                result = templ % info
            except KeyError as exc:
                # print exc
                pass

        obj._fields[name] = result
        return result

yield engine.DynamicField(fmt.to_unicode, "tv_series", "series name of a TV item",
    matcher=matching.PatternFilter, accessor=lambda o: tv_mapper(o, "tv_series", "%(show)s"))
yield engine.DynamicField(fmt.to_unicode, "tv_episode", "series name and episode number of a TV item",
    matcher=matching.PatternFilter, accessor=lambda o: tv_mapper(o, "tv_episode", "%(show)s.S%(season)sE%(episode)s"))
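Independent of pyrocore's traits module, the “usual” naming convention can be sketched with a plain regular expression (TV_PATTERN and tv_info are hypothetical; the real name_trait() handles many more variants):

```python
import re

# Rough sketch of "Show.Name.S01E02..." style names
TV_PATTERN = re.compile(r"^(?P<show>.+?)[._ ][Ss](?P<season>\d+)[Ee](?P<episode>\d+)")

def tv_info(itemname):
    "Return (series, 'show.S..E..' tag), or ('', '') if the name does not match."
    match = TV_PATTERN.match(itemname)
    if not match:
        return "", ""
    show = " ".join(part.capitalize()
                    for part in match.group("show").replace(".", " ").replace("_", " ").split())
    return show, "%s.S%sE%s" % (show, match.group("season"), match.group("episode"))

print(tv_info("some.show.S01E02.720p.HDTV"))
```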
Only start items that you have disk space for¶
This example works together with the rTorrent Queue Manager, so that only items that pass a disk space check are actually started. That means you can safely employ automatic downloading, e.g. via RSS, without fear of a disk-full situation when your disk space housekeeping fails.
It is also much more robust than a schedule using rTorrent's close_low_diskspace command
– pre-allocation of a started item still eats up disk space, and stopping the item
shortly afterwards doesn't reclaim it. So if an RSS feed constantly adds new auto-started
items, your disk will fill up when relying on such a schedule alone.
The first step is to add a custom field that checks whether an item has
room on the target device. As with the other examples, place this in
your config.py
(read the first two sections, before the “Examples” one).
# Disk space check
def has_room(obj):
    "Check disk space."
    pathname = obj.path
    try:
        if pathname and not os.path.exists(pathname):
            pathname = os.path.dirname(pathname)
        if pathname and os.path.exists(pathname):
            stats = os.statvfs(pathname)
            return (stats.f_bavail * stats.f_frsize - int(diskspace_threshold_mb) * 1024**2
                    > obj.size * (1.0 - obj.done / 100.0))
        else:
            return None
    except UnicodeEncodeError as exc:
        raise ValueError('Problematic filename %r: %s' % (pathname, exc))

yield engine.DynamicField(engine.untyped, "has_room",
    "check whether the download will fit on its target device",
    matcher=matching.BoolFilter, accessor=has_room,
    formatter=lambda val: "OK" if val else "??" if val is None else "NO")
globals().setdefault("diskspace_threshold_mb", "500")
And now, all you need to do is add has_room=y to your
job.queue.startable conditions in torque.ini. Done.
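The space arithmetic inside has_room can be tried standalone; fits_on_device is a hypothetical name, and the math mirrors the field's free-space comparison:

```python
import os

def fits_on_device(pathname, size, done_percent, threshold_mb=500):
    "True if the item's remaining bytes fit below the free-space threshold."
    stats = os.statvfs(pathname)
    # Free bytes available to unprivileged users, minus the safety margin
    free = stats.f_bavail * stats.f_frsize - threshold_mb * 1024**2
    return free > size * (1.0 - done_percent / 100.0)

# With no threshold, an empty item fits as long as any space is free
print(fits_on_device(os.getcwd(), 0, 0, threshold_mb=0))
```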
Important
To ensure that checking for free space works as expected, only start one item at a time, and enforce a delay after each start, so that disk space can be claimed before the next item’s check is performed.
In other words, use queue configuration values similar to these:
job.queue.schedule = second=*/5
job.queue.start_at_once = 1
job.queue.intermission = 120
Note that you can set the threshold of space to keep free (in MiB) in
the GLOBAL section of config.ini; the default is 500 MiB.
You should keep your close_low_diskspace schedule for rTorrent as a fallback,
and set diskspace_threshold_mb a good deal higher than the hard limit that
close_low_diskspace enforces, so that normally the low-space check never triggers.
That makes automatic downloading stop at the higher threshold,
but leaves you with wiggle room for manually starting important items
that won't be stopped just a moment later, as long as they fit into the gap
between the two values.
Adding Custom Template Helpers¶
In templating contexts, there is an empty c namespace (think custom or config),
just like h for helpers.
You can populate that namespace with your own helpers as you need them,
from simple string transformations to calling external programs or web interfaces.
The following example illustrates the concept, and belongs into ~/.pyroscope/config.py.
def _hostname(ip):
    """Helper to e.g. look up peer IPs."""
    import socket
    return socket.gethostbyaddr(ip)[0] if ip else ip

custom_template_helpers.hostname = _hostname
This demonstrates calling that helper on a custom field; a real use-case would be resolving peer IPs and the like.
$ rtcontrol -qo '{{d.fetch("custom_ip")}} → {{d.fetch("custom_ip") | c.hostname}}' // -/1
8.8.8.8 → google-public-dns-a.google.com
Writing Your Own Scripts¶
Introduction¶
The pyrocore Python package contains powerful helper classes that
make remote access to rTorrent child's play (see the API Documentation).
And your tools get the same look & feel as the built-in PyroScope
commands, as long as you use the provided base class
pyrocore.scripts.base.ScriptBaseWithConfig.
See for yourself:
#! /usr/bin/env python-pyrocore
# -*- coding: utf-8 -*-

# Enter the magic kingdom
from pyrocore import config
from pyrocore.scripts import base


class UserScript(base.ScriptBaseWithConfig):
    """
        Just some script you wrote.
    """

    # argument description for the usage information
    ARGS_HELP = "<arg_1>... <arg_n>"

    # set your own version
    VERSION = '1.0'

    # (optionally) define your licensing
    COPYRIGHT = u'Copyright (c) …'

    def add_options(self):
        """ Add program options.
        """
        super(UserScript, self).add_options()

        # basic options
        ##self.add_bool_option("-n", "--dry-run",
        ##    help="don't do anything, just tell what would happen")

    def mainloop(self):
        """ The main loop.
        """
        # Grab your magic wand
        proxy = config.engine.open()

        # Wave it
        torrents = list(config.engine.items())

        # Abracadabra
        print "You have loaded %d torrents tracked by %d trackers." % (
            len(torrents),
            len(set(i.alias for i in torrents)),
        )

        self.LOG.info("XMLRPC stats: %s" % proxy)


if __name__ == "__main__":
    base.ScriptBase.setup()
    UserScript().run()
Another full example is the dynamic seed throttle script.
Note
If you wondered about the first line referring to a python-pyrocore
command, that is an alias the installation scripts create for
the Python interpreter of the pyrocore virtualenv. This way,
your script will always use the correct environment that actually
offers the right packages.
For simple calls, you can also use the rtxmlrpc
command on a shell
prompt, see Using ‘rtxmlrpc’ for that. For a reference of the rTorrent
XMLRPC interface, see rTorrent XMLRPC. Another common way to add your
own extensions is Defining Custom Fields, usable by rtcontrol
just
like built-in ones.
Interactive use in a Python shell¶
You can also access rTorrent interactively, like this:
>>> from pyrocore import connect
>>> rt = connect()
>>> len(set(i.tracker for i in rt.items()))
2
>>> rt.engine_software
'rTorrent 0.9.2/0.13.2'
>>> rt.uptime
1325.6771779060364
>>> proxy = rt.open()
>>> len(proxy.system.listMethods())
1033
Interactive use on a shell prompt¶
Besides connect, there is another convenience function called view.
If you install pythonpy into your pyrocore virtualenv,
you can do rtcontrol-like things with the full expressiveness of Python:
$ ~/.local/pyroscope/bin/pip -q install 'pythonpy'
$ ln -nfs ~/.local/pyroscope/bin/py ~/bin/rtpy
$ rtpy "[x.name for x in pyrocore.view('stopped') if x.size > 1.4*1024**3]"
robolinux64-mate3d-v9.3.iso
$ rtpy "sorted(x.name for x in pyrocore.view() if x.name.endswith('.iso'))"
Container Linux 1745.7.0.iso
debian-9.4.0-amd64-netinst.iso
debian-9.4.0-amd64-xfce-CD-1.iso
robolinux64-mate3d-v9.3.iso
$ rtpy "json.dumps(indent=4, sort_keys=True,
obj=[x.as_dict() for x in pyrocore.view() if 'robolinux' in x.name])"
[
{
"custom_m_alias": …
"name": "robolinux64-mate3d-v9.3.iso",
…
"size": 1527775232,
…
"up_total": 0
}
]
If you do not pass a view name, default
is assumed.
Using pyrocore as a library in other projects¶
The example in the first section is an easy way to create user-defined
scripts. If you want to use pyrocore
’s features in another runtime
environment, you just have to load the configuration manually (what
pyrocore.scripts.base.ScriptBaseWithConfig
does for you otherwise).
# Details depend on the system you want to extend, of course
from some_system import plugin
from pyrocore import error
from pyrocore.util import load_config

def my_rtorrent_plugin():
    """ Initialize plugin.
    """
    try:
        load_config.ConfigLoader().load()
    except error.LoggableError as exc:
        # Handle accordingly...
        raise
    else:
        # Do some other stuff...
        pass

plugin.register(my_rtorrent_plugin)
Example Scripts¶
Note
The following snippets are meant to be placed and executed within
the mainloop
of the script skeleton found in Introduction.
Accessing files in download items¶
To get all the files for several items at once, we combine
system.multicall and f.multicall into one big, efficient request.
from pprint import pprint

# The attributes we want to fetch
methods = [
    "f.get_path",
    "f.get_size_bytes",
    "f.get_last_touched",
    "f.get_priority",
    "f.is_created",
    "f.is_open",
]

# Build the multicall argument
f_calls = [method + '=' for method in methods]
calls = [{"methodName": "f.multicall", "params": [infohash, 0] + f_calls}
         for infohash in self.args]

# Make the calls
multicall = proxy.system.multicall
result = multicall(calls)

# Print the results
for infohash, (files,) in zip(self.args, result):
    print ("~~~ %s [%d file(s)] " % (infohash, len(files))).ljust(78, '~')
    pprint(files)

self.LOG.info("Multicall stats: %s" % multicall)
Core stats of active downloads¶
The rt-down-stats
script prints some statistics about currently active downloads,
particularly the range of expected arrival times.
It shows how nicely you can handle the result of config.engine.multicall,
which uses Python's namedtuple under the hood,
based on a simple field list like this:
FIELDS = ('is_active', 'left_bytes', 'size_bytes', 'down.rate', 'priority')
MIN_STALLED_RATE = 5 * 1024
STALLED_PERCENT = 10
The first few lines of the mainloop then use the multicall helper method
to make accessing the result list actually readable:
instead of obscuring intent with numerical indexes,
the actual names of the fetched attributes are used to access them.
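In isolation, that field-to-attribute mapping can be sketched like this, assuming dots in rTorrent field names become underscores (the row values are mocked):

```python
from collections import namedtuple

FIELDS = ('is_active', 'left_bytes', 'size_bytes', 'down.rate', 'priority')
# 'down.rate' is not a valid attribute name, so map dots to underscores
ItemRow = namedtuple('ItemRow', [name.replace('.', '_') for name in FIELDS])

# One mocked result row (values invented for the example)
row = ItemRow(is_active=0, left_bytes=512 * 1024**2, size_bytes=1024**3,
              down_rate=80 * 1024, priority=2)
print(row.down_rate, row.left_bytes // (row.down_rate or 1))  # rate and rough ETA
```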
def mainloop(self):
    proxy = config.engine.open()
    all_items = list(config.engine.multicall("incomplete", self.FIELDS))

    pending = [d for d in all_items if not d.is_active and d.priority > 0]
    print("Queued items: ",
          fmt.human_size(sum(d.size_bytes for d in pending)),
          'in', len(pending), 'item(s)',
          '[{} free]'.format(fmt.human_size(disk_free(proxy.directory.default())).strip()))

    items = [d for d in all_items if d.is_active]
    if not items:
        print("No active downloads!")
        return

    good_rates = [d.down_rate for d in items if d.down_rate > self.MIN_STALLED_RATE]
    stalled_rate = max(
        self.MIN_STALLED_RATE,
        self.STALLED_PERCENT / 100 * sum(good_rates) / len(good_rates) if good_rates else 0)
    stalled_count = sum(d.down_rate < stalled_rate for d in items)

    global_down_rate = proxy.throttle.global_down.rate()
    total_size = sum(d.size_bytes for d in items)
    total_left = sum(d.left_bytes for d in items)

    eta_list = [0]
    if stalled_count < len(items):
        eta_list = [d.left_bytes / d.down_rate for d in items if d.down_rate >= stalled_rate]
    eta_max = total_left / (global_down_rate or 1)

    stalled_info = ', {} stalled below {}/s'.format(
        stalled_count, fmt.human_size(stalled_rate).strip()) if stalled_count else ''
    print("Size left to download: ",
          fmt.human_size(total_left), 'of', fmt.human_size(total_size).strip())
    print("Overall download speed:", fmt.human_size(global_down_rate) + '/s')
    print("ETA (min → max): ",
          fmt_duration(min(eta_list)), '→', fmt_duration(eta_max), '…', fmt_duration(max(eta_list)),
          '[{} item(s){}]'.format(len(items), stalled_info),
    )
See the full rt-down-stats script for all the details. If you call it, this is what you get:
$ docs/examples/rt-down-stats.py -q
Size left to download: 997.0 MiB of 1.1 GiB
Overall download speed: 70.8 KiB/s
ETA (min / max): 3h 11m … 4h 40m [3 item(s)]
List Stuck Tracker Announces¶
The rt-stuck-trackers script lists started items whose announces are stuck,
i.e. where the last tracker activity is older than the normal announce interval.
It shows how to use namedtuple, as mentioned in the previous example,
on rTorrent entities other than download items
– in this case, the tracker list of an item.
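The staleness test itself boils down to a simple comparison; this sketch uses invented tracker data instead of a live multicall:

```python
import time
from collections import namedtuple

Tracker = namedtuple('Tracker', 'is_enabled url normal_interval activity_time_last')

def stuck_trackers(trackers, now=None):
    "Yield enabled trackers whose last activity is older than their interval."
    now = now or int(time.time())
    for t in trackers:
        if t.is_enabled and now - t.activity_time_last > t.normal_interval:
            yield t

now = 1000000
trackers = [
    Tracker(1, 'http://a.example.com/announce', 1800, now - 60),    # fresh
    Tracker(1, 'http://b.example.com/announce', 1800, now - 7200),  # stuck
    Tracker(0, 'http://c.example.com/announce', 1800, now - 7200),  # disabled
]
print([t.url for t in stuck_trackers(trackers, now)])
```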
def mainloop(self):
    import time
    from urlparse import urlparse
    from collections import namedtuple, Counter
    from pyrobase import fmt
    from pyrocore.util import xmlrpc

    proxy = config.engine.open()
    now = int(time.time())
    fields = ('is_enabled is_busy url min_interval normal_interval'
              ' activity_time_last success_counter failed_counter scrape_counter').split()
    t_multicall = namedtuple('multicall', fields)
    rows = proxy.d.multicall('started', 'd.hash=', 't.multicall=,{}'.format(
        ','.join(['t.{}='.format(i) for i in fields])))
    stuck = Counter()

    view = 'tagged'
    if self.options.to_tagged and view not in proxy.view.list():
        proxy.view.add(xmlrpc.NOHASH, view)

    print('{:>5s} {:>2s} {:>5s} {:>5s} {:>6s} {:>13s} {:40s} {}'
          .format('S#', 'T#', 'OK', 'Error', 'Scrape', 'Last Announce',
                  'Infohash', 'Tracker Domain'))
    for idx, (infohash, trackers) in enumerate(rows, 1):
        trackers = [t_multicall(*t) for t in trackers]
        if not any(t.is_enabled for t in trackers):
            if self.options.stuck_only:
                continue
            if self.options.to_tagged:
                proxy.d.views.push_back_unique(infohash, view)
                proxy.view.set_visible(infohash, view)
            domain = 'ALL TRACKERS DISABLED' if trackers else 'NO TRACKERS'
            stuck[domain] += 1
            print('{i:5d} {n:>2s} {n:>5s} {n:>5s} {n:>5s} {delta:>13s} {hash} {domain}'
                  .format(i=idx, n='-', hash=infohash, delta='N/A', domain=domain))
            continue
        for num, t in enumerate(trackers, 1):
            if not t.is_enabled:
                continue
            delta = now - t.activity_time_last
            if self.options.all or delta > t.normal_interval:
                if self.options.to_tagged:
                    proxy.d.views.push_back_unique(infohash, view)
                    proxy.view.set_visible(infohash, view)
                domain = urlparse(t.url).netloc.split(':')[0]
                stuck[domain] += 1
                print('{i:5d} {n:2d} '
                      '{t.success_counter:5d} {t.scrape_counter:5d} {t.failed_counter:5d} '
                      '{delta} {hash} {domain}'
                      .format(t=t, i=idx, n=num, hash=infohash, domain=domain,
                              delta=fmt.human_duration(t.activity_time_last,
                                                       precision=2, short=True)))

    if sum(stuck.values()):
        if self.options.to_tagged:
            proxy.ui.current_view.set(view)
        self.LOG.info("Stuck items: TOTAL={}, {}".format(sum(stuck.values()),
            ', '.join(['{}={}'.format(*i) for i in stuck.most_common()])))
    self.LOG.debug("XMLRPC stats: %s" % proxy)
See the full rt-stuck-trackers script for all the details. If you call it, this is what you get:
$ docs/examples/rt-stuck-trackers.py -a
S# T# OK Error Scrape Last Announce Infohash Tracker Domain
1 1 180 4 0 53m 48s ago 00……………………FF tracker.example.com
INFO Stuck items: TOTAL=1, tracker.example.com=1
INFO Total time: 0.163 seconds.
The index shown is the item’s position in the started
view.
Writing Custom Jobs¶
First off, you really need to know a good amount of Python to do this.
But if you do, you can easily add your own background processing,
more versatile and more efficient than calling rtcontrol in a cron job.
The description here is terse, and mostly tells you where to look for code examples,
plus the basics of how a job implementation interacts with the core system.
Note
While some effort will be spent on keeping the API backwards compatible, there is no guarantee of a stable API. Follow the commit log and changelogs of releases to get notified when you need to adapt your code.
Jobs are created during pyrotorque startup and registered with the scheduler.
Configuration is taken from the [TORQUE] section of torque.ini,
and any job.«job-name».«param-name» setting contributes to a job named job-name.
The handler, schedule, and active settings are used by the core;
the rest is passed to the handler class for customization and depends on the job type.
To locate the job implementation, handler contains a module.path:ClassName
coordinate of its class.
So job.foo.handler = my.code:FooJob registers FooJob under the name foo.
This means a job class can be scheduled several times,
given the right configuration and an implementation designed for it.
The given module must be importable of course,
i.e. pip install
it into your pyrocore
virtualenv.
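Resolving such a coordinate can be sketched with importlib; load_handler is a hypothetical helper, demonstrated on a stdlib class:

```python
import importlib

def load_handler(coordinate):
    "Import the module part of 'module.path:ClassName' and return the class."
    modname, _, classname = coordinate.partition(':')
    return getattr(importlib.import_module(modname), classname)

# Demonstrated with a class everyone has installed
handler = load_handler('collections:Counter')
print(handler.__name__)
```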
The schedule defines the call frequency of the job's run method,
and active allows you to disable a job without removing its configuration
– which is how all the default jobs and their settings are provided.
A job with active = False is simply ignored and not added to the scheduler on startup.
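How such flat settings could be grouped into per-job parameter dicts, with inactive jobs dropped, can be sketched as follows (group_jobs and the sample keys are invented to mirror the convention described above):

```python
def group_jobs(settings):
    "Map flat 'job.name.param' keys to {name: {param: value}}, drop inactive jobs."
    jobs = {}
    for key, value in settings.items():
        if key.startswith('job.'):
            _, name, param = key.split('.', 2)
            jobs.setdefault(name, {})[param] = value
    # A job defaults to active unless explicitly disabled
    return {name: params for name, params in jobs.items()
            if params.get('active', True)}

settings = {
    'job.foo.handler': 'my.code:FooJob',
    'job.foo.schedule': 'minute=*/5',
    'job.bar.handler': 'my.code:BarJob',
    'job.bar.active': False,
}
print(sorted(group_jobs(settings)))
```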
The simplest of jobs is the EngineStats one.
Click on the link and then on [source] to see its source code.
Some noteworthy facts:
- the initializer gets passed a config parameter, holding all the settings from torque.ini for a particular job instance, with the job.«name» prefix removed.
- pyrocore.config is imported as config_ini, to not clash with the config dict passed into jobs.
- create a LOG attribute as shown, for your logging needs.
- to interact with rTorrent, open a proxy connection in run.
- the InfluxDB job shows how to access config parameters, e.g. self.config.dbname.
- raise UserError in the initializer to report configuration mishaps and prevent pyrotorque from starting.
More complex jobs that you can look at are the
pyrocore.torrent.watch.TreeWatch and pyrocore.torrent.queue.QueueManager ones.