Custom Python Code

You can extend pyrocore with your own code by adding custom fields, your own command line scripts, or pyrotorque jobs. You will need a solid grasp of Python for this.

Defining Custom Fields

Introduction

As mentioned in the Configuration Guide, the config.py script can be used to add custom logic to your setup. The most common use for this file is adding custom fields.

To add user-defined fields, put code describing them into your ~/.pyroscope/config.py file. You can then use a custom field just like any built-in one, e.g. in a command like rtcontrol --from-view incomplete \* -qco partial_done,name (partial_done is defined in the examples below). Custom fields are also listed when you call rtcontrol --help-fields.

Basic Custom Field Code

The following is the framework you need to add before putting in your field definitions:

def _custom_fields():
    """ Yield custom field definitions.
    """
    # Import some commonly needed modules
    from pyrocore.torrent import engine, matching
    from pyrocore.util import fmt, os

    # PUT CUSTOM FIELD CODE HERE

# Register our factory with the system
custom_field_factories.append(_custom_fields)

In place of the # PUT CUSTOM FIELD CODE HERE comment you can add any combination of the examples below, or your own code. Be sure to do so at the correct indent level: the example snippets are left-aligned and need to be indented by 4 spaces.
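
To make the registration step less opaque, here is a minimal sketch of how a list of generator-based factories can be consumed. It is not pyrocore's actual loader: FieldDef, field_factories, and collect_fields are hypothetical stand-ins, used only to show why _custom_fields yields its definitions and is appended to a list instead of being called directly.

```python
from collections import namedtuple

# Hypothetical stand-in for a field definition (not a pyrocore class)
FieldDef = namedtuple("FieldDef", "name doc accessor")

# Hypothetical registry, playing the role of custom_field_factories
field_factories = []

def _custom_fields():
    """Yield custom field definitions."""
    yield FieldDef("name_length", "length of the item name",
                   lambda o: len(o["name"]))
    yield FieldDef("is_iso", "is this an ISO image?",
                   lambda o: o["name"].endswith(".iso"))

# Register the factory; nothing is evaluated yet
field_factories.append(_custom_fields)

def collect_fields(factories):
    """Call each registered factory and index the yielded definitions by name."""
    fields = {}
    for factory in factories:
        for field in factory():
            fields[field.name] = field
    return fields

fields = collect_fields(field_factories)
item = {"name": "debian-9.4.0-amd64-netinst.iso"}
print(sorted(fields))
print(fields["name_length"].accessor(item))
```

Because the factory is only called when the fields are collected, the imports inside it are deferred until that point as well.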

Custom Field Examples

Adding rTorrent fields not supported by default

# Add rTorrent attributes not available by default
def get_tracker_field(obj, name, aggregator=sum):
    "Get an aggregated tracker field."
    return aggregator(obj._engine._rpc.t.multicall(obj._fields["hash"], 0, "t.%s=" % name)[0])

yield engine.OnDemandField(int, "peers_connected", "number of connected peers", matcher=matching.FloatFilter)
yield engine.DynamicField(int, "downloaders", "number of completed downloads", matcher=matching.FloatFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_downloaded"))
yield engine.DynamicField(int, "seeds", "number of seeds", matcher=matching.FloatFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_complete"))
yield engine.DynamicField(int, "leeches", "number of leeches", matcher=matching.FloatFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_incomplete"))
yield engine.DynamicField(engine.untyped, "lastscraped", "time of last scrape", matcher=matching.TimeFilter,
    accessor=lambda o: get_tracker_field(o, "scrape_time_last", max),
    formatter=lambda dt: fmt.human_duration(float(dt), precision=2, short=True))

# Add peer attributes not available by default
def get_peer_data(obj, name, aggregator=None):
    "Get some peer data via a multicall."
    aggregator = aggregator or (lambda _: _)
    result = obj._engine._rpc.p.multicall(obj._fields["hash"], 0, "p.%s=" % name)
    return aggregator([i[0] for i in result])

yield engine.DynamicField(set, "peers_ip", "list of IP addresses for connected peers",
    matcher=matching.TaggedAsFilter, formatter=", ".join,
    accessor=lambda o: set(get_peer_data(o, "address")))

Checking that certain files are present

# Add file checkers
def has_nfo(obj):
    "Check for .NFO file."
    pathname = obj.path
    if pathname and os.path.isdir(pathname):
        return any(i.lower().endswith(".nfo") for i in os.listdir(pathname))
    else:
        return False if pathname else None

def has_thumb(obj):
    "Check for folder.jpg file."
    pathname = obj.path
    if pathname and os.path.isdir(pathname):
        return any(i.lower() == "folder.jpg" for i in os.listdir(pathname))
    else:
        return False if pathname else None

yield engine.DynamicField(engine.untyped, "has_nfo", "does download have a .NFO file?",
    matcher=matching.BoolFilter, accessor=has_nfo,
    formatter=lambda val: "NFO" if val else "!DTA" if val is None else "----")
yield engine.DynamicField(engine.untyped, "has_thumb", "does download have a folder.jpg file?",
    matcher=matching.BoolFilter, accessor=has_thumb,
    formatter=lambda val: "THMB" if val else "!DTA" if val is None else "----")
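
The checkers above return three states (True, False, or None when no path is known), and the formatters map each state to a label. The following standalone sketch exercises that logic against a temporary directory. It takes a plain path instead of a download item, which is a simplification made for illustration.

```python
import os
import shutil
import tempfile

def has_nfo(pathname):
    """Return True/False for a known path, None when there is no path at all."""
    if pathname and os.path.isdir(pathname):
        return any(i.lower().endswith(".nfo") for i in os.listdir(pathname))
    return False if pathname else None

def fmt_nfo(val):
    """Same three-way formatter as in the field definition above."""
    return "NFO" if val else "!DTA" if val is None else "----"

workdir = tempfile.mkdtemp()
try:
    print(fmt_nfo(has_nfo(workdir)))   # empty directory
    open(os.path.join(workdir, "Release.NFO"), "w").close()
    print(fmt_nfo(has_nfo(workdir)))   # directory now contains an .nfo file
    print(fmt_nfo(has_nfo("")))        # no data path known
finally:
    shutil.rmtree(workdir)
```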

Calculating information about partial downloads

Note that the partial_done value can be a little lower than it actually should be, when chunks shared by different files are not yet complete; but it will eventually reach 100 when all selected chunks are downloaded in full.

# Fields for partial downloads
def partial_info(obj, name):
    "Helper for partial download info"
    try:
        return obj._fields[name]
    except KeyError:
        f_attr = ["get_completed_chunks", "get_size_chunks", "get_range_first", "get_range_second"]
        chunk_size = obj.fetch("chunk_size")
        prev_chunk = -1
        size, completed, chunks = 0, 0, 0
        for f in obj._get_files(f_attr):
            if f.prio: # selected?
                shared = int(f.range_first == prev_chunk)
                size += f.size
                completed += f.completed_chunks - shared
                chunks += f.size_chunks - shared
                prev_chunk = f.range_second - 1

        obj._fields["partial_size"] = size
        obj._fields["partial_missing"] = (chunks - completed) * chunk_size
        obj._fields["partial_done"] = 100.0 * completed / chunks if chunks else 0.0

        return obj._fields[name]

yield engine.DynamicField(int, "partial_size", "bytes selected for download",
    matcher=matching.ByteSizeFilter,
    accessor=lambda o: partial_info(o, "partial_size"))
yield engine.DynamicField(int, "partial_missing", "bytes missing from selected chunks",
    matcher=matching.ByteSizeFilter,
    accessor=lambda o: partial_info(o, "partial_missing"))
yield engine.DynamicField(float, "partial_done", "percent complete of selected chunks",
    matcher=matching.FloatFilter,
    accessor=lambda o: partial_info(o, "partial_done"))
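
To see why partial_done can read lower than expected while a shared chunk is incomplete, the same arithmetic can be run on made-up data. This sketch replaces the rTorrent file objects with a hypothetical File tuple and assumes, as the helper above implies, that range_second is an exclusive end index.

```python
from collections import namedtuple

# Hypothetical per-file record mirroring the attributes used above
File = namedtuple(
    "File", "prio size completed_chunks size_chunks range_first range_second")

def partial_stats(files, chunk_size):
    """Return (size, missing_bytes, percent_done) for the selected files."""
    prev_chunk = -1
    size, completed, chunks = 0, 0, 0
    for f in files:
        if f.prio:  # selected?
            # 1 if this file starts in the chunk the previous file ended in
            shared = int(f.range_first == prev_chunk)
            size += f.size
            completed += f.completed_chunks - shared
            chunks += f.size_chunks - shared
            prev_chunk = f.range_second - 1
    done = 100.0 * completed / chunks if chunks else 0.0
    return size, (chunks - completed) * chunk_size, done

# Two files sharing chunk 2 (chunk size 4): A covers chunks 0-2, B covers 2-3
all_done = [File(1, 10, 3, 3, 0, 3), File(1, 6, 2, 2, 2, 4)]
# Same layout, but the shared chunk 2 is still incomplete
shared_open = [File(1, 10, 2, 3, 0, 3), File(1, 6, 1, 2, 2, 4)]

print(partial_stats(all_done, 4))
print(partial_stats(shared_open, 4))
```

In the second case three of the four distinct chunks are complete, yet the helper reports two, because the shared chunk is subtracted from the second file whether or not it is finished.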

Extract TV data from item name

This defines the tv_series and tv_episode fields, which are non-empty when the item name follows the “usual” naming conventions. Try it using something like rtcontrol loaded=-2w traits=tv -co tv_series,tv_episode,name.

# Map name field to TV series name, if applicable, else an empty string
from pyrocore.util import traits

def tv_mapper(obj, name, templ):
    "Helper for TV name mapping"
    try:
        return obj._fields[name]
    except KeyError:
        itemname = obj.name
        result = ""

        kind, info = traits.name_trait(itemname, add_info=True)
        if kind == "tv":
            try:
                info["show"] = ' '.join([i.capitalize() for i in info["show"].replace('.',' ').replace('_',' ').split()])
                result = templ % info
            except KeyError as exc:
                #print exc
                pass

        obj._fields[name] = result
        return result

yield engine.DynamicField(fmt.to_unicode, "tv_series", "series name of a TV item",
    matcher=matching.PatternFilter, accessor= lambda o: tv_mapper(o, "tv_series", "%(show)s"))
yield engine.DynamicField(fmt.to_unicode, "tv_episode", "series name and episode number of a TV item",
    matcher=matching.PatternFilter, accessor= lambda o: tv_mapper(o, "tv_episode", "%(show)s.S%(season)sE%(episode)s"))
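
The name normalization and template filling inside tv_mapper can be tried without rTorrent. In this sketch the info dict is a made-up parse result; the real keys and values come from traits.name_trait, which is not called here.

```python
def format_tv(info, templ):
    """Normalize the show name and fill it into a %-style template."""
    info = dict(info)  # work on a copy, leave the caller's dict alone
    words = info["show"].replace(".", " ").replace("_", " ").split()
    info["show"] = " ".join(w.capitalize() for w in words)
    return templ % info

# Hypothetical parse result for illustration only
info = {"show": "some_show.name", "season": "02", "episode": "05"}
print(format_tv(info, "%(show)s"))
print(format_tv(info, "%(show)s.S%(season)sE%(episode)s"))
```

A template that references a key missing from info raises KeyError, which is the case the except clause in tv_mapper swallows.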

Only start items that you have disk space for

This example works together with the rTorrent Queue Manager, so that only items that pass a disk space check are actually started. This means you can safely employ automatic downloading, e.g. via RSS, without fear of a full disk when your disk space housekeeping fails.

It is also much more robust than a schedule using rTorrent’s close_low_diskspace command: pre-allocation of a started item still eats up disk space, and stopping the item shortly afterwards doesn’t fix that. So if an RSS feed constantly adds new auto-started items, your disk will fill up when you rely only on such a schedule.

The first step is to add a custom field that checks whether an item has room on the target device. As with the other examples, place this in your config.py (read the first two sections, before the “Examples” one).

# Disk space check
def has_room(obj):
    "Check disk space."
    pathname = obj.path
    try:
        if pathname and not os.path.exists(pathname):
            pathname = os.path.dirname(pathname)
        if pathname and os.path.exists(pathname):
            stats = os.statvfs(pathname)
            return (stats.f_bavail * stats.f_frsize - int(diskspace_threshold_mb) * 1024**2
                > obj.size * (1.0 - obj.done / 100.0))
        else:
            return None
    except UnicodeEncodeError as exc:
        raise ValueError('Problematic filename %r: %s' % (pathname, exc))

yield engine.DynamicField(engine.untyped, "has_room",
    "check whether the download will fit on its target device",
    matcher=matching.BoolFilter, accessor=has_room,
    formatter=lambda val: "OK" if val else "??" if val is None else "NO")
globals().setdefault("diskspace_threshold_mb", "500")

Now all you need to do is add has_room=y to your job.queue.startable conditions in torque.ini. Done.

Important

To ensure that checking for free space works as expected, only start one item at a time, and enforce a delay after each start, so that disk space can be claimed before the next item’s check is performed.

In other words, use queue configuration values similar to these:

job.queue.schedule          = second=*/5
job.queue.start_at_once     = 1
job.queue.intermission      = 120

Note that you can set the threshold of space to keep free (in MiB) in the GLOBAL section of config.ini, and the default is 500MiB. You should keep your close_low_diskspace schedule for rTorrent as a fallback, and set diskspace_threshold_mb higher than the limit given there (so that normally, the low space check never triggers).

It’s a good idea to set diskspace_threshold_mb a good deal higher than the hard limit that close_low_diskspace enforces. That makes automatic downloading stop at the higher threshold, but leaves you with wiggle room for manual starting of important stuff that won’t be stopped just a moment later, as long as it fits into that gap between the two values.
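
The comparison inside has_room is easy to check by hand. The sketch below isolates it as a pure function with made-up numbers; fits_on_device is a hypothetical name, and free_bytes stands for the stats.f_bavail * stats.f_frsize product that the field computes via os.statvfs.

```python
def fits_on_device(free_bytes, size, done_percent, threshold_mb=500):
    """True if the remaining data fits while keeping `threshold_mb` MiB free."""
    remaining = size * (1.0 - done_percent / 100.0)
    return free_bytes - int(threshold_mb) * 1024**2 > remaining

GiB = 1024**3

# 2 GiB free with the default 500 MiB reserve
print(fits_on_device(2 * GiB, 1 * GiB, 0.0))    # 1 GiB item, nothing loaded yet
print(fits_on_device(2 * GiB, 2 * GiB, 0.0))    # 2 GiB item, nothing loaded yet
print(fits_on_device(2 * GiB, 2 * GiB, 50.0))   # 2 GiB item, half already done
```

Raising threshold_mb shrinks the space that automatic starts may use, which is what creates the gap for manual starts described above.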

Adding Custom Template Helpers

In templating contexts, there is an empty c namespace (think custom or config), just like h for helpers. You can populate that namespace with your own helpers as you need them, from simple string transformations to calling external programs or web interfaces.

The following example illustrates the concept, and belongs in ~/.pyroscope/config.py.

def _hostname(ip):
    """Helper to e.g. look up peer IPs."""
    import socket

    return socket.gethostbyaddr(ip)[0] if ip else ip

custom_template_helpers.hostname = _hostname

This demonstrates calling that helper on a custom field; a real use case would be resolving peer IPs and the like.

$ rtcontrol -qo '{{d.fetch("custom_ip")}} → {{d.fetch("custom_ip") | c.hostname}}' // -/1
8.8.8.8 → google-public-dns-a.google.com
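
Reverse lookups fail for many addresses, and socket.gethostbyaddr then raises an exception. A more forgiving variant of the helper could fall back to the IP itself, as in this sketch. The resolver parameter is an addition made here so the function can be tried without network access; it is not part of the original helper.

```python
import socket

def safe_hostname(ip, resolver=socket.gethostbyaddr):
    """Resolve `ip` to a host name, falling back to the IP itself on failure."""
    if not ip:
        return ip
    try:
        return resolver(ip)[0]
    except (socket.herror, socket.gaierror):
        return ip

def fake_resolver(ip):
    """Offline stand-in with the same return shape as gethostbyaddr."""
    return ("host.example.com", [], [ip])

print(safe_hostname("192.0.2.1", resolver=fake_resolver))
```

In config.py it would be registered the same way as above, by assigning it to custom_template_helpers.hostname.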

Writing Your Own Scripts

Introduction

The pyrocore Python package contains powerful helper classes that make remote access to rTorrent child’s play (see API Documentation). Your tools also get the same look and feel as the built-in PyroScope commands, as long as you use the provided base class pyrocore.scripts.base.ScriptBaseWithConfig.

See for yourself:

#! /usr/bin/env python-pyrocore
# -*- coding: utf-8 -*-

# Enter the magic kingdom
from pyrocore import config
from pyrocore.scripts import base


class UserScript(base.ScriptBaseWithConfig):
    """
        Just some script you wrote.
    """

    # argument description for the usage information
    ARGS_HELP = "<arg_1>... <arg_n>"

    # set your own version
    VERSION = '1.0'

    # (optionally) define your licensing
    COPYRIGHT = u'Copyright (c) …'

    def add_options(self):
        """ Add program options.
        """
        super(UserScript, self).add_options()

        # basic options
        ##self.add_bool_option("-n", "--dry-run",
        ##    help="don't do anything, just tell what would happen")


    def mainloop(self):
        """ The main loop.
        """
        # Grab your magic wand
        proxy = config.engine.open()

        # Wave it
        torrents = list(config.engine.items())

        # Abracadabra
        print "You have loaded %d torrents tracked by %d trackers." % (
            len(torrents),
            len(set(i.alias for i in torrents)),
        )

        self.LOG.info("XMLRPC stats: %s" % proxy)


if __name__ == "__main__":
    base.ScriptBase.setup()
    UserScript().run()

Another full example is the dynamic seed throttle script.

Note

If you wondered about the first line referring to a python-pyrocore command, that is an alias the installation scripts create for the Python interpreter of the pyrocore virtualenv. This way, your script will always use the correct environment that actually offers the right packages.

For simple calls, you can also use the rtxmlrpc command on a shell prompt, see Using ‘rtxmlrpc’ for that. For a reference of the rTorrent XMLRPC interface, see rTorrent XMLRPC. Another common way to add your own extensions is Defining Custom Fields, usable by rtcontrol just like built-in ones.

Interactive use in a Python shell

You can also access rTorrent interactively, like this:

>>> from pyrocore import connect
>>> rt = connect()
>>> len(set(i.tracker for i in rt.items()))
2
>>> rt.engine_software
'rTorrent 0.9.2/0.13.2'
>>> rt.uptime
1325.6771779060364
>>> proxy = rt.open()
>>> len(proxy.system.listMethods())
1033

Interactive use on a shell prompt

Besides connect, there is another convenience function called view. If you install pythonpy into your pyrocore virtualenv, that means you can do rtcontrol-like things with the full expressiveness of Python:

$ ~/.local/pyroscope/bin/pip -q install 'pythonpy'
$ ln -nfs ~/.local/pyroscope/bin/py ~/bin/rtpy
$ rtpy "[x.name for x in pyrocore.view('stopped') if x.size > 1.4*1024**3]"
robolinux64-mate3d-v9.3.iso
$ rtpy "sorted(x.name for x in pyrocore.view() if x.name.endswith('.iso'))"
Container Linux 1745.7.0.iso
debian-9.4.0-amd64-netinst.iso
debian-9.4.0-amd64-xfce-CD-1.iso
robolinux64-mate3d-v9.3.iso
$ rtpy "json.dumps(indent=4, sort_keys=True,
        obj=[x.as_dict() for x in pyrocore.view() if 'robolinux' in x.name])"
[
    {
        "custom_m_alias": …
        "name": "robolinux64-mate3d-v9.3.iso",

        "size": 1527775232,

        "up_total": 0
    }
]

If you do not pass a view name, default is assumed.

Using pyrocore as a library in other projects

The example in the first section is an easy way to create user-defined scripts. If you want to use pyrocore’s features in another runtime environment, you just have to load the configuration manually (which pyrocore.scripts.base.ScriptBaseWithConfig otherwise does for you).

# Details depend on the system you want to extend, of course
from some_system import plugin
from pyrocore import error
from pyrocore.util import load_config

def my_rtorrent_plugin():
    """ Initialize plugin.
    """
    try:
        load_config.ConfigLoader().load()
    except error.LoggableError as exc:
        # Handle accordingly...
        pass
    else:
        # Do some other stuff...
        pass

plugin.register(my_rtorrent_plugin)

Example Scripts

Note

The following snippets are meant to be placed and executed within the mainloop of the script skeleton found in Introduction.

Accessing files in download items

To get all the files for several items at once, we combine system.multicall and f.multicall into one big efficient mess.

from pprint import pprint, pformat

# The attributes we want to fetch
methods = [
    "f.get_path",
    "f.get_size_bytes",
    "f.get_last_touched",
    "f.get_priority",
    "f.is_created",
    "f.is_open",
]

# Build the multicall argument
f_calls = [method + '=' for method in methods]
calls = [{"methodName": "f.multicall", "params": [infohash, 0] + f_calls}
    for infohash in self.args
]

# Make the calls
multicall = proxy.system.multicall
result = multicall(calls)

# Print the results
for infohash, (files,) in zip(self.args, result):
    print ("~~~ %s [%d file(s)] " % (infohash, len(files))).ljust(78, '~')
    pprint(files)
self.LOG.info("Multicall stats: %s" % multicall)
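
The shape of the request and of the nested result is easier to follow on paper than in a live session. This sketch builds the same payload as above as a pure function and unpacks a hand-written stand-in result; fake_result is made up and only mimics the nesting that the `(files,)` unpacking above relies on.

```python
def build_file_calls(infohashes, methods):
    """Build one f.multicall request per infohash, for use with system.multicall."""
    f_calls = [method + "=" for method in methods]
    return [{"methodName": "f.multicall", "params": [infohash, 0] + f_calls}
            for infohash in infohashes]

calls = build_file_calls(["HASH_A", "HASH_B"], ["f.get_path", "f.get_size_bytes"])

# Made-up result: one single-element list per call, wrapping one row per file
fake_result = [
    [[["a/1.bin", 100], ["a/2.bin", 200]]],
    [[["b.iso", 300]]],
]

for call, (files,) in zip(calls, fake_result):
    print("%s: %d file(s), %d bytes" % (
        call["params"][0], len(files), sum(size for _, size in files)))
```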

Core stats of active downloads

The rt-down-stats script prints some statistics about currently active downloads, particularly the range of expected arrival times.

It shows how nicely you can handle the result of config.engine.multicall, which is using Python’s namedtuple under the hood, based on a simple field list like this:


    FIELDS = ('is_active', 'left_bytes', 'size_bytes', 'down.rate', 'priority')
    MIN_STALLED_RATE = 5 * 1024
    STALLED_PERCENT = 10

The first few lines of the mainloop then use the multicall helper method, to make accessing the result list actually readable. So instead of obscuring intent with numerical indexes or similar, the actual names of the fetched attributes are used to access them.

    def mainloop(self):
        proxy = config.engine.open()
        all_items = list(config.engine.multicall("incomplete", self.FIELDS))

        pending = [d for d in all_items if not d.is_active and d.priority > 0]
        print("Queued items:          ",
            fmt.human_size(sum(d.size_bytes for d in pending)),
            'in', len(pending), 'item(s)',
            '[{} free]'.format(fmt.human_size(disk_free(proxy.directory.default())).strip()))

        items = [d for d in all_items if d.is_active]
        if not items:
            print("No active downloads!")
            return

        good_rates = [d.down_rate for d in items if d.down_rate > self.MIN_STALLED_RATE]
        stalled_rate = max(
            self.MIN_STALLED_RATE,
            self.STALLED_PERCENT / 100 * sum(good_rates) / len(good_rates) if good_rates else 0)
        stalled_count = sum(d.down_rate < stalled_rate for d in items)
        global_down_rate = proxy.throttle.global_down.rate()

        total_size = sum(d.size_bytes for d in items)
        total_left = sum(d.left_bytes for d in items)
        eta_list = [0]
        if stalled_count < len(items):
            eta_list = [d.left_bytes / d.down_rate for d in items if d.down_rate >= stalled_rate]
        eta_max = total_left / (global_down_rate or 1)

        stalled_info = ', {} stalled below {}/s'.format(
            stalled_count, fmt.human_size(stalled_rate).strip()) if stalled_count else ''
        print("Size left to download: ",
            fmt.human_size(total_left), 'of', fmt.human_size(total_size).strip())
        print("Overall download speed:", fmt.human_size(global_down_rate) + '/s')
        print("ETA (min → max):       ",
            fmt_duration(min(eta_list)), '→', fmt_duration(eta_max), '…', fmt_duration(max(eta_list)),
            '[{} item(s){}]'.format(len(items), stalled_info),
        )
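
The attribute access used above (d.is_active, d.down_rate) can be reproduced with a plain namedtuple. This is a sketch of the idea, not the helper's actual code: it assumes that dots in field names are mapped to underscores, which matches 'down.rate' being read as down_rate in the mainloop, and it uses made-up rows instead of an XMLRPC result.

```python
from collections import namedtuple

FIELDS = ('is_active', 'left_bytes', 'size_bytes', 'down.rate', 'priority')

# Dots are not valid in attribute names, so map them to underscores
Row = namedtuple("Row", [name.replace(".", "_") for name in FIELDS])

# Made-up rows standing in for the raw multicall result
raw = [
    [1, 500, 1000, 2048, 2],
    [0, 700, 700, 0, 1],
]
rows = [Row(*values) for values in raw]

active = [d for d in rows if d.is_active]
pending = [d for d in rows if not d.is_active and d.priority > 0]
print(sum(d.left_bytes for d in active), active[0].down_rate, len(pending))
```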

See the full rt-down-stats script for all the details. If you call it, this is what you get:

$ docs/examples/rt-down-stats.py -q
Size left to download:   997.0 MiB of 1.1 GiB
Overall download speed:   70.8 KiB/s
ETA (min / max):        3h 11m … 4h 40m [3 item(s)]

List Stuck Tracker Announces

The rt-stuck-trackers script lists started items whose announces are stuck, i.e. where last activity is older than the normal announce interval.

It shows how to use namedtuple, as mentioned in the previous example, on rTorrent entities other than download items – in this case the tracker list of an item.

    def mainloop(self):
        import time
        from urlparse import urlparse
        from collections import namedtuple, Counter

        from pyrobase import fmt
        from pyrocore.util import xmlrpc

        proxy = config.engine.open()
        now = int(time.time())
        fields = ('is_enabled is_busy url min_interval normal_interval'
                  ' activity_time_last success_counter failed_counter scrape_counter').split()
        t_multicall = namedtuple('multicall', fields)
        rows = proxy.d.multicall('started', 'd.hash=', 't.multicall=,{}'.format(
            ','.join(['t.{}='.format(i) for i in fields])))
        stuck = Counter()

        view = 'tagged'
        if self.options.to_tagged and view not in proxy.view.list():
            proxy.view.add(xmlrpc.NOHASH, view)

        print('{:>5s}  {:>2s}  {:>5s}  {:>5s} {:>6s}  {:>13s}  {:40s}  {}'
              .format('S#', 'T#', 'OK', 'Error', 'Scrape', 'Last Announce',
                      'Infohash', 'Tracker Domain'))
        for idx, (infohash, trackers) in enumerate(rows, 1):
            trackers = [t_multicall(*t) for t in trackers]

            if not any(t.is_enabled for t in trackers):
                if self.options.stuck_only:
                    continue
                if self.options.to_tagged:
                    proxy.d.views.push_back_unique(infohash, view)
                    proxy.view.set_visible(infohash, view)
                domain = 'ALL TRACKERS DISABLED' if trackers else 'NO TRACKERS'
                stuck[domain] += 1
                print('{i:5d}  {n:>2s}  {n:>5s}  {n:>5s}  {n:>5s}  {delta:>13s}  {hash}  {domain}'
                      .format(i=idx, n='-', hash=infohash, delta='N/A', domain=domain))
                continue

            for num, t in enumerate(trackers, 1):
                if not t.is_enabled:
                    continue

                delta = now - t.activity_time_last
                if self.options.all or delta > t.normal_interval:
                    if self.options.to_tagged:
                        proxy.d.views.push_back_unique(infohash, view)
                        proxy.view.set_visible(infohash, view)
                    domain = urlparse(t.url).netloc.split(':')[0]
                    stuck[domain] += 1

                    print('{i:5d}  {n:2d}  '
                          '{t.success_counter:5d}  {t.failed_counter:5d}  {t.scrape_counter:5d}  '
                          '{delta}  {hash}  {domain}'
                          .format(t=t, i=idx, n=num, hash=infohash, domain=domain,
                                  delta=fmt.human_duration(t.activity_time_last,
                                                           precision=2, short=True)))

        if sum(stuck.values()):
            if self.options.to_tagged:
                proxy.ui.current_view.set(view)
            self.LOG.info("Stuck items: TOTAL={}, {}".format(sum(stuck.values()),
                ', '.join(['{}={}'.format(*i) for i in stuck.most_common()])))
        self.LOG.debug("XMLRPC stats: %s" % proxy)

See the full rt-stuck-trackers script for all the details. If you call it, this is what you get:

$ docs/examples/rt-stuck-trackers.py -a
   S#  T#     OK  Error Scrape  Last Announce  Infohash      Tracker Domain
    1   1    180      4      0    53m 48s ago  00……………………FF  tracker.example.com
INFO     Stuck items: TOTAL=1, tracker.example.com=1
INFO     Total time: 0.163 seconds.

The index shown is the item’s position in the started view.
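
The core test of the script, whether the last activity is older than the normal announce interval, can be isolated from the XMLRPC calls. The sketch below applies it to made-up tracker rows; Tracker and stuck_domains are hypothetical names, and only a subset of the fields fetched above is modelled.

```python
from collections import namedtuple, Counter
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2, as used in the script

Tracker = namedtuple("Tracker", "is_enabled url normal_interval activity_time_last")

def stuck_domains(trackers, now):
    """Count enabled trackers whose last activity is older than their interval."""
    stuck = Counter()
    for t in trackers:
        if t.is_enabled and now - t.activity_time_last > t.normal_interval:
            stuck[urlparse(t.url).netloc.split(":")[0]] += 1
    return stuck

now = 10000
trackers = [
    # Last activity 53m 48s ago with a 30 minute interval: stuck
    Tracker(1, "http://tracker.example.com:6969/announce", 1800, now - 3228),
    # Announced a minute ago: fine
    Tracker(1, "http://other.example.org/announce", 1800, now - 60),
    # Disabled trackers are skipped
    Tracker(0, "http://disabled.example.net/announce", 1800, 0),
]
print(stuck_domains(trackers, now).most_common())
```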

Writing Custom Jobs

First off, you really need to know a good amount of Python to be able to do this. But if you do, you can easily add your own background processing, more versatile and more efficient than calling rtcontrol in a cron job. The description here is terse, and mostly just tells you where to look for code examples, and the basics of how a job implementation interacts with the core system.

Note

While some effort will be spent on keeping the API backwards compatible, there is no guarantee of a stable API. Follow the commit log and changelogs of releases to get notified when you need to adapt your code.

Jobs are created during pyrotorque startup and registered with the scheduler. Configuration is taken from the [TORQUE] section of torque.ini, and any job.«job-name».«param-name» setting contributes to a job named job-name. The handler, schedule, and active settings are used by the core, the rest is passed to the handler class for customization and depends on the job type.
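
As an illustration of that naming scheme, the following sketch groups flat job.«job-name».«param-name» keys into one dict per job. It is not pyrotorque's own code; group_job_settings is a hypothetical helper, and the job.foo.active entry is invented for the example.

```python
def group_job_settings(settings):
    """Group flat 'job.<name>.<param>' keys into one dict per job name."""
    jobs = {}
    for key, value in settings.items():
        parts = key.split(".", 2)
        if len(parts) == 3 and parts[0] == "job":
            _, name, param = parts
            jobs.setdefault(name, {})[param] = value
    return jobs

settings = {
    "job.queue.schedule": "second=*/5",
    "job.queue.start_at_once": "1",
    "job.queue.intermission": "120",
    "job.foo.handler": "my.code:FooJob",
    "job.foo.active": "True",
    "unrelated.setting": "ignored",
}
jobs = group_job_settings(settings)
print(sorted(jobs))
print(jobs["queue"]["intermission"])
```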

To locate the job implementation, handler contains a module.path:ClassName coordinate of its class. So job.foo.handler = my.code:FooJob registers FooJob under the name foo. This means a job can be scheduled several times, given the right configuration and if the job implementation is designed for it. The given module must be importable of course, i.e. pip install it into your pyrocore virtualenv.
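
Resolving such a module.path:ClassName coordinate takes only a few lines with the standard library. The sketch below shows one way to do it and makes no claim about how pyrotorque implements the lookup; resolve_handler is a hypothetical name, and a standard library class stands in for a real job class.

```python
import importlib

def resolve_handler(coordinate):
    """Import 'module.path:ClassName' and return the named object."""
    module_path, _, class_name = coordinate.partition(":")
    if not module_path or not class_name:
        raise ValueError("Expected 'module.path:ClassName', got %r" % (coordinate,))
    module = importlib.import_module(module_path)
    return getattr(module, class_name)

# Stand-in for something like my.code:FooJob
cls = resolve_handler("collections:OrderedDict")
print(cls.__name__)
```

An ImportError raised here corresponds to the "must be importable" requirement: the module has to be installed in the environment that runs the lookup.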

The schedule defines the call frequency of the job’s run method, and active lets you easily disable a job without removing its configuration, which is how all the default jobs and their settings are provided. A job with active = False is simply ignored and not added to the scheduler on startup.

The simplest job is EngineStats. Follow its link and then click [source] to see its source code. Some noteworthy facts:

  • the initializer gets passed a config parameter, holding all the settings from torque.ini for a particular job instance, with the job.«name» prefix removed.
  • pyrocore.config is imported as config_ini, to not clash with the config dict passed into jobs.
  • create a LOG attribute as shown, for your logging needs.
  • to interact with rTorrent, open a proxy connection in run.
  • the InfluxDB job shows how to access config parameters, e.g. self.config.dbname.
  • raise UserError in the initializer to report configuration mishaps and prevent pyrotorque from starting.
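
Put together, the points above suggest a job shape like the following. This is a self-contained sketch and not a real pyrotorque job: UserError is a local stand-in for pyrocore's exception, HeartbeatJob and its message setting are invented, and a plain dict replaces whatever object pyrotorque actually passes as config.

```python
import logging

class UserError(Exception):
    """Local stand-in for pyrocore's UserError, so the sketch runs on its own."""

class HeartbeatJob(object):
    """Hypothetical job that logs a message on every scheduled run."""

    def __init__(self, config=None):
        # `config` holds this job's settings, without the job.<name> prefix
        self.config = config or {}
        self.LOG = logging.getLogger(self.__class__.__name__)
        if not self.config.get("message"):
            # Report configuration mishaps early, in the initializer
            raise UserError("Job setting 'message' is missing")
        self.runs = 0

    def run(self):
        """Called on schedule; a real job would open its rTorrent proxy here."""
        self.runs += 1
        self.LOG.info("%s (run #%d)", self.config["message"], self.runs)

logging.basicConfig(level=logging.INFO)
job = HeartbeatJob({"message": "still alive"})
job.run()
```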

More complex jobs that you can look at are the pyrocore.torrent.watch.TreeWatch and pyrocore.torrent.queue.QueueManager ones.