Source code for snorky.services.datasync.dealers

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import abc
from snorky.types import with_metaclass
from collections import namedtuple
from functools import wraps
from snorky.types import PY2, StringTypes, Number, MultiDict, items
from snorky.services.datasync.delta import \
        Delta, InsertionDelta, UpdateDelta, DeletionDelta


class BadQuery(Exception):
    pass


[docs]class Dealer(with_metaclass(abc.ABCMeta, object)): """Matches dealer data with subscriptions in order to deliver deltas to clients. """ __slots__ = tuple() @property def name(self): """The name of the dealer. If not provided will default to the name of the class. """ return self.__class__.__name__ @abc.abstractproperty def model(self): """The name of the model class that is handled by this dealer. Usually specified as an static attribute. """ pass @abc.abstractmethod
[docs] def add_subscription_item(self, item): """Called everytime a subscription item referring this Dealer is authorized. """ pass
@abc.abstractmethod
[docs] def remove_subscription_item(self, item): """Called everytime a subscription is cancelled, once for each subscription item which refers to this Dealer. """ pass
@abc.abstractmethod
[docs] def get_subscription_items_for_model(self, model): """Called every time a delta arrives. If the delta is of ``update`` type, it's called twice, once with the old data and another time with the new data. It must return an iterable set of the subscription items which represent subscriptions to the provided model. """ pass
def deliver_delta(self, delta): """Receives a delta, computes a set of destination subscriptions items and forwards the delta to them. Please note that altough InsertionDelta and DeletionDelta will always trigger deltas of the same time on the client, UpdateDelta may not. To understand this fact, see the next example: Suppose our model class composes a player name and a color, i.e. 'id: 1, name: Alice, color: blue'. We may also have a dealer named 'Players with color' that uses the `color` attribute to filter players. A client maybe subscribed to players with color red, showing them in a list. Now, if Alice changes its color to red, this client will receive a insertion delta instead of an update one because they had no previous knowledge of Alice's model and will have to render it as a new row. If then Alice decides to change her player name, the client will receive a delta update because they already knew Alice's model. And lastly, if Alice changes its color again to other than red the new model will no longer satisfy the filter and thus a deletion delta will be sent to the client. """ if isinstance(delta, InsertionDelta) or \ isinstance(delta, DeletionDelta): deliver_to = self.get_subscription_items_for_model(delta.data) for subscription_item in deliver_to: subscription_item.subscription.deliver_delta(delta) else: # If origin delta is an update model = delta.model new_data = delta.new_data old_data = delta.old_data set_old = self.get_subscription_items_for_model(old_data) set_new = self.get_subscription_items_for_model(new_data) # Prepare every type of destination delta insertion_delta = InsertionDelta(model, new_data) update_delta = delta deletion_delta = DeletionDelta(model, old_data) # For the subscription items that do not match the filter with the # old data but do with the new data, send a insertion delta. for item in set_new - set_old: item.subscription.deliver_delta(insertion_delta) # For the subscription items that match the filter with both the # new and old data, send an update delta. for item in set_old.intersection(set_new): item.subscription.deliver_delta(update_delta) # For the subscription items that match the filter with the old # data but do not match with the new data, send a deletion delta. for item in set_old - set_new: item.subscription.deliver_delta(deletion_delta)
[docs]class BroadcastDealer(Dealer): """Dealer that matches all deltas with all subscription items, without filters. """ __slots__ = ('items',) def __init__(self): super(BroadcastDealer, self).__init__() self.items = set() def add_subscription_item(self, item): """Adds a subscription item.""" self.items.add(item) def remove_subscription_item(self, item): """Removes a subscription item.""" self.items.remove(item) def get_subscription_items_for_model(self, model): """Returns all the subscription items.""" return self.items
[docs]class SimpleDealer(Dealer): """This dealer uses a key function in order to determine which subscription items match which models. """ __slots__ = ('items_by_model_key',) def __init__(self): super(SimpleDealer, self).__init__() self.items_by_model_key = MultiDict() @abc.abstractmethod
[docs] def get_key_for_model(self, model): """A subclass must define a function here that for each model returns a value, usually the value of a certain field. The dealer will forward deltas to those subscriptions whose query equals the value returned by this function. """ pass
def add_subscription_item(self, item): """Adds a subscription item. The query must contain the desired value of the model key which will cause this client to receive a notification.""" self.items_by_model_key.add(item.query, item) def remove_subscription_item(self, item): """Removes a subscription item.""" self.items_by_model_key.remove(item.query, item) def get_subscription_items_for_model(self, model): """Returns the subscription items whose queries match the key of the provided model.""" query = self.get_key_for_model(model) # Return the set of items with this query, or an empty set if # there is none. return self.items_by_model_key.get_set(query)
FilterAssociation = namedtuple('FilterAssociation', ('filter', 'subscription_item')) """ Filter syntax ============= ['==', 'color', 'blue'] color is 'blue' ['<', 'age', 21] age is less than 21 ['>=', 'age', 21] age is greater than or equal to 21 ['and', ['==', 'service', 'prosody'], ['>=', 'severity_level', 3]] service is 'prosody' and severity_level is greater than or equal to 3 ['or', ['not', ['==', 'service', 'java']], ['>=', 'severity_level', 3]] service is not 'java' or severity_level is greater than or equal to 3 ['==', 'player.color', 'blue'] color of player is 'blue' """ def get_field(model, name): """ Given a dictionary for `model` and a dot separated string for `name`, accesses the named properties in model. e.g. get_field(model, 'service.name') returns model['service']['name'] """ prop = model for prop_name in name.split('.'): prop = model[prop_name] return prop def false_on_raise(function): """Decorator to make a function return False when a :py:class:`KeyError` or :py:class:`TypeError` is thrown. """ @wraps(function) def _false_on_raise(*args): try: return function(*args) except (KeyError, TypeError): # KeyError: missing property # TypeError: unorderable types (e.g. compared str with int) return False return _false_on_raise def orderable_types(a, b): """The provided parameters are orderable if and only if both are number types or both are string types.""" return (isinstance(a, Number) and isinstance(b, Number)) or \ (isinstance(a, StringTypes) and isinstance(b, StringTypes)) @false_on_raise def filter_eq(model, field, value): """Equality filter.""" return get_field(model, field) == value @false_on_raise def filter_lt(model, field, value): """*Less than* filter""" field_value = get_field(model, field) # Python3 does not allow to compare strings with numbers, emulate that. if PY2 and not orderable_types(field_value, value): raise TypeError return field_value < value @false_on_raise def filter_lte(model, field, value): """*Less than or equal* filter""" field_value = get_field(model, field) # Python3 does not allow to compare strings with numbers, emulate that. if PY2 and not orderable_types(field_value, value): raise TypeError return field_value <= value @false_on_raise def filter_gt(model, field, value): """*Greater than* filter""" field_value = get_field(model, field) # Python3 does not allow to compare strings with numbers, emulate that. if PY2 and not orderable_types(field_value, value): raise TypeError return field_value > value @false_on_raise def filter_gte(model, field, value): """*Greater or equal than* filter""" field_value = get_field(model, field) # Python3 does not allow to compare strings with numbers, emulate that. if PY2 and not orderable_types(field_value, value): raise TypeError return field_value >= value @false_on_raise def filter_not(model, expr): """*Not* filter.""" return not filter_matches(model, expr) @false_on_raise def filter_and(model, *expressions): """*And* filter. Accepts any number of expressions.""" return all(filter_matches(model, expr) for expr in expressions) @false_on_raise def filter_or(model, *expressions): """*Or* filter. Accepts any number of expressions.""" return any(filter_matches(model, expr) for expr in expressions) operator_functions = { '==': filter_eq, '<': filter_lt, '<=': filter_lte, '>': filter_gt, '>=': filter_gte, 'and': filter_and, 'or': filter_or, 'not': filter_not, } def filter_matches(model, filter): """Returns true if a model matches the provided filter.""" op = filter[0] args = filter[1:] return operator_functions[op](model, *args) class FilterDealer(Dealer): """This dealer matches deltas against filters specified in subscription queries.""" __slots__ = ('filters_by_item',) def __init__(self): super(FilterDealer, self).__init__() self.filters_by_item = {} def add_subscription_item(self, item): """Adds a new subscription item. The query must be a filter.""" if not isinstance(item.query, list): raise BadQuery filter = item.query self.filters_by_item[item] = filter def remove_subscription_item(self, item): """Removes a subscription item.""" del self.filters_by_item[item] def get_subscription_items_for_model(self, model): """Returns the subscription items matching a filter.""" for subscription_item, filter in items(self.filters_by_item): if filter_matches(model, filter): yield subscription_item