// Copyright 2015 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

'use strict';

// BatchQueue
//
// Enqueue items and flush them in batch (using setTimeout of 0).

var BatchQueue = function(flushFunc, maxBatchSize)
{
	maxBatchSize = maxBatchSize || 0;

	var queue = [];

	// Flush queue func.
	var flushQueue = function()
	{
		var batchSize	= (maxBatchSize > 0) ? Math.min(maxBatchSize, queue.length) : queue.length;
		var batchItems	= queue.slice(0, batchSize);

		flushFunc(batchItems);

		queue = queue.slice(batchSize);
		if (queue.length !== 0)
			setTimeout(flushQueue, 0);
	};

	this.enqueue = function(item)
	{
		queue.push(item);
		if (queue.length === 1)
			setTimeout(flushQueue, 0);
	};
};

// Services

angular.module('cherry.services', [])

.value('version', '0.0.1')

// WebSockets

.factory('socket', ['$rootScope', '$location', function ($rootScope, $location)
{
	var url = "ws://" + $location.host() + ":" + $location.port() + "/ws";
	var socket = new WebSocket(url);

	var sendQueue		= [];
	var eventHandlers	= {};
	var msgHandlers		= [];
	var statusHandlers	= [];

	var flushInMsgs = function(items)
	{
		// \note scope.$apply is wrapped in setTimeout to avoid calling it while angular's digest is already active.
		setTimeout(function()
		{
			$rootScope.$apply(function ()
			{
				for (var msgNdx = 0; msgNdx < items.length; msgNdx++)
				{
					var msgArgs = items[msgNdx];
					for (var ndx = 0; ndx < msgHandlers.length; ndx++)
						msgHandlers[ndx].call(null, socket, msgArgs); // \todo [petri] this?
				}
			});
		}, 0);
	};
	var inMsgQueue = new BatchQueue(flushInMsgs, 100);

	socket.onopen = function(ev)
	{
		console.log('[socket] connected (' + sendQueue.length + ' queued)!');
		for (var ndx = 0; ndx < sendQueue.length; ndx++)
			socket.send(sendQueue[ndx]);
		sendQueue = [];

		// \note scope.$apply is wrapped in setTimeout to avoid calling it while angular's digest is already active.
		setTimeout(function()
		{
			$rootScope.$apply(function ()
			{
				for (var ndx = 0; ndx < statusHandlers.length; ndx++)
					statusHandlers[ndx]('connected');
			});
		}, 0);
	};

	socket.onclose = function(ev)
	{
		console.log('[socket] disconnected -- reload!');

		// \note scope.$apply is wrapped in setTimeout to avoid calling it while angular's digest is already active.
		setTimeout(function()
		{
			$rootScope.$apply(function ()
			{
				for (var ndx = 0; ndx < statusHandlers.length; ndx++)
					statusHandlers[ndx]('disconnected');
			});
		}, 0);
	};

	socket.onmessage = function(ev)
	{
		// Push to queue.
		var args = JSON.parse(ev.data);
		inMsgQueue.enqueue(args);
	};

	return {
		onmessage: function (callback)
		{
			msgHandlers.push(callback);
		},

		onStatus: function (callback)
		{
			statusHandlers.push(callback);
		},

		on: function (eventName, callback)
		{
			console.log('[socket] register handler for ' + eventName);
			if (eventHandlers.hasOwnProperty(eventName))
				eventHandlers[eventName].push(callback);
			else
				eventHandlers[eventName] = [callback];
		},

		emit: function (data)
		{
			var msg = JSON.stringify(data);
			if (socket.readyState === WebSocket.OPEN)
				socket.send(msg);
			else
				sendQueue.push(msg);
		}
	};
}])

