blob: ef928d1b85ce601c20b27de2aa3ac7f7f34986f0 [file] [log] [blame]
# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""A module for storing and getting objects from datastore.
This module provides Get, Set and Delete functions for storing pickleable
objects in datastore, with support for large objects greater than 1 MB.
Although this module contains ndb.Model classes, these are not intended
to be used directly by other modules.
App Engine datastore limits entity size to less than 1 MB; this module
supports storing larger objects by splitting the data and using multiple
datastore entities and multiple memcache keys. Using ndb.get and pickle, a
complex data structure can be retrieved more quickly than datastore fetch.
john = Account()
john.username = 'John'
john.userid = 123
stored_object.Set(john.userid, john)
import cPickle as pickle
import logging
from google.appengine.api import memcache
from google.appengine.ext import ndb
_MULTIPART_ENTITY_MEMCACHE_KEY = 'multipart_entity_'
# Maximum number of entities and memcache to save a value.
# The limit for data stored in one datastore entity is 1 MB,
# and the limit for memcache batch operations is 32 MB. See:
# Max bytes per entity or value cached with memcache.
_CHUNK_SIZE = 1000 * 1000
def Get(key):
"""Gets the value.
key: String key value.
A value for key.
results = MultipartCache.Get(key)
if not results:
results = _GetValueFromDatastore(key)
MultipartCache.Set(key, results)
return results
def Set(key, value):
"""Sets the value in datastore and memcache with limit of '_MAX_NUM_PARTS' MB.
key: String key value.
value: A pickleable value to be stored limited at '_MAX_NUM_PARTS' MB.
entity = ndb.Key(MultipartEntity, key).get()
if not entity:
entity = MultipartEntity(id=key)
MultipartCache.Set(key, value)
def Delete(key):
"""Deletes the value in datastore and memcache."""
ndb.Key(MultipartEntity, key).delete()
class MultipartEntity(ndb.Model):
"""Container for PartEntity."""
# Number of entities use to store serialized.
size = ndb.IntegerProperty(default=0, indexed=False)
def _post_get_hook(cls, key, future): # pylint: disable=unused-argument
"""Deserializes data from multiple PartEntity."""
entity = future.get_result()
if entity is None or not entity.size:
string_id = entity.key.string_id()
part_keys = [ndb.Key(MultipartEntity, string_id, PartEntity, i + 1)
for i in xrange(entity.size)]
part_entities = ndb.get_multi(part_keys)
serialized = ''.join(p.value for p in part_entities if p is not None)
def _pre_delete_hook(cls, key):
"""Deletes PartEntity entities."""
part_keys = PartEntity.query(ancestor=key).fetch(keys_only=True)
def Save(self):
"""Stores serialized data over multiple PartEntity."""
serialized_parts = _Serialize(self.GetData())
if len(serialized_parts) > _MAX_NUM_PARTS:
logging.error('Max number of parts reached.')
part_list = []
num_parts = len(serialized_parts)
for i in xrange(num_parts):
if serialized_parts[i] is not None:
part = PartEntity(id=i+1, parent=self.key, value=serialized_parts[i])
self.size = num_parts
ndb.put_multi(part_list + [self])
def GetData(self):
return getattr(self, '_data', None)
def SetData(self, data):
setattr(self, '_data', data)
class PartEntity(ndb.Model):
"""Holds a part of serialized data for MultipartEntity.
This entity key has the form:
ndb.Key('MultipartEntity', multipart_entity_id, 'PartEntity', part_index)
value = ndb.BlobProperty()
class MultipartCache(object):
"""Contains operations for storing values over multiple memcache keys.
Values are serialized, split, and stored over multiple memcache keys. The
head cache stores the expected size.
def Get(cls, key):
"""Gets value in memcache."""
keys = cls._GetCacheKeyList(key)
head_key = cls._GetCacheKey(key)
cache_values = memcache.get_multi(keys)
# Whether we have all the memcache values.
if len(keys) != len(cache_values) or head_key not in cache_values:
return None
serialized = ''
cache_size = cache_values[head_key]
for key in keys[:cache_size]:
if key not in cache_values:
return None
if cache_values[key] is not None:
serialized += cache_values[key]
return pickle.loads(serialized)
def Set(cls, key, value):
"""Sets a value in memcache."""
serialized_parts = _Serialize(value)
if len(serialized_parts) > _MAX_NUM_PARTS:
logging.error('Max number of parts reached.')
cached_values = {}
cached_values[cls._GetCacheKey(key)] = len(serialized_parts)
for i in xrange(len(serialized_parts)):
cached_values[cls._GetCacheKey(key, i)] = serialized_parts[i]
def Delete(cls, key):
"""Deletes all cached values for key."""
def _GetCacheKeyList(cls, key):
"""Gets a list of head cache key and cache key parts."""
keys = [cls._GetCacheKey(key, i) for i in xrange(_MAX_NUM_PARTS)]
return keys
def _GetCacheKey(cls, key, index=None):
"""Returns either head cache key or cache key part."""
if index is not None:
return _MULTIPART_ENTITY_MEMCACHE_KEY + '%s.%s' % (key, index)
def _GetValueFromDatastore(key):
entity = ndb.Key(MultipartEntity, key).get()
if not entity:
return None
return entity.GetData()
def _Serialize(value):
"""Serializes value and returns a list of its parts.
value: A pickleable value.
A list of string representation of the value that has been pickled and split
serialized = pickle.dumps(value, 2)
length = len(serialized)
values = []
for i in xrange(0, length, _CHUNK_SIZE):
for i in xrange(len(values), _MAX_NUM_PARTS):
return values