Source code for pyrocore.util.matching

# -*- coding: utf-8 -*-
# pylint: disable=
""" Torrent Item Filters.

    Copyright (c) 2009, 2010, 2011 The PyroScope Project
"""
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import re
import time
import shlex
import fnmatch
import operator

from pyrocore import error, config
from pyrocore.util import fmt, pymagic

# Module-level logger, obtained from the project's lazy logger factory
log = pymagic.get_lazy_logger(__name__)

# Lower-case spellings that truth() accepts as boolean true / false
TRUE = set(["true", "t", "yes", "y", "1", "+"])
FALSE = set(["false", "f", "no", "n", "0", "-"])

def truth(val, context):
    """ Interpret C{val} as a boolean.

        Values that support numeric addition are converted with C{bool()};
        anything else is lower-cased and looked up in C{TRUE} and C{FALSE}.

        @param val: Number or textual truth value.
        @param context: Text shown in the error message to locate the bad value.
        @return: The boolean meaning of C{val}.
        @raise FilterError: If a textual value is in neither C{TRUE} nor C{FALSE}.
    """
    try:
        0 + val
    except TypeError:
        pass
    else:
        # Numeric value, plain truthiness applies
        return bool(val)

    normalized = val.lower()
    if normalized in TRUE:
        return True
    if normalized in FALSE:
        return False

    raise FilterError("Bad boolean value %r in %r (expected one of '%s', or '%s')" % (
        val, context, "' '".join(TRUE), "' '".join(FALSE)
    ))
def _time_ym_delta(timestamp, delta, months):
    """ Helper to add a year or month delta to a timestamp.

        @param timestamp: UNIX timestamp to shift (interpreted in local time).
        @param delta: Signed number of years or months to add.
        @param months: Truthy to change the month field, falsy to change the year field.
        @return: The shifted UNIX timestamp, as returned by C{time.mktime}.
    """
    fields = list(time.localtime(timestamp))

    # int(False) == 0 selects the year field, int(True) == 1 the month field
    fields[int(months)] += delta

    # The DST flag describes the ORIGINAL date; -1 makes mktime work it out
    # for the shifted date, so the wall-clock time of day is preserved
    fields[8] = -1

    # Python 3's mktime insists on a tuple (or struct_time), a list is rejected
    return time.mktime(tuple(fields))