.factory('rpc', ['$q', 'socket', 'usSpinnerService', function($q, socket, usSpinnerService)
{
	var nextRequestId		= 1;
	var pendingRequests		= {};
	var spinnerSpinning		= false;
	var spinnerStartIssued	= false;
	var modules				= {};

	var updateSpinner = function()
	{
		var shouldBeSpinning = !_.isEmpty(pendingRequests);

		// \todo [petri] use data binding instead of explicit calls?

		if (spinnerSpinning && !shouldBeSpinning)
		{
			spinnerSpinning = false;
			usSpinnerService.stop('rpc-spinner');
		}
		else if (!spinnerSpinning && shouldBeSpinning)
		{
			// Delay spinner start to avoid costly relayout for short rpc queries
			if (!spinnerStartIssued)
			{
				spinnerStartIssued = true;
				setTimeout(function()
				{
					spinnerStartIssued = false;

					var shouldBeSpinning = !_.isEmpty(pendingRequests);
					if (shouldBeSpinning)
					{
						spinnerSpinning = true;
						usSpinnerService.spin('rpc-spinner');
					}
				}, 300);
			}
		}
	};

	var call = function(method)
	{
		var requestId = nextRequestId++;
		var params = Array.prototype.slice.call(arguments, 1)
		console.log('[rpc] call ' + method + ': ' + debugStr(JSON.stringify(params)));
		socket.emit({ jsonrpc:"2.0", id:requestId, method:method, params:params });

		// \todo [petri] possible race condition?
		var deferred = $q.defer();
		pendingRequests[requestId] = deferred;

		updateSpinner();

		return deferred.promise;
	};

	var cast = function(method)
	{
		var requestId = nextRequestId++;
		var params = Array.prototype.slice.call(arguments, 1)
		socket.emit({ jsonrpc:"2.0", id:requestId, method:method, params:params });
		pendingRequests[requestId] = $q.defer(); // not used!
		// \todo [petri] don't create deferred object?

		updateSpinner();
	};

	socket.onmessage(function(socket, args)
	{
		if (args.hasOwnProperty('jsonrpc'))
		{
			if (args.hasOwnProperty('method')) // rpc call by server
			{
				var methodParts	= args.method.split('.'); // \todo [petri] assert two parts?
				var moduleName	= methodParts[0];
				var funcName	= methodParts[1];
				if (modules.hasOwnProperty(moduleName))
				{
					var module = modules[moduleName];
					if (module.hasOwnProperty(funcName))
					{
						var func = module[funcName];
						func.apply(module, args.params);
					}
					else
					{
						// \todo [petri] proper error handling
						console.log('[rpc] ERROR: unknown func ' + funcName + ' in module ' + moduleName);
					}
				}
				else
				{
					// \todo [petri] proper error handling
					console.log('[rpc] ERROR: unknown module ' + moduleName);
				}
			}
			else // result from an rpc call
			{
				console.log('[rpc] result: ' + debugStr(JSON.stringify(args)));
				var requestId	= args.id;
				var deferred	= pendingRequests[requestId];
				delete pendingRequests[requestId];

				if (args.hasOwnProperty('result'))
					deferred.resolve(args.result);
				else
					deferred.reject(args.error);

				updateSpinner();
			}
		}
	});

	var register = function(name, module)
	{
		modules[name] = module;
	};

	return {
		call:		call,
		cast:		cast,
		register:	register
	};
}])

