# Source code for webgrid.filters

from __future__ import absolute_import
import calendar
import datetime as dt
from decimal import Decimal as D
import inspect

from blazeutils import tolist
from blazeutils.dates import ensure_date, ensure_datetime
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta, SU
from sqlalchemy.sql import or_, and_
import sqlalchemy as sa
import six
from werkzeug.datastructures import ImmutableDict

from .extensions import (
    gettext,
    lazy_gettext as _
)
from . import types, validators

try:
    import arrow
except ImportError:
    arrow = None
try:
    from sqlalchemy_utils.types import ArrowType
except ImportError:
    ArrowType = None


class UnrecognizedOperator(ValueError):
    """Raised when a filter is asked to apply an operator it does not handle."""


class Operator(object):
    """Filter operator representing name and potential inputs.

    See webgrid.filters.ops for a collection of predefined operators.

    Args:
        key (str): Internal identifier to be used in code and in request args.

        display (str): Label for rendering the operator.

        field_type (str): Input field spec. Can be:

            - None: no input fields
            - input: single freeform text input
            - 2inputs: two freeform text inputs
            - select: single select box
            - select+input: select box as first input, freeform text as second

        hint (str, optional): Input field hint to show in UI. Defaults to None.
    """

    def __init__(self, key, display, field_type, hint=None):
        self.key, self.display = key, display
        self.field_type, self.hint = field_type, hint

    def __eq__(self, other):
        # Equality is keyed on `key`: another Operator is compared by its key, and
        # anything without a `key` attribute (e.g. a plain string) is compared directly.
        other_key = getattr(other, 'key', other)
        return self.key == other_key

    def __hash__(self):
        # Hash like the bare key so an Operator and its key string are interchangeable
        # as dict keys / set members.
        return hash(self.key)
class ops(object):
    """Namespace of the predefined `Operator` instances used by the filters.

    Each attribute is an `Operator(key, display, field_type[, hint])`. The key is the
    identifier stored on a filter's `op`; the display label is passed through the
    lazy gettext function `_`.
    """
    # equality against a single freeform input
    eq = Operator('eq', _('is'), 'input')
    not_eq = Operator('!eq', _('is not'), 'input')
    # equality against a select box
    is_ = Operator('is', _('is'), 'select')
    not_is = Operator('!is', _('is not'), 'select')
    # no input (field_type is None)
    empty = Operator('empty', _('empty'), None)
    not_empty = Operator('!empty', _('not empty'), None)
    # text matching
    contains = Operator('contains', _('contains'), 'input')
    not_contains = Operator('!contains', _('doesn\'t contain'), 'input')
    # ordering / ranges
    less_than_equal = Operator('lte', _('less than or equal'), 'input')
    greater_than_equal = Operator('gte', _('greater than or equal'), 'input')
    between = Operator('between', _('between'), '2inputs')
    not_between = Operator('!between', _('not between'), '2inputs')
    # date-relative operators; those taking an input carry the hint 'days'
    in_past = Operator('past', _('in the past'), None)
    in_future = Operator('future', _('in the future'), None)
    days_ago = Operator('da', _('days ago'), 'input', 'days')
    less_than_days_ago = Operator('ltda', _('less than days ago'), 'input', 'days')
    more_than_days_ago = Operator('mtda', _('more than days ago'), 'input', 'days')
    today = Operator('today', _('today'), None)
    this_week = Operator('thisweek', _('this week'), None)
    last_week = Operator('lastweek', _('last week'), None)
    in_less_than_days = Operator('iltd', _('in less than days'), 'input', 'days')
    in_more_than_days = Operator('imtd', _('in more than days'), 'input', 'days')
    in_days = Operator('ind', _('in days'), 'input', 'days')
    this_month = Operator('thismonth', _('this month'), None)
    last_month = Operator('lastmonth', _('last month'), None)
    select_month = Operator('selmonth', _('select month'), 'select+input')
    this_year = Operator('thisyear', _('this year'), None)
