# Source code for snorky.services.datasync.backend

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from snorky.services.base import RPCService, RPCError, rpc_command
from snorky.services.datasync.managers.dealer import DealerManager
from snorky.services.datasync.managers.subscription import SubscriptionManager
from snorky.services.datasync.subscription import \
        Subscription, SubscriptionItem
from snorky.services.datasync.delta import \
        InsertionDelta, UpdateDelta, DeletionDelta
from snorky.timeout import TornadoTimeoutFactory
from snorky.log import snorky_log
from snorky.types import is_string
import functools


class DataSyncBackend(RPCService):
    """RPC service exposing the backend-facing DataSync commands.

    Two RPC commands are provided: :meth:`authorizeSubscription` and
    :meth:`publishDeltas`. Both delegate to the managers reachable through
    the ``frontend`` object given at construction time (``frontend.dm`` and
    ``frontend.sm``).
    """

    def __init__(self, name, frontend, timeout_interval=120,
                 timeout_factory=None):
        """Creates the backend service.

        :param name: Service name, forwarded to ``RPCService.__init__``.
        :param frontend: Object providing ``dm`` (dealers), ``sm``
                         (subscriptions) and ``do_cancel_subscription``.
        :param timeout_interval: Delay passed to ``call_later`` when a
                                 subscription is authorized. Presumably
                                 seconds -- confirm against the timeout
                                 factory in use.
        :param timeout_factory: Object with a ``call_later(delay, callback,
                                *args)`` method. Falls back to
                                ``TornadoTimeoutFactory`` when falsy.
        """
        super(DataSyncBackend, self).__init__(name)

        self.frontend = frontend
        self.timeout_interval = timeout_interval
        # NOTE(review): the fallback stores the TornadoTimeoutFactory name
        # itself rather than calling it; ``call_later`` is later invoked on
        # whatever is stored here -- confirm that is intended.
        self.timeout_factory = timeout_factory or TornadoTimeoutFactory

    @rpc_command
    def authorizeSubscription(self, req, items):
        """Requests a subscription authorization token.

        :param req: Not used by this command.
        :param list items: Subscription items to be authorized. Each item
                           must be a dictionary with two properties:
                           ``dealer`` and ``query``.
        :returns: The token returned by
                  ``frontend.sm.register_subscription``.
        :raises RPCError: If ``dealer`` is not a string, names an unknown
                          dealer, or a required field is missing.
        """
        obj_items = []
        try:
            for item in items:
                dealer_name = item["dealer"]
                if not is_string(dealer_name):
                    raise RPCError("dealer should be a dealer name")
                elif dealer_name not in self.frontend.dm.dealers_by_name:
                    raise RPCError("No such dealer")

                obj_item = SubscriptionItem(dealer_name, item["query"])
                obj_items.append(obj_item)
        except KeyError as field:
            # str() of a KeyError is the repr of the missing key, so the
            # message names the absent field.
            raise RPCError("Missing field %s" % field)

        subscription = Subscription(obj_items, self.frontend)
        token = self.frontend.sm.register_subscription(subscription)
        self.frontend.dm.connect_subscription(subscription)

        # Schedule the subscription's cancellation after ``timeout_interval``
        # and keep the returned handle on the subscription (presumably so it
        # can be cancelled once a client claims the token -- confirm in the
        # frontend).
        subscription._awaited_client_timeout = self.timeout_factory.call_later(
            self.timeout_interval, self.frontend.do_cancel_subscription,
            subscription)

        return token

    @rpc_command
    def publishDeltas(self, req, deltas):
        """Distributes one or more deltas to the appropriate dealers which,
        in turn, will distribute them to browser clients.

        Every delta is converted before any is delivered, so an invalid
        delta aborts the call without delivering the preceding ones.

        Example::

            deltas = [{
                "type": "insert",
                "model": "player",
                "data": { "color": "blue" }
            }]

        :param req: Not used by this command.
        :param list deltas: A list of deltas represented as dictionaries.
                            ``insert`` and ``delete`` deltas carry ``data``;
                            ``update`` deltas carry ``newData`` and
                            ``oldData``.
        :raises RPCError: If ``model`` is not a string, ``type`` is not one
                          of ``insert``, ``delete`` or ``update``, or a
                          required field is missing.
        """
        obj_deltas = []
        try:
            for delta in deltas:
                snorky_log.info(delta)
                delta_type = delta["type"]
                model = delta["model"]
                if not is_string(model):
                    raise RPCError("model must be string")

                if delta_type == "insert":
                    obj_delta = InsertionDelta(model, delta["data"])
                elif delta_type == "delete":
                    obj_delta = DeletionDelta(model, delta["data"])
                elif delta_type == "update":
                    obj_delta = UpdateDelta(model, delta["newData"],
                                            delta["oldData"])
                else:
                    raise RPCError("Invalid delta type")
                obj_deltas.append(obj_delta)
        except KeyError:
            raise RPCError("Missing field")

        # Deliver only after the whole batch has been validated.
        for delta_obj in obj_deltas:
            self.frontend.dm.deliver_delta(delta_obj)