.factory('rtdb', ['socket', 'rpc', function(socket, rpc)
{
	console.log('[rtdb] init');
	var subscribedObjects	= {}; // objectKey(objType, objId): {count:<int>, obj:<obj>}

	var ListenerSet = function()
	{
		var nextId			= 1;
		var listeners		= {}
		var idTypeCallbacks	= {} // callbacks[objId][objType] :: List(callbacks)

		this.addListener = function(objId, objType, callback)
		{
			var listenerId	= nextId;
			nextId += 1;

			listeners[listenerId] =
			{
				listenerId:	listenerId,
				objType:	objType,
				objId:		objId,
				callback:	callback
			};

			if (!idTypeCallbacks.hasOwnProperty(objId))
				idTypeCallbacks[objId] = {};

			if (!idTypeCallbacks[objId].hasOwnProperty(objType))
				idTypeCallbacks[objId][objType] = [];

			idTypeCallbacks[objId][objType].push(callback);

			return listenerId;
		}

		this.removeListener = function(listenerId)
		{
			var listener	= listeners[listenerId];
			var objType		= listener.objType;
			var objId		= listener.objId;
			var callback	= listener.callback;
			var callbacks	= idTypeCallbacks[objId][objType];

			// swap with last and pop
			for (var ndx = 0; ndx < callbacks.length; ++ndx)
			{
				if (callbacks[ndx] === callback)
				{
					callbacks[ndx] = callbacks[callbacks.length - 1];
					callbacks.pop();

					if (callbacks.length === 0)
					{
						delete idTypeCallbacks[objId][objType];

						if (Object.keys(idTypeCallbacks[objId]).length === 0)
							delete idTypeCallbacks[objId];
					}
					break;
				}
			}
			delete listeners[listenerId];
		}

		this.getListenerById = function(listenerId)
		{
			return listeners[listenerId];
		}

		this.notifyUpdate = function(objId, objType, objValue)
		{
			if (!idTypeCallbacks.hasOwnProperty(objId))
				return;

			if (!idTypeCallbacks[objId].hasOwnProperty(objType))
				return;

			var callbacks = idTypeCallbacks[objId][objType];
			for (var ndx = 0; ndx < callbacks.length; ++ndx)
				callbacks[ndx](objValue);
		}
	};
	var listeners = new ListenerSet();

	var objectKey = function(objType, objId)
	{
		return objType + "," + objId;
	};

	var flushRTDB = function(items)
	{
		if (items.length !== 1)
			throw new Error("flushRTDB: items.length must be 1");

		var item = items[0];
		rpc.cast(item.method, item.args);
	};
	var rtdbQueue = new BatchQueue(flushRTDB, 1);

	// \note The same queue is used for subscriptions and unsubscriptions, and their
	//		 respective order is preserved. We want to keep them in sync so that the
	//		 server doesn't e.g. think that we have subscribed twice to the same
	//		 object. With separate subscribe and unsubscribe queues that could happen
	//		 if we first subscribed to an object, then unsubscribed and re-subscribed
	//		 the same object, all three operations done between two consecutive
	//		 flushes of the subscribe queue (so two subscribes to the same object
	//		 would end up in the same flush of the subscribe queue).
	var flushSubscriptionChanges = function(items)
	{
		var maxSubscriptionBatchSize = 100;
		var batchEndNdx;

		// Split the subscribe/unsubscribe operations to successive batches, such that:
		// - each batch contains only subscribes or only unsubscribes
		// - if subscribe S is before unsubscribe U in items, the batch containing
		//   S will go before U. And vice versa.
		// - subscription batches are no bigger than maxSubscriptionBatchSize
		for (var batchBeginNdx = 0; batchBeginNdx < items.length; batchBeginNdx = batchEndNdx)
		{
			for (batchEndNdx = batchBeginNdx+1; batchEndNdx < items.length; batchEndNdx++)
			{
				if (items[batchEndNdx].type !== items[batchBeginNdx].type)
					break;
				if (items[batchEndNdx].type === 'subscribe' && batchEndNdx-batchBeginNdx >= maxSubscriptionBatchSize)
					break;
			}

			var batchObjects = _.map(items.slice(batchBeginNdx, batchEndNdx), function(item) { return item.object; });

			if (items[batchBeginNdx].type === 'subscribe')
			{
				console.log('[rtdb] subscribe ' + batchObjects.length + ' objects from server; ' + JSON.stringify(batchObjects));
				rtdbQueue.enqueue({ method:'rtdb.Subscribe', args:{ objects:batchObjects } });
			}
			else if (items[batchBeginNdx].type === 'unsubscribe')
			{
				console.log('[rtdb] unsubscribe ' + batchObjects.length + ' objects from server');
				rtdbQueue.enqueue({ method:'rtdb.Unsubscribe', args:{ objects:batchObjects } });
			}
			else
				throw new Error('Invalid subscription change type');
		}
	};
	var subscriptionChangeQueue		= new BatchQueue(flushSubscriptionChanges);
	var enqueueSubscribe			= function(object) { subscriptionChangeQueue.enqueue({ type:'subscribe',	object:object }); };
	var enqueueUnsubscribe			= function(object) { subscriptionChangeQueue.enqueue({ type:'unsubscribe',	object:object }); };

	var flushVersionViewedObjectGets = function(items)
	{
		var views			= [];
		var itemsPerViewId	= {};

		for (var ndx = 0; ndx < items.length; ndx++)
		{
			var view = items[ndx].view;
			if (!itemsPerViewId.hasOwnProperty(view.id))
			{
				views.push(view);
				itemsPerViewId[view.id] = [];
			}
			itemsPerViewId[view.id].push(items[ndx]);
		}

		for (var viewNdx = 0; viewNdx < views.length; viewNdx++)
		{
			// Get the objects in successive batches, such that each batch waits for the
			// completion of the previous one before being issued. This makes it possible
			// to cancel the object-getting when the version view is released while some
			// objects are still pending (happens e.g. when user changes to another page
			// before objects are all loaded), so that we don't try to get objects from a
			// released version view.

			var getBatch = function(view, startNdx)
			{
				var maxBatchSize = 1000;

				if (view.id !== undefined) // view.id will be undefined when the view is released
				{
					var viewItems = itemsPerViewId[view.id];

					if (startNdx < viewItems.length)
					{
						var batchSize		= Math.min(maxBatchSize, viewItems.length - startNdx);
						var batchItems		= viewItems.slice(startNdx, startNdx + batchSize);
						var batchObjects	= _.map(batchItems, function(item) { return { objType: item.objType, objId: item.objId }; });

						rpc.call('rtdb.GetVersionViewedObjects', {
							viewId:		view.id,
							objects:	batchObjects
						})
						.then(function(objects)
						{
							if (objects.length !== batchSize)
								throw new Error('flushVersionViewedObjectGets callback: got ' + objects.length + ' objects, expected ' + batchSize);
							for (var ndx = 0; ndx < objects.length; ndx++)
								batchItems[ndx].onUpdate(objects[ndx]);
							getBatch(view, startNdx + maxBatchSize);
						});
					}
				}
			};

			getBatch(views[viewNdx], 0);
		}
	};
	var versionViewedObjectGetQueue = new BatchQueue(flushVersionViewedObjectGets);

	var rpcHandler =
	{
		InitObjects: function(args)
		{
			console.log('[rtdb] InitObjects: ' + args.length + ' objects');
			for (var objNdx = 0; objNdx < args.length; objNdx++)
			{
				var obj		= args[objNdx];
				var objType	= obj.type;
				var objId	= obj.id;
				var value	= obj.value;
				console.log('[rtdb] init: ' + objType + ' ' + objId + ': ' + debugStr(value));

				if (objectop.opHandlers.hasOwnProperty(objType) && objectop.opHandlers[objType].$postInitialize)
					objectop.opHandlers[objType].$postInitialize(value);

				var obj = {
					objType:	objType,
					objId:		objId,
					value:		value,
					ops:		[]
				};

				var objKey = objectKey(objType, objId);

				// Store copy of value.
				if (subscribedObjects.hasOwnProperty(objKey))
					subscribedObjects[objKey].obj = obj;

				// Send notification to all listeners.
				listeners.notifyUpdate(objId, objType, obj.value);
			}
		},

		UpdateObjects: function(args)
		{
			var changes = args.changes;
//			console.log('[rtdb] UpdateObjects: ' + changes.length + ' objects');
//			console.log('[rtdb] UpdateObjects: ' + _.map(changes, function(change) { return "'" + change.objId + "'"; }).join(', '));

			// Process all changed objects.
			for (var updateNdx = 0; updateNdx < changes.length; updateNdx++)
			{
				var change	= changes[updateNdx];
				var objId	= change.objId;
				var objType	= change.objType;
				var objKey	= objectKey(objType, objId);
				var ops		= change.ops;

				if (subscribedObjects.hasOwnProperty(objKey))
				{
					if (subscribedObjects[objKey].obj === undefined)
						console.log('[rtdb] update received before init for ' + objKey);
					else
					{
						// Apply ops to cached value.
						var obj = subscribedObjects[objKey].obj;
						for (var ndx = 0; ndx < ops.length; ndx++)
						{
							var op = ops[ndx];
							if (!objectop.opHandlers.hasOwnProperty(objType))
								throw new Error('[op] invalid object type (module): ' + objType);
							var module = objectop.opHandlers[objType];
							if (!module.hasOwnProperty(op.method))
								throw new Error('[op] object type "' + objType + '" has no method "' + op.method + '"');
							var handler = module[op.method];
							var args	= [obj.value].concat(op.args);
							handler.apply(null, args);
						}

						// Notify all listeners.
						listeners.notifyUpdate(objId, objType, obj.value);
					}
				}
				else
					console.log('[rtdb] got update to unsubscribed object ' + objKey);
			}
		}
	};

	rpc.register('rtdb', rpcHandler);

	var subscribe = function(objType, objId, callback)
	{
		var listenerId	= listeners.addListener(objId, objType, callback)
		var objKey		= objectKey(objType, objId);

		// Subscribe from cache or server.
		if (subscribedObjects.hasOwnProperty(objKey))
		{
			var subscribed = subscribedObjects[objKey];
			subscribed.count++;
			if (subscribed.obj !== undefined)
				callback(subscribed.obj.value);
		}
		else
		{
			// Push subscribe into queue (perform multiple subscribes in single RPC op).
			subscribedObjects[objKey] = { count:1, obj:undefined };
			enqueueSubscribe({ objType:objType, objId:objId });
		}

		return listenerId;
	};

	var unsubscribe = function(listenerId)
	{
		var listener	= listeners.getListenerById(listenerId);
		var objType		= listener.objType;
		var objId		= listener.objId;
		var objKey		= objectKey(objType, objId);

		var subscribed = subscribedObjects[objKey];
		subscribed.count--;
		if (subscribed.count === 0)
		{
			delete subscribedObjects[objKey];
			enqueueUnsubscribe({objType: objType, objId: objId});
		}

		listeners.removeListener(listenerId);
	};

	var getVersionViewedObject = function(view, objType, objId, onUpdate)
	{
		versionViewedObjectGetQueue.enqueue({ view: view, objType: objType, objId: objId, onUpdate: onUpdate });
	};

	var bind = function(objType, objId, scope, options)
	{
		options = options || {};
		var valueName = options.valueName || 'value';

		var initial = {};
		initial[valueName] = null;
		angular.extend(scope, initial);

		var onUpdate = function(obj)
		{
			scope[valueName] = obj;

			if (options.onUpdate)
				options.onUpdate(objType, objId, obj);
		};

		if (scope.rtdbVersionView && scope.rtdbVersionView.id !== undefined)
		{
			// Version-viewed bind. Just get the version-viewed object once and set the value.
			getVersionViewedObject(scope.rtdbVersionView, objType, objId,
				function(obj)
				{
					if (objectop.opHandlers.hasOwnProperty(objType) && objectop.opHandlers[objType].$postInitialize)
						objectop.opHandlers[objType].$postInitialize(obj);
					onUpdate(obj);
				});
		}
		else
		{
			// Normal (real-time version) bind. Subscribe to object.

			var listener = subscribe(objType, objId, onUpdate);

			scope.$on('$destroy', function()
			{
				unsubscribe(listener);
			});
		}
	};

	var prefetch = function(objType, objId, scope)
	{
		var listener = subscribe(objType, objId, function(obj) { /*nada*/ });

		scope.$on('$destroy', function()
		{
			unsubscribe(listener);
		});
	};

	return {
		subscribe:		subscribe,
		unsubscribe:	unsubscribe,
		bind:			bind,
		prefetch:		prefetch
	};
}])

;