class FilterBase(object):
    """Base filter class interface for webgrid filters.

    Contains filter operators, inputs, render information, and the inner workings to apply the
    filter to the database query as needed.

    Args:
        sa_col (Expression): SQLAlchemy expression to which we apply operator/values.

        default_op (str or callable, optional): UI shortcut to enable filter with the specified
            op if none is given by the user. A callable is invoked (with no arguments) when the
            filter is set. Defaults to None.

        default_value1 (Any, optional): Use with `default_op` to set input. A callable is
            invoked to produce the value. Defaults to None.

        default_value2 (Any, optional): Use with `default_op` to set input. A callable is
            invoked to produce the value. Defaults to None.

        dialect (optional): DB dialect in use, for filters supporting multiple DBMS
            platforms. `TextFilter` reads its ``name`` attribute, so this is expected to be a
            dialect object rather than a plain string. Defaults to None.

    Class attributes:
        operators (tuple(Operator)): Available filter operators in the order they should appear.

        primary_op (str, optional): Key of operator to be selected automatically when the filter
            is added in UI. Defaults to first available filter op.

        init_attrs_for_instance (tuple(str)): Attributes to set when copying filter instance.
            Should not normally need to be set.

        input_types (tuple(str)): Possible input types for renderer to make available. Can be
            "input", "input2", and/or "select". Defaults to `("input", )`.

        html_input_types (dict): Maps operators to an HTML5 input type, appended to the
            operator's field type when serializing.

        receives_list (bool): Filter takes a list of values in its `set` method.
            Defaults to False.

        is_aggregate (bool): Filter applies to HAVING clause instead of WHERE

    Instance attributes:
        op (str): Operator key set by the user or programmatically.

        value1 (Any): First input value following validation processing.

        value2 (Any): Second input value following validation processing.

        value1_set_with (str): First input value raw from `set` call.

        value2_set_with (str): Second input value raw from `set` call.

        using_default_op (bool): True once `set` fell back to a non-None default operator.

        error (bool): True if input processing encountered a validation error.
    """
    operators = ops.eq, ops.not_eq, ops.empty, ops.not_empty
    # one operator may be specified as the "primary", i.e. the one to select when filter is added
    # note, the renderer is responsible for using this operator
    primary_op = None
    # if needed, specify the names of attributes set on the static instance of
    # this class that should get copied over to a new instance by
    # new_instance()
    init_attrs_for_instance = ()
    # current HTML renderer allows for "input", "input2", and/or "select"
    input_types = 'input',
    # match operators to the HTML5 type(s)
    html_input_types = {}
    # does this filter take a list of values in its set() method
    receives_list = False
    # does this filter apply to the HAVING clause
    is_aggregate = False

    def __init__(self, sa_col=None, default_op=None, default_value1=None,
                 default_value2=None, dialect=None):
        # attributes from static instance
        self.sa_col = sa_col
        self._default_op = default_op
        self.default_op = None
        self.default_value1 = default_value1
        self.default_value2 = default_value2
        self.dialect = dialect

        # attributes that will start fresh for each instance
        self.op = None
        self.value1 = None
        self.value2 = None
        self.value1_set_with = None
        self.value2_set_with = None
        self._op_keys = None
        self.error = False
        # set() only assigns this when it falls back to the default operator; give it a
        # defined value up front so it can always be read safely.
        self.using_default_op = False

        # find the outermost call to a subclass's init method so we can store the exact arguments
        # used to construct it
        outermost = None
        frame = inspect.currentframe()
        # Can't use inspect.stack() here because when called from within a Jinja template,
        # inspect.getframeinfo raises an exception
        # NOTE: this walk must stay inside __init__ itself; it relies on the current frame
        # being an `__init__` whose `self` is a FilterBase.
        while frame:
            if frame.f_code.co_name != '__init__' or \
                    not isinstance(frame.f_locals.get('self'), FilterBase):
                break
            outermost = inspect.getargvalues(frame)
            frame = frame.f_back
        # positional args (minus self) followed by *args, then **kwargs, as seen by the
        # outermost __init__; new_instance() replays them.
        self._vargs = [outermost.locals[a] for a in outermost.args[1:]] + \
            list(outermost.locals[outermost.varargs] if outermost.varargs else [])
        self._kwargs = outermost.locals[outermost.keywords] if outermost.keywords else {}

    @property
    def is_active(self):
        """Filter is active if op is set and input requirements are met."""
        operator_by_key = {op.key: op for op in self.operators}
        return self.op is not None and not self.error and (
            operator_by_key[self.op].field_type is None
            or self.value1 is not None
        )

    @property
    def is_display_active(self):
        """Filter display is active (i.e. show as a selected filter in UI) if op is set."""
        return self.op is not None

    @property
    def op_keys(self):
        """List of Operator keys used by this filter."""
        if self._op_keys is None:
            self._op_keys = [op.key for op in self.operators]
        return self._op_keys

    def _default_value(self, value):
        """Resolve a default value, calling it first if it is callable."""
        if callable(value):
            return value()
        return value

    def set(self, op, value1, value2=None):
        """Set filter operator and input values.

        Stores the raw inputs, then processes the inputs for validation. Applies the
        default operator if needed.

        Args:
            op (str): Operator key.
            value1 (Any): First filter input value. Pass None if the operator takes no input.
            value2 (Any, optional): Second filter input value. Defaults to None.

        Raises:
            validators.ValueInvalid: One or more inputs did not validate.
        """
        if not op:
            self.default_op = self._default_op() if callable(self._default_op) \
                else self._default_op
            self.op = self.default_op
            self.using_default_op = (self.default_op is not None)
            if self.op is None:
                # no operator given and no default: the filter stays unset
                return

        if not op and self.using_default_op:
            # falling back to the default operator also means using the default inputs
            value1 = self._default_value(self.default_value1)
            value2 = self._default_value(self.default_value2)

        # set values used in display first, since processing validation may
        # raise exceptions
        v_required = validators.RequiredValidator()
        v_oneof = validators.OneOfValidator(self.op_keys)
        self.op = v_oneof.process(v_required.process(op or self.default_op))
        self.value1_set_with = value1
        self.value2_set_with = value2
        try:
            self.value1 = self.process(value1, False)
            self.value2 = self.process(value2, True)
        except validators.ValueInvalid:
            self.error = True
            raise

    def raise_unrecognized_op(self):
        """Specified operator was not in the filter's list."""
        raise UnrecognizedOperator(_('unrecognized operator: {op}', op=self.op))

    def apply(self, query):
        """Query modifier to apply the needed clauses for filter inputs.

        Uses `query.having` when `is_aggregate` is set, `query.filter` otherwise.

        Raises:
            UnrecognizedOperator: `op` is not one handled here.
        """
        filter_method = query.filter if not self.is_aggregate else query.having

        if self.op == self.default_op and self.value1 is None:
            return query
        if self.op == ops.eq:
            return filter_method(self.sa_col == self.value1)
        if self.op == ops.not_eq:
            return filter_method(self.sa_col != self.value1)
        if self.op == ops.empty:
            return filter_method(self.sa_col.is_(None))
        if self.op == ops.not_empty:
            return filter_method(self.sa_col.isnot(None))
        if self.op == ops.less_than_equal:
            return filter_method(self.sa_col <= self.value1)
        if self.op == ops.greater_than_equal:
            return filter_method(self.sa_col >= self.value1)

        self.raise_unrecognized_op()

    def process(self, value, is_value2):
        """
        Process the values as given to .set(), validating and manipulating
        as needed. The base implementation returns the value unchanged.
        """
        return value

    def format_invalid(self, exc, col):
        """Wrapper for generating a validation error string."""
        return '{0}: {1}'.format(
            col.label,
            str(exc)
        )

    def get_search_expr(self):
        """
        Filters can be used for the general "single search" function on the grid. For this to
        work in SQL, the grid needs to pull search expressions from all filters and OR them
        together.

        Return value is expected to be a callable taking one argument (the search value).
        E.g. `lambda value: self.sa_col.like('%{}%'.format(value))`

        Return value of `None` is filtered out, essentially disabling search for the filter.
        """
        return None

    def serialize_filter_operator(self, op):
        """Build a `types.FilterOperator` describing `op` for this filter.

        When `op` has an entry in `html_input_types`, that entry is appended to the
        field type after a dot.
        """
        field_type = op.field_type
        if op in self.html_input_types:
            field_type += '.' + self.html_input_types[op]
        return types.FilterOperator(
            key=op.key,
            label=op.display,
            field_type=field_type,
            hint=op.hint,
        )

    def serialize_filter_option(self, key, value):
        """Build a `types.FilterOption` from a key/value pair."""
        return types.FilterOption(
            key=key,
            value=value,
        )

    def serialize_filter_spec(self):
        """Build a `types.FilterSpec` listing this filter's operators and primary op."""
        return types.FilterSpec(
            operators=[self.serialize_filter_operator(op) for op in self.operators],
            primary_op=self.serialize_filter_operator(
                self.primary_op) if self.primary_op else None,
        )

    def new_instance(self, **kwargs):
        """Create a fresh filter by replaying the original constructor arguments.

        Recognized kwargs:
            dialect: Dialect for the new filter. When omitted (or None), the dialect the
                constructor produced is kept instead of being overwritten with None.
            col: Used as `sa_col` when the new filter does not already have one.

        Note: Ensure any overrides of this method accept and pass through kwargs to preserve
        compatibility in future
        """
        cls = self.__class__
        new_filter = cls(*self._vargs, **self._kwargs)

        dialect = kwargs.get('dialect')
        if dialect is not None:
            new_filter.dialect = dialect

        if 'col' in kwargs and new_filter.sa_col is None:
            new_filter.sa_col = kwargs['col']
        return new_filter

    def __repr__(self):
        return 'class={}, op={}, value1={}, value2={}'.format(
            self.__class__.__name__, self.op, self.value1, self.value2
        )
class _NoValue(object):
    """Sentinel class; `OptionsFilterBase.process` returns it to mark a value to be dropped."""