def unquote_pre_filter(pre_filter, _regex=re.compile(r'[\\]+')):
    """ Unquote a pre-filter condition.

        A condition wrapped in double quotes loses that outer pair of quotes,
        and every run of backslashes in it is cut to half its length
        (rounded down). Unquoted conditions are returned unchanged.

        @param pre_filter: The condition text.
        @param _regex: Precompiled pattern matching runs of backslashes.
        @return: The condition with one level of quoting removed.
    """
    if pre_filter.startswith('"') and pre_filter.endswith('"'):
        # Unquote outer level
        pre_filter = pre_filter[1:-1]
        pre_filter = _regex.sub(
            lambda run: run.group(0)[:len(run.group(0)) // 2], pre_filter)

    return pre_filter
class FilterError(error.UserError):
    """ Signals a (syntactically) invalid filter condition. """
class Filter(object):
    """ Common ancestor of all filters.

        Filter objects are callable; calling one is the same as calling
        its C{match} method.
    """

    def pre_filter(self):  # pylint: disable=no-self-use
        """ Return rTorrent condition to speed up data transfer.

            This default implementation offers no condition (empty string).
        """
        return ''

    def match(self, item):
        """ Return True if filter matches item.

            Has to be implemented by concrete filter classes.
        """
        raise NotImplementedError()

    def __call__(self, item):
        """ Delegate to C{match}. """
        return self.match(item)
class CompoundFilterBase(Filter, list):
    """ A filter that is itself a list of other filters. """
class CompoundFilterAll(CompoundFilterBase):
    """ Conjunction of filters, every one of them has to match (AND). """

    def __str__(self):
        return ' '.join(map(str, self))

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer.

            A single member is simply delegated to. Otherwise the non-empty
            conditions of all non-compound members are collected; depending
            on C{config.fast_query} either only the first one is returned,
            or all of them combined into one C{and=} expression.
        """
        if len(self) == 1:
            return self[0].pre_filter()

        # Nested compound filters do not take part in the combined condition
        candidates = (
            member.pre_filter() for member in self
            if not isinstance(member, CompoundFilterBase)
        )
        conditions = [cond for cond in candidates if cond]
        if not conditions:
            return ''

        if int(config.fast_query) == 1:
            # using just one simple expression is safer
            return conditions[0]

        # TODO: make this purely value-based (…)
        return 'and={%s}' % ','.join(conditions)

    def match(self, item):
        """ Return True if filter matches item, i.e. when no member rejects it. """
        for member in self:
            if not member.match(item):
                return False
        return True
class CompoundFilterAny(CompoundFilterBase):
    """ Disjunction of filters, one matching member is enough (OR). """

    def __str__(self):
        # Field filters that all test the same field use the compact
        # "name=cond1,cond2" notation; anything else is a bracketed OR list
        same_field = (
            all(isinstance(member, FieldFilter) for member in self)
            and len(set(member._name for member in self)) == 1
        )
        if not same_field:
            return "[ %s ]" % ' OR '.join(str(member) for member in self)

        tail = ','.join(member._condition for member in self[1:])
        return "%s,%s" % (str(self[0]), tail)

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer.

            Only a single member gets delegated to, several alternatives
            yield no condition at all.
        """
        if len(self) == 1:
            return self[0].pre_filter()

        # TODO: Find a safe way to do 'or' expressions
        return ''

    def match(self, item):
        """ Return True if filter matches item, i.e. when some member accepts it. """
        for member in self:
            if member.match(item):
                return True
        return False
class NegateFilter(Filter):
    """ Inverts the outcome of the filter it wraps (NOT). """

    def __init__(self, inner):
        """ Remember the filter whose result gets negated. """
        self._inner = inner

    def __str__(self):
        wrapped = self._inner
        if isinstance(wrapped, FieldFilter):
            # "name=value" becomes "name=!value"
            return "%s=!%s" % tuple(str(wrapped).split('=', 1))
        if isinstance(wrapped, CompoundFilterBase):
            return "[ NOT [ %s ] ]" % str(wrapped)
        return "[ NOT %s ]" % str(wrapped)

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer.

            The condition of the wrapped filter is put into a C{not=$...}
            expression; an already negated, backslash-free quoted condition
            is unwrapped instead.
        """
        condition = self._inner.pre_filter()
        if not condition:
            return ''

        is_negated = (
            condition.startswith('"not=$')
            and condition.endswith('"')
            and '\\' not in condition
        )
        if is_negated:
            # double negation, return inner command
            return condition[6:-1]

        if condition.startswith('"'):
            # Keep the opening quote in front of the inserted '$'
            condition = '"$' + condition[1:]
        else:
            condition = '$' + condition
        return 'not=' + condition

    def match(self, item):
        """ Return True if filter matches item, i.e. the wrapped filter does NOT. """
        return not self._inner.match(item)
class FieldFilter(Filter):
    """ Base class for all field filters.

        A field filter compares one named attribute of an item to a
        condition value. Subclasses hook into construction by overriding
        C{validate}.
    """

    # Maps field names to the rTorrent command text that subclasses insert
    # into their pre-filter conditions
    PRE_FILTER_FIELDS = dict(
        # alias="",
        hash="d.hash=",
        # NOTE(review): "name" was mapped to an empty string, which makes the
        # subclasses emit conditions like "string.contains_i=$,needle".
        # "d.name=" follows the naming of the neighbouring entries - confirm
        # against upstream.
        name="d.name=",
        message="d.message=",
        metafile="d.tied_to_file=",
        path="d.base_path=",
        # realpath="=",
        throttle="d.throttle_name=",
        # tracker="=",

        is_active="d.is_active=",
        is_complete="d.complete=",
        is_ignored="d.ignore_commands=",
        is_multi_file="d.is_multi_file=",
        is_open="d.is_open=",

        # done="=",
        down="d.down.rate=",
        # fno="=",
        prio="d.priority=",
        ratio="d.ratio=",
        size="d.size_bytes=",
        up="d.up.rate=",
        # NOTE(review): was an empty string as well (see "name" above);
        # "d.up.total=" is presumably the intended command - confirm.
        uploaded="d.up.total=",

        completed="d.custom=tm_completed",
        loaded="d.custom=tm_loaded",
        started="d.custom=tm_started",
        # stopped="",

        custom_tm_completed="d.custom=tm_completed",
        custom_tm_loaded="d.custom=tm_loaded",
        custom_tm_started="d.custom=tm_started",

        # XXX: bad result: rtcontrol -Q2 -o- -v tagged='!'new,foo
        #       use a 'd.is_tagged=tag' command?
        tagged="d.custom=tags",
    )

    # Fields that have no entry in the mapping above:
    #active          last time a peer was connected
    #directory       directory containing download data
    #files           list of files in this item
    #is_ghost        has no data file or directory?
    #is_private      private flag set (no DHT/PEX)?
    #leechtime       time taken from start to completion
    #seedtime        total seeding time after completion
    #traits          automatic classification of this item (audio, video, tv, movie, etc.)
    #views           views this item is attached to
    #xfer            transfer rate

    def __init__(self, name, value):
        """ Store field name and filter value for later evaluations.

            @param name: Name of the item attribute to test.
            @param value: The condition text; converted via C{fmt.to_unicode}.
        """
        self._name = name
        # "_condition" is what __str__ prints, "_value" is what subclasses
        # compare against (and may convert in their validate method)
        self._condition = self._value = fmt.to_unicode(value)
        self.validate()

    def __str__(self):
        return fmt.to_utf8("%s=%s" % (self._name, self._condition))

    def validate(self):
        """ Validate filter condition (template method).

            Called at the end of C{__init__}; this base version does nothing.
        """
class EqualsFilter(FieldFilter):
    """ Matches items whose field is equal to the filter value. """

    def match(self, item):
        """ Return True if filter matches item. """
        return self._value == getattr(item, self._name)
class PatternFilter(FieldFilter):
    """ Case-insensitive pattern filter, either a glob or a /regex/ pattern.
    """

    # Used by pre_filter to blank out bracketed groups and backslashes
    # before a search needle is picked from the pattern
    CLEAN_PRE_VAL_RE = re.compile(r'(?:\[.*?]\])|(?:\(.*?]\))|(?:{.*?]})|(?:\\)')
    # Splits a regex pattern at anything that is not a letter, digit, '/' or '_'
    SPLIT_PRE_VAL_RE = re.compile(r'[^a-zA-Z0-9/_]+')
    # Splits a glob pattern at its wildcards
    SPLIT_PRE_GLOB_RE = re.compile(r'[?*]+')

    def validate(self):
        """ Validate filter condition (template method).

            Lower-cases the pattern and prepares the matcher function:
            a regex search for values enclosed in slashes, a glob match
            for everything else.

            @raise FilterError: If a /regex/ pattern cannot be compiled.
        """
        super(PatternFilter, self).validate()
        self._value = self._value.lower()
        self._is_regex = self._value.startswith('/') and self._value.endswith('/')
        if self._is_regex:
            try:
                self._matcher = re.compile(self._value[1:-1]).search
            except re.error as exc:
                # Report a bad pattern like any other bad filter value,
                # instead of leaking the raw regex exception
                raise FilterError("Bad regular expression %r in %r (%s)" % (
                    self._value[1:-1], self._condition, exc))
        else:
            self._matcher = lambda val: fnmatch.fnmatchcase(val, self._value)

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer.

            For a non-empty pattern, the longest literal fragment of it is
            used as the needle of a case-insensitive substring condition.
            Fields without a command, and non-ASCII needles, yield no
            condition (empty string).
        """
        command = self.PRE_FILTER_FIELDS.get(self._name)
        if not command:
            # Unknown field, or no rTorrent command known for it
            return ''
        if not self._value:
            return '"equal={},cat="'.format(command)

        # NOTE(review): picking one fragment is a heuristic; for regexes with
        # alternation or optional parts the condition may be stricter than
        # the pattern itself - verify before relying on it
        if self._is_regex:
            needle = self._value[1:-1]
            needle = self.CLEAN_PRE_VAL_RE.sub(' ', needle)
            needle = self.SPLIT_PRE_VAL_RE.split(needle)
        else:
            needle = self.CLEAN_PRE_VAL_RE.sub(' ', self._value)
            needle = self.SPLIT_PRE_GLOB_RE.split(needle)
        # Longest fragment; of several equally long ones the last one wins
        needle = sorted(needle, key=len)[-1]

        if needle:
            try:
                needle.encode('ascii')
            except UnicodeEncodeError:
                return ''
            else:
                return '"string.contains_i=${},{}"'.format(command, needle)

        return ''

    def match(self, item):
        """ Return True if filter matches item.

            A missing field value is treated like an empty string.
        """
        val = (getattr(item, self._name) or '').lower()
        return self._matcher(val)
class FilesFilter(PatternFilter):
    """ Case-insensitive pattern filter on filenames in a torrent.
    """

    def match(self, item):
        """ Return True if filter matches item.

            The item matches when the lower-cased path of at least one of
            its files matches the pattern. The matcher prepared by
            C{validate} is used, so /regex/ patterns work the same as they
            do for other pattern fields.
        """
        val = getattr(item, self._name)
        if val is None:
            return False
        return any(self._matcher(fileinfo.path.lower()) for fileinfo in val)
class TaggedAsFilter(FieldFilter):
    """ Case-insensitive tags filter. Tag fields are white-space separated lists of tags.
    """

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer.

            An empty filter value asks for an unset field, anything else
            for a field containing the tag text (ignoring case).
        """
        if self._name not in self.PRE_FILTER_FIELDS:
            return ''

        command = self.PRE_FILTER_FIELDS[self._name]
        if not self._value:
            return '"not=${}"'.format(command)

        tag = self._value
        if self._exact:
            # Exact filters hold a set here, take its element
            tag = tag.copy().pop()
        return '"string.contains_i=${},{}"'.format(command, tag)

    def validate(self):
        """ Validate filter condition (template method).

            A leading '=' requests an exact match of the whole tag set, and
            so does an empty value (which stands for "no tags at all").
        """
        super(TaggedAsFilter, self).validate()
        tag = self._value.lower()

        # If the tag starts with '=', test on equality (just this tag, no others)
        if tag.startswith('='):
            exact = True
            tag = tag[1:]
        else:
            exact = not tag

        self._exact = exact
        if exact:
            # For exact matches, value is the set to compare to;
            # empty tag means empty set, not set of one empty string
            self._value = set((tag,)) if tag else set()
        else:
            self._value = tag

    def match(self, item):
        """ Return True if filter matches item. """
        tags = getattr(item, self._name) or []
        if not self._exact:
            # Is given tag in list?
            return self._value in tags
        # Equality check
        return self._value == set(tags)
class BoolFilter(FieldFilter):
    """ Filter for fields holding a truth value. """

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer. """
        if self._name not in self.PRE_FILTER_FIELDS:
            return ''

        command = self.PRE_FILTER_FIELDS[self._name]
        return '"equal={},value={}"'.format(command, int(self._value))

    def validate(self):
        """ Validate filter condition (template method).

            Converts the value to a real boolean via C{truth}, and
            normalizes the printable condition to "yes" or "no".
        """
        super(BoolFilter, self).validate()
        self._value = truth(str(self._value), self._condition)
        if self._value:
            self._condition = "yes"
        else:
            self._condition = "no"

    def match(self, item):
        """ Return True if filter matches item. """
        actual = getattr(item, self._name) or False
        return bool(actual) is self._value
class NumericFilterBase(FieldFilter):
    """ Base class for numerical value filters.

        Handles the optional '+' / '-' prefix of the condition; converting
        the remaining text to a number is left to the subclasses.
    """

    def validate(self):
        """ Validate filter condition (template method).

            Selects the comparison function (C{self._cmp}) and the name of
            the corresponding rTorrent comparison (C{self._rt_cmp}) from the
            prefix of the value: '+' means greater than, '-' means less
            than, no prefix means equality. The prefix is removed from
            C{self._value}.
        """
        super(NumericFilterBase, self).validate()

        # When set, a falsy field value never matches a non-zero filter value
        self.not_null = False

        if self._value.startswith('+'):
            self._cmp = operator.gt
            self._rt_cmp = 'greater'
            self._value = self._value[1:]
        elif self._value.startswith('-'):
            self._cmp = operator.lt
            self._rt_cmp = 'less'
            self._value = self._value[1:]
        else:
            self._cmp = operator.eq
            self._rt_cmp = 'equal'

    def match(self, item):
        """ Return True if filter matches item.

            A missing or falsy field value counts as 0.
        """
        val = getattr(item, self._name) or 0
        if self.not_null and self._value and not val:
            return False
        return self._cmp(float(val), self._value)
class FloatFilter(NumericFilterBase):
    """ Filter float values.
    """

    # Factor applied to the filter value of a field when building its
    # pre-filter condition; fields not listed here use a factor of 1
    FIELD_SCALE = dict(
        ratio=1000,
    )

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer.

            The condition compares against the scaled filter value,
            truncated to an integer.
        """
        if self._name in self.PRE_FILTER_FIELDS:
            val = int(self._value * self.FIELD_SCALE.get(self._name, 1))
            return '"{}=value=${},value={}"'.format(
                self._rt_cmp, self.PRE_FILTER_FIELDS[self._name], val)
        return ''

    def validate(self):
        """ Validate filter condition (template method).

            @raise FilterError: If the value is not a valid float.
        """
        super(FloatFilter, self).validate()

        try:
            self._value = float(self._value)
        except (ValueError, TypeError) as exc:
            raise FilterError("Bad numerical value %r in %r (%s)" % (self._value, self._condition, exc))
class TimeFilter(NumericFilterBase):
    """ Filter UNIX timestamp values.

        Conditions may be a literal UNIX timestamp, a time delta like
        "1y2m3w4d5h6i7s" relative to now, or an absolute date
        (U.S., European, or ISO notation) with an optional time of day.
    """

    # For every unit letter, a function going back that many units
    # from a given timestamp
    TIMEDELTA_UNITS = dict(
        y=lambda t, d: _time_ym_delta(t, -d, False),
        m=lambda t, d: _time_ym_delta(t, -d, True),
        w=lambda t, d: t - d * 7 * 86400,
        d=lambda t, d: t - d * 86400,
        h=lambda t, d: t - d * 3600,
        i=lambda t, d: t - d * 60,
        s=lambda t, d: t - d,
    )
    # One optional "<digits><unit>" group per unit (either letter case),
    # in the fixed order y, m, w, d, h, i, s
    TIMEDELTA_RE = re.compile("^%s$" % ''.join(
        r"(?:(?P<%s>\d+)[%s%s])?" % (i, i, i.upper()) for i in "ymwdhis"
    ))

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer. """
        if self._name in self.PRE_FILTER_FIELDS:
            # Adding a day of fuzz to avoid any possible timezone problems
            timestamp = self._value + (
                -86400 if self._rt_cmp == 'greater' else
                86400 if self._rt_cmp == 'less' else
                0)
            return '"{}=value=${},value={}"'.format(
                self._rt_cmp, self.PRE_FILTER_FIELDS[self._name], int(timestamp))
        return ''

    def validate_time(self, duration=False):
        """ Validate filter condition (template method) for timestamps and durations.

            @param duration: If true, the value is stored as a number of
                seconds relative to now, instead of as an absolute timestamp.
            @raise FilterError: If the value is neither a number, a time
                delta, nor a date in one of the supported formats.
        """
        super(TimeFilter, self).validate()
        timestamp = now = time.time()

        if str(self._value).isdigit():
            # Literal UNIX timestamp
            try:
                timestamp = float(self._value)
            except (ValueError, TypeError) as exc:
                raise FilterError("Bad timestamp value %r in %r (%s)" % (self._value, self._condition, exc))
        else:
            # Something human readable
            delta = self.TIMEDELTA_RE.match(self._value)
            if delta:
                # Time delta
                for unit, val in delta.groupdict().items():
                    if val:
                        timestamp = self.TIMEDELTA_UNITS[unit](timestamp, int(val, 10))

                if duration:
                    timestamp = now - timestamp
                else:
                    # Invert logic for time deltas (+ = older; - = within the delta range)
                    if self._cmp == operator.lt:
                        self._cmp = operator.gt
                        self._rt_cmp = 'greater'
                    elif self._cmp == operator.gt:
                        self._cmp = operator.lt
                        self._rt_cmp = 'less'
            else:
                # Assume it's an absolute date
                if '/' in self._value:
                    # U.S.
                    dtfmt = "%m/%d/%Y"
                elif '.' in self._value:
                    # European
                    dtfmt = "%d.%m.%Y"
                else:
                    # Fall back to ISO
                    dtfmt = "%Y-%m-%d"

                val = str(self._value).upper().replace(' ', 'T')
                if 'T' in val:
                    # Time also given; use as many of the time fields
                    # as there are colons in the value
                    dtfmt += "T%H:%M:%S"[:3+3*val.count(':')]

                try:
                    timestamp = time.mktime(time.strptime(val, dtfmt))
                except ValueError as exc:
                    raise FilterError("Bad timestamp value %r in %r (%s)" % (self._value, self._condition, exc))

                if duration:
                    timestamp -= now

        self._value = timestamp

    def validate(self):
        """ Validate filter condition (template method). """
        self.validate_time(duration=False)
class TimeFilterNotNull(TimeFilter):
    """ Filter UNIX timestamp values, ignore unset values unless compared to 0.
    """

    def validate(self):
        """ Validate filter condition (template method).

            Same as for the parent class, with the C{not_null} flag switched on.
        """
        super(TimeFilterNotNull, self).validate()
        # Has to be set after the parent's validation, which resets the flag
        self.not_null = True
class DurationFilter(TimeFilter):
    """ Filter for time spans given in seconds. """

    def validate(self):
        """ Validate filter condition (template method). """
        super(DurationFilter, self).validate_time(duration=True)

    def match(self, item):
        """ Return True if filter matches item. """
        if getattr(item, self._name) is not None:
            return super(DurationFilter, self).match(item)

        # Never match "N/A" items, except when "-0" was specified
        if self._value:
            return False
        return self._cmp(-1, 0)
class ByteSizeFilter(NumericFilterBase):
    """ Filter size and bandwidth values.
    """

    # Multipliers for the optional unit suffix of a value (binary multiples)
    UNITS = dict(b=1, k=1024, m=1024**2, g=1024**3)

    def pre_filter(self):
        """ Return rTorrent condition to speed up data transfer.

            Fields without a command yield no condition (empty string).
        """
        command = self.PRE_FILTER_FIELDS.get(self._name)
        if not command:
            # Unknown field, or no rTorrent command known for it
            return ''
        return '"{}={},value={}"'.format(self._rt_cmp, command, int(self._value))

    def validate(self):
        """ Validate filter condition (template method).

            Accepts a float with an optional unit suffix (any letter case),
            and stores the value scaled to bytes.

            @raise FilterError: If the numerical part is not a valid float.
        """
        super(ByteSizeFilter, self).validate()

        # Get scale
        lower_val = str(self._value).lower()
        if lower_val.endswith(tuple(self.UNITS)):
            scale = self.UNITS[lower_val[-1]]
            self._value = self._value[:-1]
        else:
            scale = 1

        # Get float value
        try:
            self._value = float(self._value)
        except (ValueError, TypeError) as exc:
            raise FilterError("Bad numerical value %r in %r (%s)" % (self._value, self._condition, exc))

        # Scale to bytes
        self._value = self._value * scale
class MagicFilter(FieldFilter):
    """ Filter that looks at the comparison value and automatically decides
        what type of filter to use.
    """

    def validate(self):
        """ Validate filter condition (template method).

            Picks a filter class by the looks of the condition, and creates
            the actual filter from the unchanged condition text. Anything
            not recognized is handled as a pattern.
        """
        probe = self._condition.lower()
        if probe[:1] in ("+", "-"):
            # A comparison prefix plays no role in finding the type
            probe = probe[1:]

        if not probe or (probe.startswith('/') and probe.endswith('/')):
            matcher = PatternFilter
        elif probe.replace('.', '0').isdigit():
            matcher = FloatFilter
        elif self._condition in (TRUE | FALSE):
            matcher = BoolFilter
        elif probe.endswith(tuple(ByteSizeFilter.UNITS)) and probe[:-1].isdigit():
            matcher = ByteSizeFilter
        elif TimeFilter.TIMEDELTA_RE.match(probe):
            matcher = TimeFilter
        else:
            matcher = PatternFilter

        self._inner = matcher(self._name, self._condition)

    def match(self, item):
        """ Return True if filter matches item (as decided by the chosen filter). """
        return self._inner.match(item)
class ConditionParser(object):
    """ Parser turning textual filter conditions into filter objects. """

    # Templates translating "name<op>value" comparisons into the value
    # notation of "name=value" conditions
    COMPARISON_OPS = {
        "<": "-%s",
        "<=": "!+%s",
        ">": "+%s",
        ">=": "!-%s",
        "<>": "!%s",
        "!=": "!%s",
        "~": "/%s/",
    }

    @classmethod
    def AMENABLE(cls, _):
        """ Prefined lookup mode for typeless access to any field name.
        """
        return {"matcher": MagicFilter}

    def __init__(self, lookup, default_field=None, ident_re=r"[_A-Za-z][_A-Za-z0-9]*"):
        """ Initialize parser.

            The C{lookup} callback takes a C{name} argument and returns a dict describing
            that field, or None for an unknown field. If a dict is returned, the "matcher"
            key is supposed to be a C{Filter} instance; if it's None or missing, the field
            is not comparable.

            @param lookup: Field definition lookup callable.
            @param default_field: Optional default field name for conditions without an operator.
            @param ident_re: Regex describing valid field names.
        """
        self.lookup = lookup
        self.default_field = default_field
        self.ident_re = ident_re

    def _create_filter(self, condition):
        """ Create a filter object from a textual condition.

            @param condition: A single condition, e.g. "name=value" or "name>value".
            @return: One filter, or a C{CompoundFilterAny} for comma-separated values.
            @raise FilterError: For malformed conditions, and for unknown
                or non-comparable fields.
        """
        # "Normal" comparison operators?
        found = re.match(r"^(%s)(<[>=]?|>=?|!=|~)(.*)$" % self.ident_re, condition)
        if found:
            name, operator_text, values = found.groups()
            if values and values[0] in "+-":
                raise FilterError("Comparison operator cannot be followed by '%s' in '%s'" % (values[0], condition))
            values = self.COMPARISON_OPS[operator_text] % values
        elif '=' in condition:
            # Split name from value(s)
            name, values = condition.split('=', 1)
        elif self.default_field:
            name, values = self.default_field, condition
        else:
            raise FilterError("Field name missing in '%s' (expected '=')" % condition)

        # Try to find field definition
        field = self.lookup(name)
        if not field:
            raise FilterError("Unknown field %r in %r" % (name, condition))
        if field.get("matcher") is None:
            raise FilterError("Field %r cannot be used as a filter" % (name,))

        # Make filters from values
        filters = []
        for value in values.split(','):
            negated = value.startswith('!')
            if negated:
                value = value[1:]
            field_matcher = field["matcher"](name, value)
            filters.append(NegateFilter(field_matcher) if negated else field_matcher)

        # Return filters
        if len(filters) > 1:
            return CompoundFilterAny(filters)
        return filters[0]

    def _tree2str(self, tree, root=True):
        """ Convert parsed condition tree back to a (printable) string.

            @param tree: A string, or a (nested) sequence of strings.
            @param root: False for nested levels, which get put into brackets.
        """
        try:
            # Keep strings as they are
            text = '' + tree
        except (TypeError, ValueError):
            parts = [self._tree2str(node, root=False) for node in tree]
            flat = ' '.join(parts)
            if root:
                return flat
            return "[ %s ]" % flat
        else:
            return text

    def parse(self, conditions):
        """ Parse filter conditions.

            @param conditions: multiple conditions.
            @type conditions: list or str
            @return: The root filter of the parsed conditions.
            @raise FilterError: For empty conditions, unbalanced brackets,
                and misplaced NOT / OR keywords.
        """
        conditions_text = conditions
        try:
            conditions = shlex.split(fmt.to_utf8(conditions))
        except AttributeError:
            # Not a string, assume parsed tree
            conditions_text = self._tree2str(conditions)

        # Empty list?
        if not conditions:
            raise FilterError("No conditions given at all!")

        # NOT *must* appear at the start of a group
        negate = conditions[:1] == ["NOT"]
        if negate:
            conditions = conditions[1:]
            if not conditions:
                raise FilterError("NOT must be followed by some conditions!")

        # Handle grouping
        if '[' in conditions:
            # Stack of the groups that are still open, the root level is at index 0
            levels = [[]]
            for term in conditions:
                if term == '[':
                    # new grouping
                    levels.append([])
                elif term == ']':
                    finished = levels.pop()
                    if not levels:
                        raise FilterError("Unbalanced brackets, too many closing ']' in condition %r" % (conditions_text,))
                    # append finished group to containing level
                    levels[-1].append(finished)
                else:
                    # append to current level
                    levels[-1].append(term)

            if len(levels) > 1:
                raise FilterError("Unbalanced brackets, too many open '[' in condition %r" % (conditions_text,))
            conditions = levels[0]

        # Prepare root matcher
        conditions = list(conditions)
        matcher = CompoundFilterAll()
        if "OR" in conditions:
            root = CompoundFilterAny()
            root.append(matcher)
        else:
            root = matcher

        # Go through conditions and parse them
        for condition in conditions:
            if condition == "OR":
                # Leading OR, or OR OR in sequence?
                if not matcher:
                    raise FilterError("Left-hand side of OR missing in %r!" % (conditions_text,))

                # Start next run of AND conditions
                matcher = CompoundFilterAll()
                root.append(matcher)
            elif isinstance(condition, list):
                matcher.append(self.parse(condition))
            else:
                matcher.append(self._create_filter(condition))

        # Trailing OR?
        if not matcher:
            raise FilterError("Right-hand side of OR missing in %r!" % (conditions_text,))

        if negate:
            return NegateFilter(root)
        return root