// blob: 08cd25aa4b7d917998242867c2ea65f9cacbed78 [file] [log] [blame]
// Copyright 2015 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
'use strict';
// BatchQueue
//
// Enqueue items and flush them in batch (using setTimeout of 0).
//
// flushFunc(items) is called with an array holding the next batch, in enqueue
// order. maxBatchSize is optional: when it is omitted or not positive, every
// pending item is delivered in one batch; otherwise a batch holds at most
// maxBatchSize items and the rest is flushed from follow-up timeouts.
//
// \note A batch is removed from the queue before flushFunc runs, and the
//       follow-up flush is scheduled even if flushFunc throws. A failing batch
//       is therefore dropped (the exception still propagates out of the timer
//       callback) instead of stalling the queue forever.
var BatchQueue = function(flushFunc, maxBatchSize)
{
    maxBatchSize = maxBatchSize || 0;

    var queue = [];
    var flushScheduled = false; // true while a flush timeout is pending

    // Schedule a flush unless one is already pending or there is nothing to flush.
    var scheduleFlush = function()
    {
        if (flushScheduled || queue.length === 0)
            return;
        flushScheduled = true;
        setTimeout(flushQueue, 0);
    };

    // Flush queue func.
    var flushQueue = function()
    {
        flushScheduled = false;
        var batchSize = (maxBatchSize > 0) ? Math.min(maxBatchSize, queue.length) : queue.length;
        var batchItems = queue.splice(0, batchSize);
        try
        {
            flushFunc(batchItems);
        }
        finally
        {
            // Covers both leftover items and items enqueued by flushFunc itself.
            scheduleFlush();
        }
    };

    this.enqueue = function(item)
    {
        queue.push(item);
        scheduleFlush();
    };
};
// Services
angular.module('cherry.services', [])
.value('version', '0.0.1')
// WebSockets
.factory('socket', ['$rootScope', '$location', function ($rootScope, $location)
{
var url = "ws://" + $location.host() + ":" + $location.port() + "/ws";
var socket = new WebSocket(url);
var sendQueue = [];
var eventHandlers = {};
var msgHandlers = [];
var statusHandlers = [];
var flushInMsgs = function(items)
{
// \note scope.$apply is wrapped in setTimeout to avoid calling it while angular's digest is already active.
setTimeout(function()
{
$rootScope.$apply(function ()
{
for (var msgNdx = 0; msgNdx < items.length; msgNdx++)
{
var msgArgs = items[msgNdx];
for (var ndx = 0; ndx < msgHandlers.length; ndx++)
msgHandlers[ndx].call(null, socket, msgArgs); // \todo [petri] this?
}
});
}, 0);
};
var inMsgQueue = new BatchQueue(flushInMsgs, 100);
socket.onopen = function(ev)
{
console.log('[socket] connected (' + sendQueue.length + ' queued)!');
for (var ndx = 0; ndx < sendQueue.length; ndx++)
socket.send(sendQueue[ndx]);
sendQueue = [];
// \note scope.$apply is wrapped in setTimeout to avoid calling it while angular's digest is already active.
setTimeout(function()
{
$rootScope.$apply(function ()
{
for (var ndx = 0; ndx < statusHandlers.length; ndx++)
statusHandlers[ndx]('connected');
});
}, 0);
};
socket.onclose = function(ev)
{
console.log('[socket] disconnected -- reload!');
// \note scope.$apply is wrapped in setTimeout to avoid calling it while angular's digest is already active.
setTimeout(function()
{
$rootScope.$apply(function ()
{
for (var ndx = 0; ndx < statusHandlers.length; ndx++)
statusHandlers[ndx]('disconnected');
});
}, 0);
};
socket.onmessage = function(ev)
{
// Push to queue.
var args = JSON.parse(ev.data);
inMsgQueue.enqueue(args);
};
return {
onmessage: function (callback)
{
msgHandlers.push(callback);
},
onStatus: function (callback)
{
statusHandlers.push(callback);
},
on: function (eventName, callback)
{
console.log('[socket] register handler for ' + eventName);
if (eventHandlers.hasOwnProperty(eventName))
eventHandlers[eventName].push(callback);
else
eventHandlers[eventName] = [callback];
},
emit: function (data)
{
var msg = JSON.stringify(data);
if (socket.readyState === WebSocket.OPEN)
socket.send(msg);
else
sendQueue.push(msg);
}
};
}])
.factory('rpc', ['$q', 'socket', 'usSpinnerService', function($q, socket, usSpinnerService)
{
var nextRequestId = 1;
var pendingRequests = {};
var spinnerSpinning = false;
var spinnerStartIssued = false;
var modules = {};
var updateSpinner = function()
{
var shouldBeSpinning = !_.isEmpty(pendingRequests);
// \todo [petri] use data binding instead of explicit calls?
if (spinnerSpinning && !shouldBeSpinning)
{
spinnerSpinning = false;
usSpinnerService.stop('rpc-spinner');
}
else if (!spinnerSpinning && shouldBeSpinning)
{
// Delay spinner start to avoid costly relayout for short rpc queries
if (!spinnerStartIssued)
{
spinnerStartIssued = true;
setTimeout(function()
{
spinnerStartIssued = false;
var shouldBeSpinning = !_.isEmpty(pendingRequests);
if (shouldBeSpinning)
{
spinnerSpinning = true;
usSpinnerService.spin('rpc-spinner');
}
}, 300);
}
}
};
var call = function(method)
{
var requestId = nextRequestId++;
var params = Array.prototype.slice.call(arguments, 1)
console.log('[rpc] call ' + method + ': ' + debugStr(JSON.stringify(params)));
socket.emit({ jsonrpc:"2.0", id:requestId, method:method, params:params });
// \todo [petri] possible race condition?
var deferred = $q.defer();
pendingRequests[requestId] = deferred;
updateSpinner();
return deferred.promise;
};
var cast = function(method)
{
var requestId = nextRequestId++;
var params = Array.prototype.slice.call(arguments, 1)
socket.emit({ jsonrpc:"2.0", id:requestId, method:method, params:params });
pendingRequests[requestId] = $q.defer(); // not used!
// \todo [petri] don't create deferred object?
updateSpinner();
};
socket.onmessage(function(socket, args)
{
if (args.hasOwnProperty('jsonrpc'))
{
if (args.hasOwnProperty('method')) // rpc call by server
{
var methodParts = args.method.split('.'); // \todo [petri] assert two parts?
var moduleName = methodParts[0];
var funcName = methodParts[1];
if (modules.hasOwnProperty(moduleName))
{
var module = modules[moduleName];
if (module.hasOwnProperty(funcName))
{
var func = module[funcName];
func.apply(module, args.params);
}
else
{
// \todo [petri] proper error handling
console.log('[rpc] ERROR: unknown func ' + funcName + ' in module ' + moduleName);
}
}
else
{
// \todo [petri] proper error handling
console.log('[rpc] ERROR: unknown module ' + moduleName);
}
}
else // result from an rpc call
{
console.log('[rpc] result: ' + debugStr(JSON.stringify(args)));
var requestId = args.id;
var deferred = pendingRequests[requestId];
delete pendingRequests[requestId];
if (args.hasOwnProperty('result'))
deferred.resolve(args.result);
else
deferred.reject(args.error);
updateSpinner();
}
}
});
var register = function(name, module)
{
modules[name] = module;
};
return {
call: call,
cast: cast,
register: register
};
}])
.factory('rtdb', ['socket', 'rpc', function(socket, rpc)
{
console.log('[rtdb] init');
var subscribedObjects = {}; // objectKey(objType, objId): {count:<int>, obj:<obj>}
// ListenerSet
//
// Registry of update callbacks keyed by (objId, objType).
//   addListener(objId, objType, callback) -> listenerId
//   removeListener(listenerId)            - no-op for an unknown / already removed id
//   getListenerById(listenerId)           -> listener record or undefined
//   notifyUpdate(objId, objType, value)   - calls every callback registered for the pair
var ListenerSet = function()
{
    var nextId = 1;
    var listeners = {};        // listenerId -> { listenerId, objType, objId, callback }
    var idTypeCallbacks = {};  // callbacks[objId][objType] :: List(callbacks)

    this.addListener = function(objId, objType, callback)
    {
        var listenerId = nextId;
        nextId += 1;
        listeners[listenerId] =
        {
            listenerId: listenerId,
            objType: objType,
            objId: objId,
            callback: callback
        };
        if (!idTypeCallbacks.hasOwnProperty(objId))
            idTypeCallbacks[objId] = {};
        if (!idTypeCallbacks[objId].hasOwnProperty(objType))
            idTypeCallbacks[objId][objType] = [];
        idTypeCallbacks[objId][objType].push(callback);
        return listenerId;
    };

    this.removeListener = function(listenerId)
    {
        // Removing twice (or removing an id that was never issued) is harmless.
        if (!listeners.hasOwnProperty(listenerId))
            return;
        var listener = listeners[listenerId];
        var objType = listener.objType;
        var objId = listener.objId;
        var callbacks = idTypeCallbacks[objId][objType];
        var ndx = callbacks.indexOf(listener.callback);
        if (ndx !== -1)
        {
            // swap with last and pop
            callbacks[ndx] = callbacks[callbacks.length - 1];
            callbacks.pop();
            if (callbacks.length === 0)
            {
                delete idTypeCallbacks[objId][objType];
                if (Object.keys(idTypeCallbacks[objId]).length === 0)
                    delete idTypeCallbacks[objId];
            }
        }
        delete listeners[listenerId];
    };

    this.getListenerById = function(listenerId)
    {
        return listeners[listenerId];
    };

    this.notifyUpdate = function(objId, objType, objValue)
    {
        if (!idTypeCallbacks.hasOwnProperty(objId))
            return;
        if (!idTypeCallbacks[objId].hasOwnProperty(objType))
            return;
        var callbacks = idTypeCallbacks[objId][objType];
        // Iterate over a snapshot: a callback may remove listeners (itself included),
        // and removal reorders the live array, which would make the loop skip entries.
        var snapshot = callbacks.slice();
        for (var ndx = 0; ndx < snapshot.length; ++ndx)
        {
            // Skip callbacks that an earlier callback removed during this round.
            if (callbacks.indexOf(snapshot[ndx]) !== -1)
                snapshot[ndx](objValue);
        }
    };
};
var listeners = new ListenerSet();
// Build the subscribedObjects key for an object: "<objType>,<objId>".
var objectKey = function(objType, objId)
{
    var key = objType + ",";
    key += objId;
    return key;
};
// Flush callback of rtdbQueue: forwards exactly one queued { method, args }
// entry to rpc.cast. Throws when handed anything but a single-item batch.
var flushRTDB = function(items)
{
    if (items.length === 1)
    {
        var entry = items[0];
        rpc.cast(entry.method, entry.args);
        return;
    }
    throw new Error("flushRTDB: items.length must be 1");
};
// Batch size 1: each queued entry is cast from its own flush.
var rtdbQueue = new BatchQueue(flushRTDB, 1);
// \note The same queue is used for subscriptions and unsubscriptions, and their
// respective order is preserved. We want to keep them in sync so that the
// server doesn't e.g. think that we have subscribed twice to the same
// object. With separate subscribe and unsubscribe queues that could happen
// if we first subscribed to an object, then unsubscribed and re-subscribed
// the same object, all three operations done between two consecutive
// flushes of the subscribe queue (so two subscribes to the same object
// would end up in the same flush of the subscribe queue).
// Flush callback of subscriptionChangeQueue. Walks the queued changes in order
// and cuts them into runs that are handed to rtdbQueue one by one:
// - a run holds only subscribes or only unsubscribes
// - runs keep the relative order of the changes they were cut from
// - a subscribe run holds at most maxSubscriptionBatchSize objects
// Throws when a run starts with a change whose type is neither of the two.
var flushSubscriptionChanges = function(items)
{
    var maxSubscriptionBatchSize = 100;
    var toObject = function(item) { return item.object; };

    var runStart = 0;
    while (runStart < items.length)
    {
        var runType = items[runStart].type;

        // Extend the run while the type stays the same and a subscribe run is not full.
        var runEnd = runStart + 1;
        while (runEnd < items.length)
        {
            var typeChanged = (items[runEnd].type !== runType);
            var subscribeRunFull = (items[runEnd].type === 'subscribe' && runEnd - runStart >= maxSubscriptionBatchSize);
            if (typeChanged || subscribeRunFull)
                break;
            runEnd++;
        }

        var batchObjects = _.map(items.slice(runStart, runEnd), toObject);
        switch (runType)
        {
            case 'subscribe':
                console.log('[rtdb] subscribe ' + batchObjects.length + ' objects from server; ' + JSON.stringify(batchObjects));
                rtdbQueue.enqueue({ method:'rtdb.Subscribe', args:{ objects:batchObjects } });
                break;
            case 'unsubscribe':
                console.log('[rtdb] unsubscribe ' + batchObjects.length + ' objects from server');
                rtdbQueue.enqueue({ method:'rtdb.Unsubscribe', args:{ objects:batchObjects } });
                break;
            default:
                throw new Error('Invalid subscription change type');
        }

        runStart = runEnd;
    }
};
var subscriptionChangeQueue = new BatchQueue(flushSubscriptionChanges);
// Queue a subscribe / unsubscribe change for the given { objType, objId } object.
var enqueueSubscribe = function(object)
{
    subscriptionChangeQueue.enqueue({ type:'subscribe', object:object });
};
var enqueueUnsubscribe = function(object)
{
    subscriptionChangeQueue.enqueue({ type:'unsubscribe', object:object });
};
// Flush callback of versionViewedObjectGetQueue. Groups the queued gets by
// view.id and, per view, requests the objects with 'rtdb.GetVersionViewedObjects'
// and passes each returned object to the matching item's onUpdate.
var flushVersionViewedObjectGets = function(items)
{
    var maxBatchSize = 1000;
    var views = [];           // one view per distinct view.id, in first-seen order
    var itemsPerViewId = {};  // view.id -> items queued for that view

    items.forEach(function(item)
    {
        var view = item.view;
        if (!itemsPerViewId.hasOwnProperty(view.id))
        {
            views.push(view);
            itemsPerViewId[view.id] = [];
        }
        itemsPerViewId[view.id].push(item);
    });

    var toObjectRef = function(item)
    {
        return { objType: item.objType, objId: item.objId };
    };

    // Get the objects in successive batches, such that each batch waits for the
    // completion of the previous one before being issued. This makes it possible
    // to cancel the object-getting when the version view is released while some
    // objects are still pending (happens e.g. when user changes to another page
    // before objects are all loaded), so that we don't try to get objects from a
    // released version view.
    var getBatch = function(view, startNdx)
    {
        // view.id will be undefined when the view is released
        if (view.id === undefined)
            return;

        var viewItems = itemsPerViewId[view.id];
        if (startNdx >= viewItems.length)
            return;

        var batchSize = Math.min(maxBatchSize, viewItems.length - startNdx);
        var batchItems = viewItems.slice(startNdx, startNdx + batchSize);
        var batchObjects = _.map(batchItems, toObjectRef);

        var onObjects = function(objects)
        {
            if (objects.length !== batchSize)
                throw new Error('flushVersionViewedObjectGets callback: got ' + objects.length + ' objects, expected ' + batchSize);
            for (var objNdx = 0; objNdx < objects.length; objNdx++)
                batchItems[objNdx].onUpdate(objects[objNdx]);
            // Only now issue the next batch for this view.
            getBatch(view, startNdx + maxBatchSize);
        };

        rpc.call('rtdb.GetVersionViewedObjects', {
            viewId: view.id,
            objects: batchObjects
        })
        .then(onObjects);
    };

    for (var viewNdx = 0; viewNdx < views.length; viewNdx++)
        getBatch(views[viewNdx], 0);
};
var versionViewedObjectGetQueue = new BatchQueue(flushVersionViewedObjectGets);
// Server-callable functions for the rtdb module.
var rpcHandler =
{
    // args: list of { type, id, value }. Caches each value for subscribed
    // objects and notifies the listeners of every object in the list.
    InitObjects: function(args)
    {
        console.log('[rtdb] InitObjects: ' + args.length + ' objects');
        var opHandlers = objectop.opHandlers;
        for (var objNdx = 0; objNdx < args.length; objNdx++)
        {
            var incoming = args[objNdx];
            var objType = incoming.type;
            var objId = incoming.id;
            var value = incoming.value;
            console.log('[rtdb] init: ' + objType + ' ' + objId + ': ' + debugStr(value));

            // Let the type's handler module post-process the fresh value, if it wants to.
            if (opHandlers.hasOwnProperty(objType) && opHandlers[objType].$postInitialize)
                opHandlers[objType].$postInitialize(value);

            var cached = {
                objType: objType,
                objId: objId,
                value: value,
                ops: []
            };

            // Store copy of value.
            var objKey = objectKey(objType, objId);
            if (subscribedObjects.hasOwnProperty(objKey))
                subscribedObjects[objKey].obj = cached;

            // Send notification to all listeners.
            listeners.notifyUpdate(objId, objType, cached.value);
        }
    },

    // args.changes: list of { objType, objId, ops }, each op being { method, args }.
    // Applies the ops to the cached value of every subscribed, initialized object
    // and notifies its listeners. Throws on an unknown object type or op method.
    UpdateObjects: function(args)
    {
        var changes = args.changes;
        // Process all changed objects.
        for (var changeNdx = 0; changeNdx < changes.length; changeNdx++)
        {
            var change = changes[changeNdx];
            var objId = change.objId;
            var objType = change.objType;
            var objKey = objectKey(objType, objId);
            var ops = change.ops;

            if (!subscribedObjects.hasOwnProperty(objKey))
            {
                console.log('[rtdb] got update to unsubscribed object ' + objKey);
                continue;
            }
            if (subscribedObjects[objKey].obj === undefined)
            {
                console.log('[rtdb] update received before init for ' + objKey);
                continue;
            }

            // Apply ops to cached value.
            var cached = subscribedObjects[objKey].obj;
            for (var opNdx = 0; opNdx < ops.length; opNdx++)
            {
                var op = ops[opNdx];
                if (!objectop.opHandlers.hasOwnProperty(objType))
                    throw new Error('[op] invalid object type (module): ' + objType);
                var typeModule = objectop.opHandlers[objType];
                if (!typeModule.hasOwnProperty(op.method))
                    throw new Error('[op] object type "' + objType + '" has no method "' + op.method + '"');
                var handler = typeModule[op.method];
                // The handler receives the cached value followed by the op's own arguments.
                var handlerArgs = [cached.value].concat(op.args);
                handler.apply(null, handlerArgs);
            }

            // Notify all listeners.
            listeners.notifyUpdate(objId, objType, cached.value);
        }
    }
};
rpc.register('rtdb', rpcHandler);
// Register callback for updates of (objType, objId) and return the listener id.
// The first subscriber of an object queues a server-side subscribe; later
// subscribers share it and immediately receive the cached value, if there is one.
var subscribe = function(objType, objId, callback)
{
    var listenerId = listeners.addListener(objId, objType, callback);
    var objKey = objectKey(objType, objId);

    if (!subscribedObjects.hasOwnProperty(objKey))
    {
        // Push subscribe into queue (perform multiple subscribes in single RPC op).
        subscribedObjects[objKey] = { count:1, obj:undefined };
        enqueueSubscribe({ objType:objType, objId:objId });
        return listenerId;
    }

    // Already subscribed: bump the reference count and serve from cache.
    var entry = subscribedObjects[objKey];
    entry.count++;
    if (entry.obj !== undefined)
        callback(entry.obj.value);
    return listenerId;
};
// Remove the listener returned by subscribe(). When it was the last listener of
// its object, the object is dropped from the cache and a server-side unsubscribe
// is queued. An unknown or already removed listener id is logged and ignored, so
// unsubscribing twice is harmless.
var unsubscribe = function(listenerId)
{
    var listener = listeners.getListenerById(listenerId);
    if (!listener)
    {
        console.log('[rtdb] unsubscribe: unknown listener ' + listenerId);
        return;
    }

    var objType = listener.objType;
    var objId = listener.objId;
    var objKey = objectKey(objType, objId);

    // The entry should exist for every live listener; guard anyway so that an
    // inconsistent cache cannot keep the listener from being removed.
    var subscribed = subscribedObjects[objKey];
    if (subscribed !== undefined)
    {
        subscribed.count--;
        if (subscribed.count === 0)
        {
            delete subscribedObjects[objKey];
            enqueueUnsubscribe({objType: objType, objId: objId});
        }
    }

    listeners.removeListener(listenerId);
};
// Queue a one-off get of (objType, objId) as seen through the given version
// view; the request record carries onUpdate for the queue's flush function.
var getVersionViewedObject = function(view, objType, objId, onUpdate)
{
    var request = {
        view: view,
        objType: objType,
        objId: objId,
        onUpdate: onUpdate
    };
    versionViewedObjectGetQueue.enqueue(request);
};
// Bind the value of (objType, objId) to scope[options.valueName] ('value' by default).
// The scope property starts as null and is overwritten on every update;
// options.onUpdate(objType, objId, value), when given, is called after each one.
// A scope with a live rtdbVersionView gets the version-viewed object once;
// otherwise the object is subscribed until the scope is destroyed.
var bind = function(objType, objId, scope, options)
{
    options = options || {};
    var valueName = options.valueName || 'value';

    // Publish the property right away so it exists before the first update arrives.
    var initial = {};
    initial[valueName] = null;
    angular.extend(scope, initial);

    var onUpdate = function(obj)
    {
        scope[valueName] = obj;
        if (options.onUpdate)
            options.onUpdate(objType, objId, obj);
    };

    var hasVersionView = scope.rtdbVersionView && scope.rtdbVersionView.id !== undefined;
    if (!hasVersionView)
    {
        // Normal (real-time version) bind. Subscribe to object.
        var listener = subscribe(objType, objId, onUpdate);
        scope.$on('$destroy', function()
        {
            unsubscribe(listener);
        });
        return;
    }

    // Version-viewed bind. Just get the version-viewed object once and set the value.
    var onVersionViewedObject = function(obj)
    {
        var opHandlers = objectop.opHandlers;
        if (opHandlers.hasOwnProperty(objType) && opHandlers[objType].$postInitialize)
            opHandlers[objType].$postInitialize(obj);
        onUpdate(obj);
    };
    getVersionViewedObject(scope.rtdbVersionView, objType, objId, onVersionViewedObject);
};
// Keep (objType, objId) subscribed for the lifetime of scope without using its
// value: subscribes with a do-nothing callback and unsubscribes on '$destroy'.
var prefetch = function(objType, objId, scope)
{
    var ignoreUpdate = function(obj) { /*nada*/ };
    var listener = subscribe(objType, objId, ignoreUpdate);

    var release = function()
    {
        unsubscribe(listener);
    };
    scope.$on('$destroy', release);
};
return {
subscribe: subscribe,
unsubscribe: unsubscribe,
bind: bind,
prefetch: prefetch
};
}])
;