class OptionsFilterBase(FilterBase):
    """Base class for filters having a list of options.

    UI for these is a single select box. By default, these are all shown with a search box
    within the filter, and checkboxes to pick multiple items.

    Notable args:
        value_modifier (Union(str, callable, Validator), optional): modifier to apply to the
            input value(s) in the request. Generally, the options list in the filter will have
            the "true" type in the identifier, but the request will come in with strings. If
            `value_modifier` is "auto", we'll check one of the option keys against some known
            types to pick a webgrid validator. Or, a webgrid validator can be passed in. Or,
            pass a callable, and it will be wrapped as a validator. None selects the string
            validator. Defaults to "auto".

    Class attributes:
        options_from (tuple): Iterable of options available to the filter. Defined here as a
            class attribute, but can be overridden as an instance attribute, property, or method.

            Options are expected to be tuples of the form (key, value). `key` is the part that
            will be validated on input and used in the filter query clause. `value` is displayed
            in UI.
    """
    operators = ops.is_, ops.not_is, ops.empty, ops.not_empty
    primary_op = ops.is_
    # NOTE(review): a plain string rather than a tuple like FilterBase.input_types.
    # Membership tests such as `'select' in input_types` give the same answer either way;
    # left unchanged because callers outside this module may rely on the current value.
    input_types = 'select'
    receives_list = True
    options_from = ()

    def __init__(self, sa_col, value_modifier='auto', default_op=None, default_value1=None,
                 default_value2=None):
        FilterBase.__init__(self, sa_col, default_op=default_op, default_value1=default_value1,
                            default_value2=default_value2)
        # attributes from static instance
        self.value_modifier = value_modifier

        # attributes that will start fresh for each instance
        self._options_seq = None
        self._options_keys = None

    def serialize_filter_spec(self):
        """Extend the base spec with the serialized options list."""
        base_spec = super().serialize_filter_spec()
        return types.OptionsFilterSpec(
            operators=base_spec.operators,
            primary_op=base_spec.primary_op,
            options=[
                self.serialize_filter_option(key, value)
                for key, value in self.options_seq],
        )

    def new_instance(self, **kwargs):
        """Create a new filter via `FilterBase.new_instance` and set up its validator."""
        new_filter = FilterBase.new_instance(self, **kwargs)
        new_filter.setup_validator()
        return new_filter

    @property
    def options_seq(self):
        """Resolver for `options_from` that caches the options values.

        If `options_from` is callable it is called and the result cached; otherwise it is
        used as the options value directly. Exceptions raised inside the callable propagate.
        """
        if self._options_seq is None:
            source = self.options_from
            self._options_seq = source() if callable(source) else source
        return self._options_seq

    @property
    def option_keys(self):
        """Extract a keys list from the options tuples."""
        if self._options_keys is None:
            self._options_keys = [k for k, v in self.options_seq]
        return self._options_keys

    def setup_validator(self):
        """Select a validator by type if `value_modifier` is "auto", or wrap a callable.

        Raises:
            ValueError: `value_modifier` is "auto" but there are no options to inspect.
            TypeError: the first option key has no matching validator, or `value_modifier`
                is neither "auto", a validator (has `process`), nor callable.
        """
        if self.value_modifier is None:
            # No modifier requested: values are treated as strings. The option keys are not
            # needed for this decision, so an empty options list is fine here.
            self.value_modifier = validators.StringValidator()
            return

        if self.value_modifier == 'auto':
            # make an educated guess about what type the unicode values sent in on
            # a set() operation should be converted to
            if len(self.option_keys) == 0:
                raise ValueError(_('value_modifier argument set to "auto", but '
                                   'the options set is empty and the type can therefore not '
                                   'be determined for {name}', name=self.__class__.__name__))
            first_key = self.option_keys[0]
            if isinstance(first_key, six.string_types):
                self.value_modifier = validators.StringValidator()
            elif isinstance(first_key, int):
                self.value_modifier = validators.IntValidator()
            elif isinstance(first_key, float):
                self.value_modifier = validators.FloatValidator()
            elif isinstance(first_key, D):
                self.value_modifier = validators.DecimalValidator()
            else:
                raise TypeError(
                    _("can't use value_modifier='auto' when option keys are {key_type}",
                      key_type=type(first_key))
                )
            return

        # if it's not the string 'auto' and it's not a webgrid validator, assume
        # it's a callable and wrap with a webgrid validator
        if not hasattr(self.value_modifier, 'process'):
            if not hasattr(self.value_modifier, '__call__'):
                raise TypeError(
                    _('value_modifier must be the string "auto", have a "process" attribute, '
                      'or be a callable')
                )
            self.value_modifier = validators.CustomValidator(processor=self.value_modifier)

    def set(self, op, values, value2=None):
        """Set the filter op/values to be used by query modifiers.

        Since this type of filter has only one input box, `value2` is ignored (present
        here for consistent API).

        Each of the items passed in the first filter value is processed with the
        selected/given validator. Any items that do not pass validation are ignored and
        left out of the query.

        Args:
            op (str): Operator key.
            values (iterable): List/tuple/iterable of values to be validated.
            value2 (Any, optional): Ignored. Defaults to None.
        """
        self.default_op = self._default_op() if callable(self._default_op) else self._default_op
        if not op and not self.default_op:
            return
        self.op = op or self.default_op
        self.using_default_op = (self.default_op is not None)

        if self.using_default_op and op is None and self.default_value1 is not None:
            values = tolist(self._default_value(self.default_value1))

        self.value1 = []
        if values is not None:
            for v in values:
                try:
                    v = self.process(v)
                    if v is not _NoValue:
                        self.value1.append(v)
                except validators.ValueInvalid:
                    # A normal user should be selecting from the options given,
                    # so if we encounter an Invalid exception, we are going to
                    # assume the value is erroneous and just ignore it
                    pass

        # if there are no values after processing, the operator is irrelevant
        # and should be set to None so that it is as if the filter
        # is not set.
        #
        # however, we have to test the operator first, because if it is empty
        # or !empty, then it would make sense for self.value1 to be empty.
        if self.op in (ops.is_, ops.not_is) and not (self.value1 or self.default_op):
            self.op = None

        self.value1_set_with = values

    def process(self, value):
        """Apply the `value_modifier` to a value.

        Returns `_NoValue` when the processed value is not one of the option keys, or when
        a default op is in effect and the value is -1.
        """
        if self.value_modifier is not None:
            validator = self.value_modifier
            # a validator class (rather than an instance) is instantiated on the fly
            if inspect.isclass(self.value_modifier):
                validator = validator()
            value = validator.process(value)
            if value not in self.option_keys:
                return _NoValue
        if self.default_op and value == -1:
            return _NoValue
        return value

    def match_keys_for_value(self, value):
        """Used for single-search to match search value to part of an option's display string.

        The match is a case-insensitive substring test against `str(display)`.
        """
        return [
            key
            for key, display in self.options_seq
            if value.lower() in str(display).lower()
        ]

    def get_search_expr(self):
        """Match up a search value to option display, grab the corresponding keys, and search."""
        # The important thing to remember here is that a user will be searching for the displayed
        # value, not the key that generated it. We need to do some prep work to search options
        # to get the keys needed for lookup into the data source.
        def search(value):
            matching_keys = self.match_keys_for_value(value)
            return self.sa_col.in_(matching_keys)
        return search

    def apply(self, query):
        """Query modifier to apply the needed clauses for filter inputs.

        For `is`/`!is`, a single selected value is compared directly and multiple values use
        IN; with no values the query is returned untouched. Other operators are delegated to
        `FilterBase.apply`.
        """
        if self.op == ops.is_:
            if len(self.value1) == 1:
                return query.filter(self.sa_col == self.value1[0])
            elif len(self.value1) > 1:
                return query.filter(self.sa_col.in_(self.value1))
            else:
                return query
        if self.op == ops.not_is:
            if len(self.value1) == 1:
                return query.filter(self.sa_col != self.value1[0])
            elif len(self.value1) > 1:
                return query.filter(~self.sa_col.in_(self.value1))
            else:
                return query
        return FilterBase.apply(self, query)
class OptionsIntFilterBase(OptionsFilterBase):
    """Options filter base whose keys are integers.

    Identical to `OptionsFilterBase` except that `value_modifier` defaults to
    `webgrid.validators.IntValidator` instead of "auto".
    """

    def __init__(self, sa_col, value_modifier=validators.IntValidator, default_op=None,
                 default_value1=None, default_value2=None):
        OptionsFilterBase.__init__(
            self,
            sa_col,
            value_modifier=value_modifier,
            default_op=default_op,
            default_value1=default_value1,
            default_value2=default_value2,
        )
class OptionsEnumFilter(OptionsFilterBase):
    """Options filter that pulls options from a python Enum.

    Most filters can be used in column definitions as a class or an instance. With this filter,
    an instance must be given, so the `enum_type` can be specified.

    Notable args:
        enum_type (Enum): Python Enum type to use for options list. Falls back to the
            class-level `enum_type` attribute when not passed.
    """
    enum_type = None

    def __init__(self, sa_col, value_modifier=None, default_op=None, default_value1=None,
                 default_value2=None, enum_type=None):
        self.enum_type = enum_type or self.__class__.enum_type

        if self.enum_type is None:
            raise ValueError('enum_type argument not given')

        # Rebind the argument itself (not a new local): FilterBase.__init__ records the
        # locals of this frame for later replay in new_instance().
        if value_modifier is None:
            value_modifier = self.default_modifier

        super(OptionsEnumFilter, self).__init__(
            sa_col,
            value_modifier=value_modifier,
            default_op=default_op,
            default_value1=default_value1,
            default_value2=default_value2,
        )

    def default_modifier(self, value):
        """Generic `value_modifier` that validates an item in the given Enum.

        Enum members pass through unchanged; anything else is looked up by name.

        Raises:
            ValueError: `value` is not a member name of the Enum.
        """
        enum_cls = self.enum_type
        if not isinstance(value, enum_cls):
            try:
                value = enum_cls[value]
            except KeyError:
                raise ValueError('Not a valid selection')
        return value

    def options_from(self):
        """Override as an instance method here, returns the options tuples from the Enum."""
        return [(member.name, member.value) for member in self.enum_type]

    def new_instance(self, **kwargs):
        """Create a new instance and carry this filter's `enum_type` over to it."""
        clone = super(OptionsEnumFilter, self).new_instance(**kwargs)
        clone.enum_type = self.enum_type
        return clone

    def process(self, value):
        """Validate value using `value_modifier`; pass it through when there is none."""
        modifier = self.value_modifier
        return value if modifier is None else modifier.process(value)
class OptionsEnumArrayFilter(OptionsEnumFilter):
    """Handle filtering array fields having an enum type.

    Uses SA's any/contains operators, while correcting for enum type discrepancy.

    Note, this is known to work with Postgres, but has not been tested on other platforms."""

    def get_search_expr(self):
        """Match up a search value to option display, grab the corresponding keys, and search."""
        # Users search on the displayed option text, so the search value is first mapped
        # back to the option keys that are stored in the data source.
        def search(value):
            keys = self.match_keys_for_value(value)
            # Unlike in_, which matches nothing for an empty set, contains with an empty
            # set matches every record. With nothing to contain, produce no expression.
            if not keys:
                return None
            return self.sa_col.contains(keys)
        return search

    def apply(self, query):
        """Query modifier to apply the needed clauses for filter inputs.

        A single (truthy) selected value is matched with `any` on its enum name, several
        values with `contains`; `!is` negates the clause. Other operators are delegated
        to the parent class.
        """
        if self.op not in (ops.is_, ops.not_is):
            return super().apply(query)

        selected = self.value1
        if len(selected) == 1 and selected[0]:
            clause = self.sa_col.any(selected[0].name)
        elif len(selected) > 1:
            clause = self.sa_col.contains(selected)
        else:
            return query

        if self.op == ops.not_is:
            clause = ~clause
        return query.filter(clause)
class TextFilter(FilterBase):
    """Filter with single freeform text input."""
    operators = (ops.eq, ops.not_eq, ops.contains, ops.not_contains, ops.empty, ops.not_empty)
    primary_op = ops.contains

    @property
    def comparisons(self):
        """Handles the dialect-specific job of text comparisons.

        A text filter should be case-insensitive. Some databases (such as postgresql)
        default to case-sensitive LIKE, so for the dialects listed below we compare
        upper-cased values or use ILIKE.

        Others, like mssql, are normally case-insensitive for string comparisons, and we
        assume that is the case here. A database set up differently needs a custom filter.

        Returns:
            dict: operator -> callable(col, value) producing the comparison expression.
        """
        def pattern(value):
            return u'%{}%'.format(value)

        needs_case_folding = self.dialect and self.dialect.name in ('postgresql', 'sqlite')
        if not needs_case_folding:
            return {
                ops.eq: lambda col, value: col == value,
                ops.not_eq: lambda col, value: col != value,
                ops.contains: lambda col, value: col.like(pattern(value)),
                ops.not_contains: lambda col, value: ~col.like(pattern(value)),
            }

        upper = sa.func.upper
        return {
            ops.eq: lambda col, value: upper(col) == upper(value),
            ops.not_eq: lambda col, value: upper(col) != upper(value),
            ops.contains: lambda col, value: col.ilike(pattern(value)),
            ops.not_contains: lambda col, value: ~col.ilike(pattern(value)),
        }

    def get_search_expr(self):
        """Single-search expression: the `contains` comparison on this filter's column."""
        def search(value):
            return self.comparisons[ops.contains](self.sa_col, value)
        return search

    def apply(self, query):
        """Query modifier to apply the needed clauses for filter inputs.

        Empty/not-empty treat both NULL and the empty string as empty. Operators without
        a text comparison are delegated to `FilterBase.apply`.
        """
        column = self.sa_col

        if self.op == self.default_op and not self.value1:
            return query

        if self.op == ops.empty:
            is_blank = or_(
                column.is_(None),
                column == u'',
            )
            return query.filter(is_blank)

        if self.op == ops.not_empty:
            has_text = and_(
                column.isnot(None),
                column != u'',
            )
            return query.filter(has_text)

        compare = self.comparisons.get(self.op)
        if compare is not None:
            return query.filter(compare(column, self.value1))
        return FilterBase.apply(self, query)
class NumberFilterBase(FilterBase):
    """Base class for filters taking one or two freeform inputs as numbers.

    Class attributes:
        validator (Validator): webgrid validator to use on input values.
    """
    operators = (ops.eq, ops.not_eq, ops.less_than_equal, ops.greater_than_equal, ops.between,
                 ops.not_between, ops.empty, ops.not_empty)

    def process(self, value, is_value2):
        """Validate one input with `validator`.

        A falsy value under the default operator, or a None/empty value where no input is
        required, yields None. For the single-input comparison operators the first input
        must also pass the required validator.
        """
        if self.op == self.default_op and not value:
            return None

        single_input_ops = (
            ops.eq, ops.not_eq, ops.less_than_equal, ops.greater_than_equal
        )
        if not is_value2 and self.op in single_input_ops:
            required = validators.RequiredValidator()
            number_validator = self.validator()
            return number_validator.process(required.process(value))

        if value is None or value == '':
            return None
        return self.validator().process(value)

    def get_search_expr(self):
        """Single-search expression: cast the column to text and LIKE-match the value."""
        # Deliberately naive. Stripping thousands separators, parentheses, currency
        # symbols, etc. from the search value would drag locale handling into this.
        def search(value):
            as_text = sa.sql.cast(self.sa_col, sa.Unicode)
            return as_text.like('%{}%'.format(value))
        return search

    def apply(self, query):
        """Handle between / not-between here; defer every other operator to the parent."""
        if self.op not in (ops.between, ops.not_between):
            return super().apply(query)

        filter_method = query.having if self.is_aggregate else query.filter
        in_range = self.sa_col.between(self.value1, self.value2)
        if self.op == ops.not_between:
            in_range = ~in_range
        return filter_method(in_range)
class IntFilter(NumberFilterBase):
    """Number filter validating inputs as integers.

    Supplies `validators.IntValidator` as the `validator` that the inherited `process`
    instantiates for each input.
    """
    validator = validators.IntValidator
class AggregateIntFilter(IntFilter):
    """Number filter validating inputs as integers, for use on aggregate columns.

    With `is_aggregate` set, the inherited `apply` uses `query.having` instead of
    `query.filter`.
    """
    is_aggregate = True
class NumberFilter(NumberFilterBase):
    """
    Same as int filter, but will handle real numbers and type everything
    as a decimal.Decimal object
    """
    # process() below does not return the validator's result, but the parent's
    # process() still runs this validator to check the input format
    validator = validators.FloatValidator

    def process(self, value, is_value2):
        """Validate via the parent class, then return the input as a Decimal (or None)."""
        # Run the parent purely for validation; its float result is discarded so the
        # Decimal is built from the original input instead.
        NumberFilterBase.process(self, value, is_value2)

        is_blank = value is None or (
            isinstance(value, six.string_types) and not len(value)
        )
        return None if is_blank else D(value)
class AggregateNumberFilter(NumberFilter):
    """Number filter validating inputs as Decimal, for use on aggregate columns.

    With `is_aggregate` set, the inherited `apply` uses `query.having` instead of
    `query.filter`.
    """
    is_aggregate = True
class _DateMixin(object):
    """Date-range computation, display formatting and search support for date filters.

    The methods here read ``op``, ``default_op``, ``value1``, ``value2``,
    ``error``, ``dialect`` and ``sa_col`` from the instance and call
    ``self._get_today()``; none of those are defined in this mixin, so the
    class it is mixed into must provide them.
    """

    # (month number, label) choices for the month select box
    options_from = [
        (1, _('01-Jan')), (2, _('02-Feb')), (3, _('03-Mar')), (4, _('04-Apr')),
        (5, _('05-May')), (6, _('06-Jun')), (7, _('07-Jul')), (8, _('08-Aug')),
        (9, _('09-Sep')), (10, _('10-Oct')), (11, _('11-Nov')), (12, _('12-Dec')),
    ]

    # Maps an operator to a callable ``(self, today) -> (first_day, last_day)``.
    # Either element may be None for open-ended or undetermined ranges.
    op_to_date_range = ImmutableDict({
        # these filters can be set as default ops without input values, so don't ignore them
        ops.this_month: lambda self, today: (
            today + relativedelta(day=1),
            today + relativedelta(day=1, months=+1, days=-1),
        ),
        ops.last_month: lambda self, today: (
            today + relativedelta(day=1, months=-1),
            today + relativedelta(day=1, days=-1),
        ),
        ops.select_month: lambda self, today: self._select_month(today),
        ops.this_year: lambda self, today: (
            dt.date(today.year, 1, 1),
            dt.date(today.year, 12, 31),
        ),
        ops.this_week: lambda self, today: (
            today - relativedelta(weekday=SU(-1)),
            today + relativedelta(weekday=calendar.SATURDAY),
        ),
        ops.last_week: lambda self, today: (
            today - relativedelta(weekday=SU(-1)) - relativedelta(days=7),
            today + relativedelta(weekday=calendar.SATURDAY) - relativedelta(days=7),
        ),
        ops.today: lambda self, today: (today, today),
        ops.in_past: lambda self, today: (today, today),
        ops.in_future: lambda self, today: (today, today),
        # ops with both dates populated
        ops.between: lambda self, today: self._between_range(),
        ops.not_between: lambda self, today: self._between_range(),
        # ops with single date populated
        ops.less_than_days_ago: lambda self, today: (
            today - dt.timedelta(days=self.value1),
            today,
        ) if self.value1 is not None else (None, None),
        ops.in_less_than_days: lambda self, today: (
            today,
            today + dt.timedelta(days=self.value1),
        ) if self.value1 is not None else (None, None),
        ops.days_ago: lambda self, today: (
            today - dt.timedelta(days=self.value1),
            today - dt.timedelta(days=self.value1),
        ) if self.value1 is not None else (None, None),
        ops.more_than_days_ago: lambda self, today: (
            None,
            today - dt.timedelta(days=self.value1)
        ) if self.value1 is not None else (None, None),
        ops.in_days: lambda self, today: self._in_days(today),
        ops.in_more_than_days: lambda self, today: self._in_days(today),
        ops.eq: lambda self, today: self._equality(),
        ops.not_eq: lambda self, today: self._equality(),
        ops.less_than_equal: lambda self, today: self._equality(),
        ops.greater_than_equal: lambda self, today: self._equality(),
    })

    @property
    def options_seq(self):
        """Month options, preceded by an "-- All --" entry when a default op is set."""
        _options_seq = self.options_from
        if self.default_op:
            _options_seq = [(-1, _('-- All --'))] + _options_seq
        return _options_seq

    def format_display_vals(self):
        """Store ISO-formatted strings for date inputs on ``value1_set_with`` /
        ``value2_set_with`` for the operators that take freeform dates."""
        if isinstance(self.value1, dt.date) and self.op in (
            ops.eq.key,
            ops.not_eq.key,
            ops.less_than_equal.key,
            ops.greater_than_equal.key,
            ops.between.key,
            ops.not_between.key
        ):
            # !!!: localize
            self.value1_set_with = self.value1.strftime('%Y-%m-%d')
        if isinstance(self.value2, dt.date) and self.op in (
            ops.between.key,
            ops.not_between.key
        ):
            # !!!: localize
            self.value2_set_with = self.value2.strftime('%Y-%m-%d')

    def _between_range(self):
        """Return (earlier, later) of value1/value2, or (None, None) without value1."""
        if self.value1 is None:
            return None, None

        if self.value1 <= self.value2:
            first_day, last_day = self.value1, self.value2
        else:
            first_day, last_day = self.value2, self.value1
        return first_day, last_day

    def _in_days(self, today):
        """Return (today + value1 days, None), or (None, None) without value1."""
        if self.value1 is None:
            return None, None
        return today + dt.timedelta(days=self.value1), None

    def _equality(self):
        """Return value1 as both ends of the range."""
        return self.value1, self.value1

    def _select_month(self, today):
        """Return the (first_day, last_day) range for the month/year selection.

        ``value1`` is the month and ``value2`` the year. A month of -1 covers
        the whole year; other non-positive months are treated as January.
        Returns (None, None) when either the month or the year is missing.
        """
        first_day = last_day = None
        # A year without a month yields no range. Without the None check the
        # comparison below raises TypeError for a missing month.
        if self.value2 and self.value1 is not None:
            month = self.value1 if self.value1 > 0 else 1
            first_day = dt.date(self.value2, month, 1)
            last_day = first_day + relativedelta(day=1, months=1, days=-1)
            if self.value1 == -1:
                last_day = dt.date(self.value2, 12, 31)
        return first_day, last_day

    def _description_data(self):
        """Return (first_day, last_day, prefix) used to build ``description``."""
        today = self._get_today()
        first_day, last_day = self.op_to_date_range.get(
            self.op,
            lambda self, today: (None, None)
        )(self, today)

        prefix = {
            ops.more_than_days_ago: _('before '),
            ops.in_past: _('before '),
            ops.not_between: _('excluding '),
            ops.in_more_than_days: _('after '),
            ops.in_future: _('after '),
            ops.not_eq: _('excluding '),
            ops.less_than_equal: _('up to '),
            ops.greater_than_equal: _('beginning '),
        }.get(self.op, '')
        return first_day, last_day, prefix

    @property
    def description(self):
        """
            String description of the filter operation and values
            - Useful for Excel reports
        """
        # simple cases
        if self.error:
            return _('invalid')
        elif self.op == ops.select_month:
            if not (
                isinstance(self.value1, int)
                and isinstance(self.value2, int)
            ):
                return _('All')
            # a month outside 1-12 is described by the year alone
            if self.value1 < 1 or self.value1 > 12:
                return self.value2
            # !!!: localize
            return dt.date(self.value2, self.value1, 1).strftime('%b %Y')
        elif self.op == ops.empty:
            return _('date not specified')
        elif self.op == ops.not_empty:
            return _('any date')

        first_day, last_day, prefix = self._description_data()

        # no operator, or an untouched default operator, describes as "all"
        if not first_day and (
            not self.op or (
                self.default_op == self.op and (
                    self.value2 is None and self.value1 is None
                )
            )
        ):
            return _('all')

        if self.op in (
            ops.today,
            ops.eq,
            ops.not_eq,
            ops.less_than_equal,
            ops.greater_than_equal,
            ops.days_ago,
            ops.in_past,
            ops.in_future,
        ):
            # !!!: localize
            return _('{descriptor}{date}',
                     descriptor=prefix,
                     date=first_day.strftime('%m/%d/%Y'))
        elif last_day and first_day:
            # !!!: localize dates
            return _('{descriptor}{first_date} - {second_date}',
                     descriptor=prefix,
                     first_date=first_day.strftime('%m/%d/%Y'),
                     second_date=last_day.strftime('%m/%d/%Y'))
        else:
            # !!!: localize
            target_date = first_day if first_day else last_day
            return _('{descriptor}{date}',
                     descriptor=prefix,
                     date=target_date.strftime('%m/%d/%Y'))

    def valid_date_for_backend(self, value):
        """
        Returns false if the given date or datetime is out of bounds for the backend dialect.

        If for any reason the value cannot be validated the value is presumed to be valid.
        """
        if not self.dialect:
            return True

        if not isinstance(value, (dt.datetime, dt.date)):
            return True

        def min_dt(*args):
            # clamp each supplied part up to what dt.datetime can represent
            lowest = dt.datetime.min
            parts = [lowest.year, lowest.month, lowest.day, lowest.hour,
                     lowest.minute, lowest.second, lowest.microsecond]
            return dt.datetime(*(max(arg, part) for arg, part in zip(args, parts)))

        def max_dt(*args):
            # clamp each supplied part down to what dt.datetime can represent
            highest = dt.datetime.max
            parts = [highest.year, highest.month, highest.day, highest.hour,
                     highest.minute, highest.second, highest.microsecond]
            return dt.datetime(*(min(arg, part) for arg, part in zip(args, parts)))

        if not isinstance(value, dt.datetime):
            value = dt.datetime.combine(value, dt.time.min)

        if self.dialect.name == 'postgresql':
            # https://www.enterprisedb.com/edb-docs/d/edb-postgres-advanced-server/user-guides/database-compatibility-for-oracle-developers-guide/9.4/Database_Compatibility_for_Oracle_Developers_Guide.1.038.html  # noqa
            return min_dt(-4713, 1, 1) <= value <= max_dt(294276, 12, 31, 23, 59, 59)
        elif self.dialect.name == 'sqlite':
            # https://www.sqlite.org/lang_datefunc.html
            return min_dt(0, 1, 1) <= value <= max_dt(9999, 12, 31, 23, 59, 59)
        elif self.dialect.name == 'mssql':
            # https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime-transact-sql?view=sql-server-ver15#datetime-description  # noqa
            # The upper bound is 23:59:59.997, and the last argument is in
            # microseconds, so .997 seconds is 997000.
            return min_dt(1753, 1, 1) <= value <= max_dt(9999, 12, 31, 23, 59, 59, 997000)

        return True

    def get_search_expr(self, date_comparator=None):
        """Return a callable that builds the search expression for a search value.

        Args:
            date_comparator (callable, optional): Receives the parsed datetime
                and returns the date-matching clause. Defaults to an equality
                comparison against the column.
        """
        # This is a naive implementation that simply converts the date/time column to string and
        # uses a LIKE. Only addition is to support a common month/day/year format, but only if
        # the value is easily parsible
        date_comparator = date_comparator or (lambda value: self.sa_col == value)

        def expr(value):
            base_expr = sa.sql.cast(self.sa_col, sa.Unicode).like('%{}%'.format(value))
            try:
                date_value = parse(value)
                if not self.valid_date_for_backend(date_value):
                    return base_expr
                return or_(
                    base_expr,
                    date_comparator(date_value)
                )
            except (ValueError, OverflowError):
                # not parseable as a date: fall back to the plain text match
                pass
            return base_expr
        return expr

    def serialize_filter_spec(self):
        """Return the parent's filter spec extended with the month options."""
        base_spec = super().serialize_filter_spec()
        return types.OptionsFilterSpec(
            operators=base_spec.operators,
            primary_op=base_spec.primary_op,
            options=[
                self.serialize_filter_option(key, value)
                for key, value in self.options_seq],
        )


class _DateOpQueryMixin:
    """Query construction for date operators.

    Reads ``sa_col``, ``value1``, ``value2``, ``first_day``, ``last_day`` and
    ``_now`` from the instance; they are not defined in this mixin.
    """

    # Maps an operator to a callable ``(self, query, today) -> query``.
    op_to_query = ImmutableDict({
        ops.today: lambda self, query, today: query.filter(
            self.sa_col == today
        ),
        ops.in_past: lambda self, query, today: query.filter(self.sa_col < today),
        ops.in_future: lambda self, query, today: query.filter(self.sa_col > today),
        ops.this_week: lambda self, query, today: query.filter(self.sa_col.between(
            today - relativedelta(weekday=SU(-1)),
            today + relativedelta(weekday=calendar.SATURDAY),
        )),
        ops.last_week: lambda self, query, today: query.filter(self.sa_col.between(
            today - relativedelta(weekday=SU(-1)) - relativedelta(days=7),
            today + relativedelta(weekday=calendar.SATURDAY) - relativedelta(days=7),
        )),
        ops.select_month: lambda self, query, today: (
            self._month_year_filter(query) if self.value1 and self.value2 else query
        ),
        ops.this_month: lambda self, query, today: self._month_year_filter(query),
        ops.last_month: lambda self, query, today: self._month_year_filter(query),
        ops.this_year: lambda self, query, today: self._month_year_filter(query),
        ops.between: lambda self, query, today: query.filter(self._between_clause()),
        ops.not_between: lambda self, query, today: query.filter(~self._between_clause()),
        ops.days_ago: lambda self, query, today: query.filter(
            self.sa_col == today - dt.timedelta(days=self.value1)
        ),
        ops.less_than_days_ago: lambda self, query, today: query.filter(and_(
            self.sa_col > today - dt.timedelta(days=self.value1),
            self.sa_col <= today,
        )),
        ops.more_than_days_ago: lambda self, query, today: query.filter(
            self.sa_col < today - dt.timedelta(days=self.value1)
        ),
        ops.in_days: lambda self, query, today: query.filter(
            self.sa_col == today + dt.timedelta(days=self.value1)
        ),
        ops.in_less_than_days: lambda self, query, today: query.filter(and_(
            self.sa_col >= today,
            self.sa_col < today + dt.timedelta(days=self.value1),
        )),
        ops.in_more_than_days: lambda self, query, today: query.filter(
            self.sa_col > today + dt.timedelta(days=self.value1)
        ),
    })

    def _get_today(self):
        """Return the current date (see ``_get_now``)."""
        # this filter is date-only, so our "now" is a date without time
        return ensure_date(self._get_now())

    def _get_now(self):
        """Return the ``_now`` override when set, otherwise ``datetime.now()``."""
        return self._now or dt.datetime.now()

    def _month_year_filter(self, query):
        """Filter the column to the stored first_day..last_day range."""
        return query.filter(self.sa_col.between(self.first_day, self.last_day))

    def _between_clause(self):
        """Return a BETWEEN clause over value1/value2, ordering them low to high."""
        if self.value1 <= self.value2:
            return self.sa_col.between(self.value1, self.value2)
        else:
            return self.sa_col.between(self.value2, self.value1)
class DateFilter(_DateOpQueryMixin, _DateMixin, FilterBase):
    """Complex date filter.

    Depending on the operator, inputs could be one or two freeform, or a select and freeform.

    Notable args:
        _now (datetime, optional): Useful for testing, supplies a date the filter will use instead
            of the true `datetime.now()`. Defaults to None.
    """
    operators = (
        ops.eq, ops.not_eq, ops.in_past, ops.in_future, ops.less_than_equal,
        ops.greater_than_equal, ops.between, ops.not_between,
        ops.days_ago, ops.less_than_days_ago, ops.more_than_days_ago,
        ops.today, ops.this_week, ops.last_week, ops.in_days, ops.in_less_than_days,
        ops.in_more_than_days, ops.empty, ops.not_empty, ops.this_month,
        ops.last_month, ops.select_month, ops.this_year
    )
    # operators whose single input is a number of days
    days_operators = (
        ops.days_ago, ops.less_than_days_ago, ops.more_than_days_ago,
        ops.in_less_than_days, ops.in_more_than_days, ops.in_days
    )
    # operators for which process() returns None without validating anything
    no_value_operators = (
        ops.empty, ops.not_empty, ops.today, ops.this_week, ops.last_week,
        ops.this_month, ops.last_month, ops.this_year, ops.in_past, ops.in_future,
    )
    input_types = 'input', 'select', 'input2'
    html_input_types = {
        ops.eq: 'date',
        ops.not_eq: 'date',
        ops.less_than_equal: 'date',
        ops.greater_than_equal: 'date',
        ops.between: 'date',
        ops.not_between: 'date',
    }

    def __init__(self, sa_col, _now=None, default_op=None, default_value1=None,
                 default_value2=None):
        """Initialize the filter.

        Args:
            sa_col: Column expression the filter applies to.
            _now (datetime, optional): Replacement for the current time.
            default_op, default_value1, default_value2: Passed through to
                ``FilterBase.__init__``.
        """
        self.first_day = None
        self.last_day = None

        FilterBase.__init__(self, sa_col, default_op=default_op, default_value1=default_value1,
                            default_value2=default_value2)
        # attributes from static instance
        self._now = _now

        # attributes that will start fresh for each instance
        self._was_time_given1 = False
        self._was_time_given2 = False

        self.check_arrow_type()

    def check_arrow_type(self):
        """Verify that the expression given to the filter is not ArrowType. If it is, cast it to
        a date to avoid type problems in the date filter"""
        if arrow and ArrowType and isinstance(self.sa_col.type, ArrowType):
            self.sa_col = sa.sql.cast(self.sa_col, sa.Date)

    def set(self, op, value1, value2=None):
        """Set the operator and values, then refresh display values and the date range."""
        super(DateFilter, self).set(op, value1, value2)
        self.format_display_vals()

        # store first/last day for customized usage
        today = self._get_today()
        self.first_day, self.last_day = self.op_to_date_range.get(
            self.op,
            lambda self, today: (None, None)
        )(self, today)

    def apply(self, query):
        """Return ``query`` filtered according to the current operator and values.

        Operators found in ``op_to_query`` use that builder; anything else, or
        a builder that returns None, falls back to the parent class.
        """
        # These operators do not require any input values (or selmonth, which
        # is grouped and handled specially), so they always build their query.
        ops_without_required_input = (
            ops.today,
            ops.this_week,
            ops.last_week,
            ops.select_month,
            ops.this_month,
            ops.last_month,
            ops.this_year,
            ops.in_past,
            ops.in_future,
        )

        # Every other operator needs at least one value to proceed, and if a
        # default value is displayed but not populated, we need to ignore it.
        # This check runs before the query builders because they do arithmetic
        # and comparisons on value1/value2 and raise TypeError when those are
        # None.
        if (
            self.op not in ops_without_required_input
            and self.op == self.default_op
            and self.value1 is None
        ):
            return query

        filtered_query = None
        query_function = self.op_to_query.get(self.op)
        if query_function:
            filtered_query = query_function(self, query, self._get_today())

        # if function returns None rather than query, call super
        if filtered_query is None:
            filtered_query = super().apply(query)

        return filtered_query

    def _process_days_operator(self, value, is_value2):
        """Validate a day-count input and make sure the resulting date is representable.

        Returns:
            int: The validated number of days, or None for the second input.

        Raises:
            validators.ValueInvalid: When offsetting today by that many days
                overflows.
        """
        if is_value2:
            return None

        v_required = validators.RequiredValidator()
        v_int = validators.IntValidator()
        filter_value = v_int.process(v_required.process(value))

        # The date arithmetic is performed only to detect overflow; results are discarded.
        if self.op in (ops.days_ago, ops.less_than_days_ago, ops.more_than_days_ago):
            try:
                self._get_today() - dt.timedelta(days=filter_value)
            except OverflowError:
                raise validators.ValueInvalid(gettext('date filter given is out of range'),
                                              value, self)

        if self.op in (ops.in_days, ops.in_less_than_days, ops.in_more_than_days):
            try:
                self._get_today() + dt.timedelta(days=filter_value)
            except OverflowError:
                raise validators.ValueInvalid(gettext('date filter given is out of range'),
                                              value, self)

        return filter_value

    def _process_date(self, value, is_value2):
        """Parse ``value`` into a date.

        A blank second value is replaced with today's date.

        Raises:
            validators.ValueInvalid: When the value cannot be parsed.
        """
        try:
            d = ensure_date(parse(value))

            if isinstance(d, (dt.date, dt.datetime)) and d.year < 1900:
                # years before 1900 are handed to the range validator
                v_range = validators.RangeValidator(min=1900)
                return v_range.process(d.year)

            return d
        except (ValueError, OverflowError):
            # allow open ranges when blanks are submitted as a second value
            if is_value2 and not value:
                return self._get_today()

            raise validators.ValueInvalid(gettext('invalid date'), value, self)

    def process(self, value, is_value2):
        """Validate one raw input for the current operator and return the converted value.

        Args:
            value: Raw input value.
            is_value2 (bool): True when ``value`` is the second input.

        Returns:
            A date, an int (day count, month or year), or None when no value
            applies.

        Raises:
            validators.ValueInvalid: When a required first value is None or a
                value fails validation.
        """
        # None is ok for default_ops
        if self.op == self.default_op and not value:
            return None

        # Subclass ops that do not require a value should be added to no_value_operators
        # to ensure empty values do not trigger a validation error
        if self.op in self.no_value_operators:
            return None

        if value is None:
            if is_value2:
                if self.op in (ops.between, ops.not_between):
                    # handled as a blank second value further down
                    value = ''
                else:
                    return None
            else:
                raise validators.ValueInvalid(gettext('invalid date'), value, self)

        if self.op == ops.select_month:
            v_int = validators.IntValidator()
            if value is None or value == '':
                return None
            if is_value2:
                # the second input is the year
                v_range = validators.RangeValidator(min=1900, max=9999)
                return v_range.process(v_int.process(value))
            return v_int.process(value)

        if self.op in self.days_operators:
            return self._process_days_operator(value, is_value2)

        return self._process_date(value, is_value2)
class DateTimeFilter(DateFilter):
    """Complex datetime filter.

    Depending on the operator, inputs could be one or two freeform, or a select and freeform.

    Notable args:
        _now (datetime, optional): Useful for testing, supplies a date the filter will use instead
            of the true `datetime.now()`. Defaults to None.
    """
    html_input_types = {
        ops.eq: 'datetime-local',
        ops.not_eq: 'datetime-local',
        ops.less_than_equal: 'datetime-local',
        ops.greater_than_equal: 'datetime-local',
        ops.between: 'datetime-local',
        ops.not_between: 'datetime-local',
    }

    # DateFilter's builders, overridden so that day boundaries are expressed as
    # datetimes running from 00:00:00 to 23:59:59.999999.
    op_to_query = ImmutableDict({**DateFilter.op_to_query, **{
        ops.today: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today),
            ensure_datetime(today, time_part=dt.time(23, 59, 59, 999999)),
        )),
        ops.in_past: lambda self, query, today: query.filter(
            self.sa_col < ensure_datetime(today),
        ),
        ops.in_future: lambda self, query, today: query.filter(
            self.sa_col > ensure_datetime(today, time_part=dt.time(23, 59, 59, 999999)),
        ),
        ops.this_week: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today - relativedelta(weekday=SU(-1))),
            ensure_datetime(
                today + relativedelta(weekday=calendar.SATURDAY),
                time_part=dt.time(23, 59, 59, 999999)
            ),
        )),
        ops.last_week: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today - relativedelta(weekday=SU(-1)) - relativedelta(days=7)),
            ensure_datetime(
                today + relativedelta(weekday=calendar.SATURDAY) - relativedelta(days=7),
                time_part=dt.time(23, 59, 59, 999999)
            ),
        )),
        ops.this_month: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today + relativedelta(day=1)),
            today + relativedelta(day=1, months=+1, microseconds=-1),
        )),
        ops.last_month: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today + relativedelta(day=1, months=-1)),
            today + relativedelta(day=1, microseconds=-1),
        )),
        ops.this_year: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today + relativedelta(day=1, month=1)),
            today + relativedelta(day=31, month=12, days=1, microseconds=-1),
        )),
        # first_day is None until set() has computed a range, so require it
        # before building the range from it
        ops.select_month: lambda self, query, today: (
            query.filter(self.sa_col.between(
                ensure_datetime(self.first_day),
                self.last_day + relativedelta(days=1, microseconds=-1),
            )) if self.value2 and self.first_day is not None else query
        ),
        ops.days_ago: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today - dt.timedelta(days=self.value1)),
            ensure_datetime(
                today - dt.timedelta(days=self.value1),
                time_part=dt.time(23, 59, 59, 999999)
            ),
        )),
        ops.less_than_days_ago: lambda self, query, today: query.filter(and_(
            self.sa_col > ensure_datetime(today - dt.timedelta(days=self.value1),
                                          time_part=dt.time(23, 59, 59, 999999)),
            self.sa_col <= self._get_now()
        )),
        ops.more_than_days_ago: lambda self, query, today: query.filter(
            self.sa_col < ensure_datetime(today - dt.timedelta(days=self.value1))
        ),
        ops.in_days: lambda self, query, today: query.filter(self.sa_col.between(
            ensure_datetime(today + dt.timedelta(days=self.value1)),
            ensure_datetime(
                today + dt.timedelta(days=self.value1),
                time_part=dt.time(23, 59, 59, 999999)
            ),
        )),
        ops.in_less_than_days: lambda self, query, today: query.filter(and_(
            self.sa_col >= self._get_now(),
            self.sa_col < ensure_datetime(today + dt.timedelta(days=self.value1)),
        )),
        ops.in_more_than_days: lambda self, query, today: query.filter(
            self.sa_col > ensure_datetime(
                today + dt.timedelta(days=self.value1),
                time_part=dt.time(23, 59, 59, 999999)
            )
        ),
        ops.eq: lambda self, query, today: (
            query.filter(self._eq_clause())
        ),
        ops.not_eq: lambda self, query, today: (
            query.filter(~self._eq_clause())
        ),
        # returns None when a time was supplied, which makes apply() fall back
        # to the parent implementation
        ops.less_than_equal: lambda self, query, today: query.filter(
            self.sa_col <= ensure_datetime(
                self.value1.date(),
                time_part=dt.time(23, 59, 59, 999999)
            )
        ) if self._has_date_only1 else None,
        ops.between: lambda self, query, today: query.filter(self._between_clause()),
        ops.not_between: lambda self, query, today: query.filter(~self._between_clause()),
    }})

    def __init__(self, sa_col, _now=None, default_op=None, default_value1=None,
                 default_value2=None):
        """Initialize the filter; arguments are passed through to ``DateFilter``."""
        # Flags recording whether each input was given without a time part.
        # NOTE(review): set before the parent initializer, presumably so they
        # exist if that initializer processes default values - confirm.
        self._has_date_only1 = self._has_date_only2 = False
        super(DateTimeFilter, self).__init__(sa_col, _now=_now, default_op=default_op,
                                             default_value1=default_value1,
                                             default_value2=default_value2)

    def check_arrow_type(self):
        """DateTimeFilter has no problems with ArrowType. Pass this case through."""
        pass

    def format_display_vals(self):
        """Store formatted strings for datetime inputs on ``value1_set_with`` /
        ``value2_set_with``, dropping or filling the time part for date-only inputs."""
        ops_single_val = (
            ops.eq.key,
            ops.not_eq.key,
            ops.less_than_equal.key,
            ops.greater_than_equal.key,
        )
        ops_double_val = (
            ops.between.key,
            ops.not_between.key
        )
        if isinstance(self.value1, dt.datetime) and self.op in ops_single_val + ops_double_val:
            # !!!: localize
            self.value1_set_with = self.value1.strftime('%Y-%m-%dT%H:%M')
            if self.op in ops_single_val and self._has_date_only1:
                # !!!: localize
                self.value1_set_with = self.value1.strftime('%Y-%m-%d')
        if isinstance(self.value2, dt.datetime) and self.op in ops_double_val:
            # !!!: localize
            self.value2_set_with = self.value2.strftime('%Y-%m-%dT%H:%M')
            if self._has_date_only2:
                # a date-only upper bound is displayed as the end of that day
                # !!!: localize
                self.value2_set_with = self.value2.strftime('%Y-%m-%dT23:59')

    def _between_clause(self):
        """Return a BETWEEN clause from value1 to value2, widening value2 to cover
        the whole day (date-only input) or the whole minute (no seconds given)."""
        if self._has_date_only2:
            right_side = ensure_datetime(self.value2.date(), time_part=dt.time(23, 59, 59, 999999))
        elif self.value2.second == 0 and self.value2.microsecond == 0:
            right_side = self.value2 + dt.timedelta(seconds=59, microseconds=999999)
        else:
            right_side = self.value2
        return self.sa_col.between(ensure_datetime(self.value1), right_side)

    def _eq_clause(self):
        """Return the clause matching value1 at the precision it was given in.

        A date-only input matches the whole day, zero seconds match the whole
        minute, and zero microseconds match the whole second. Anything else
        compares for exact equality.
        """
        if self._has_date_only1:
            left_side = ensure_datetime(self.value1.date())
            right_side = ensure_datetime(self.value1.date(), time_part=dt.time(23, 59, 59, 999999))
            return self.sa_col.between(left_side, right_side)
        elif self.value1 and self.value1.second == 0:
            left_side = self.value1
            right_side = self.value1 + dt.timedelta(seconds=59, microseconds=999999)
            return self.sa_col.between(left_side, right_side)
        elif self.value1 and self.value1.microsecond == 0:
            left_side = self.value1
            right_side = self.value1 + dt.timedelta(microseconds=999999)
            return self.sa_col.between(left_side, right_side)
        # Always return a clause: callers negate the result and pass it to
        # query.filter, neither of which works with None.
        return self.sa_col == self.value1

    def _process_datetime(self, value, is_value2):
        """Parse ``value`` into a datetime and record whether it was date-only.

        A blank second value is replaced with the current time.

        Raises:
            validators.ValueInvalid: When the value cannot be parsed.
        """
        try:
            dt_value = parse(value)
        except (ValueError, OverflowError):
            # allow open ranges when blanks are submitted as a second value
            if is_value2 and not value:
                return self._get_now()

            raise validators.ValueInvalid(gettext('invalid date'), value, self)

        if is_value2:
            self._has_date_only2 = self._has_date_only(dt_value, value)
        else:
            self._has_date_only1 = self._has_date_only(dt_value, value)

        return dt_value

    def process(self, value, is_value2):
        """Validate one raw input for the current operator and return the converted value.

        Args:
            value: Raw input value.
            is_value2 (bool): True when ``value`` is the second input.

        Returns:
            A datetime, an int (day count, month or year), or None when no
            value applies.

        Raises:
            validators.ValueInvalid: When a required first value is None or a
                value fails validation.
        """
        # None is ok for default_ops
        if self.op == self.default_op and not value:
            return None

        # Subclass ops that do not require a value should be added to no_value_operators
        # to ensure empty values do not trigger a validation error
        if self.op in self.no_value_operators:
            return None

        if value is None:
            if is_value2:
                if self.op in (ops.between, ops.not_between):
                    # handled as a blank second value further down
                    value = ''
                else:
                    return None
            else:
                raise validators.ValueInvalid(gettext('invalid date'), value, self)

        if self.op == ops.select_month:
            v_int = validators.IntValidator()
            if value is None or value == '':
                return None
            if is_value2:
                # the second input is the year
                v_range = validators.RangeValidator(min=1900, max=9999)
                return v_range.process(v_int.process(value))
            return v_int.process(value)

        if self.op in self.days_operators:
            return self._process_days_operator(value, is_value2)

        return self._process_datetime(value, is_value2)

    def _has_date_only(self, dt_value, value):
        """Return True when ``dt_value`` is exactly midnight and the raw text
        ``value`` does not contain '00:00', i.e. no time was typed."""
        return bool(
            dt_value.hour == 0
            and dt_value.minute == 0
            and dt_value.second == 0
            and '00:00' not in value
        )

    def get_search_expr(self):
        """Return a callable that builds the search expression for a search value."""
        # This is a naive implementation that simply converts the date/time column to string and
        # uses a LIKE. Only addition is to support a common month/day/year format, but only if
        # the value is easily parsible
        def date_comparator(value):
            # a search value at exactly midnight matches the whole day
            if self._has_date_only(value, ''):
                left_side = ensure_datetime(value.date())
                right_side = ensure_datetime(value.date(), time_part=dt.time(23, 59, 59, 999999))
                return self.sa_col.between(left_side, right_side)
            return self.sa_col == value

        return super().get_search_expr(date_comparator=date_comparator)
class TimeFilter(FilterBase):
    """Filter on a time value using one or two freeform inputs."""
    operators = (
        ops.eq,
        ops.not_eq,
        ops.less_than_equal,
        ops.greater_than_equal,
        ops.between,
        ops.not_between,
        ops.empty,
        ops.not_empty,
    )
    input_types = 'input', 'input2'
    # every operator that takes input renders as an HTML time field
    html_input_types = dict.fromkeys(
        (
            ops.eq,
            ops.not_eq,
            ops.less_than_equal,
            ops.greater_than_equal,
            ops.between,
            ops.not_between,
        ),
        'time',
    )

    # !!!: localize
    time_format = '%H:%M'

    def apply(self, query):
        """Return ``query`` filtered according to the current operator and values.

        Inputs carry minute precision (see ``time_format``), so comparisons
        that include the given minute extend to its last microsecond.
        """
        # a default operator without a value means nothing to filter on
        if self.op == self.default_op and self.value1 is None:
            return query

        if self.op in (ops.between, ops.not_between):
            # accept the two bounds in either order
            lower = min(self.value1, self.value2)
            upper = max(self.value1, self.value2)
            range_clause = self.sa_col.between(
                sa.cast(lower, sa.Time),
                sa.cast(upper, sa.Time),
            )
            if self.op == ops.not_between:
                range_clause = ~range_clause
            return query.filter(range_clause)

        # Casting this because some SQLAlchemy dialects (MSSQL) convert the value to datetime
        # before binding.
        val = sa.cast(self.value1, sa.Time)
        endval = val
        if self.value1:
            end_of_minute = (
                dt.datetime.combine(dt.date.today(), self.value1)
                + dt.timedelta(seconds=59, microseconds=999999)
            )
            endval = sa.cast(end_of_minute.time(), sa.Time)

        if self.op == ops.eq:
            return query.filter(self.sa_col.between(val, endval))
        if self.op == ops.not_eq:
            return query.filter(~self.sa_col.between(val, endval))
        if self.op == ops.less_than_equal:
            return query.filter(self.sa_col <= endval)
        if self.op == ops.greater_than_equal:
            return query.filter(self.sa_col >= val)

        return super(TimeFilter, self).apply(query)

    def process(self, value, is_value2):
        """Parse ``value`` with ``time_format`` and return a ``datetime.time``.

        Returns:
            The parsed time, or None for a blank input.

        Raises:
            validators.ValueInvalid: When the text does not match ``time_format``.
        """
        if value in (None, ''):
            return None

        if self.op == self.default_op and not value:
            return None

        try:
            parsed = dt.datetime.strptime(value, self.time_format)
            return parsed.time()
        except ValueError:
            raise validators.ValueInvalid(_('invalid time'), value, self)

    def get_search_expr(self, date_comparator=None):
        """Return a callable that builds the search expression for a search value.

        ``date_comparator`` is accepted but not used.
        """
        # This is a naive implementation that simply converts the time column to string and
        # uses a LIKE.
        def expr(value):
            return sa.sql.cast(self.sa_col, sa.Unicode).like('%{}%'.format(value))

        return expr
class YesNoFilter(FilterBase):
    """Simple "bool" filter designed for use with the YesNoColumn.

    There are no inputs; the all/yes/no operators carry the whole choice.
    """
    class ops(object):
        all = Operator('a', _('all'), None)
        yes = Operator('y', _('yes'), None)
        no = Operator('n', _('no'), None)

    operators = (ops.all, ops.yes, ops.no)
    primary_op = ops.yes

    def get_search_expr(self):
        """Return a callable mapping a search value to a true/false column test.

        The callable returns None when the lowercased value is contained in
        neither the "yes" nor the "no" operator label.
        """
        def expr(value):
            # "yes" is checked first; the SQL boolean is only built on a match
            candidates = (
                (self.ops.yes, sa.true),
                (self.ops.no, sa.false),
            )
            for operator, make_bool in candidates:
                if value.lower() in operator.display:
                    return self.sa_col == make_bool()
            return None

        return expr

    def apply(self, query):
        """Return ``query`` filtered by the selected operator ("all" leaves it untouched)."""
        if self.op == self.ops.all:
            return query

        for operator, make_bool in ((self.ops.yes, sa.true), (self.ops.no, sa.false)):
            if self.op == operator:
                return query.filter(self.sa_col == make_bool())

        return FilterBase.apply(self, query)