blob: 87bdbe48f8ec0f8c4200a30d890bb112e390e984 [file] [log] [blame]
from tests.compat import mock, unittest
from boto.dynamodb2 import exceptions
from boto.dynamodb2.fields import (HashKey, RangeKey,
AllIndex, KeysOnlyIndex, IncludeIndex,
GlobalAllIndex, GlobalKeysOnlyIndex,
GlobalIncludeIndex)
from boto.dynamodb2.items import Item
from boto.dynamodb2.layer1 import DynamoDBConnection
from boto.dynamodb2.results import ResultSet, BatchGetResultSet
from boto.dynamodb2.table import Table
from boto.dynamodb2.types import (STRING, NUMBER, BINARY,
FILTER_OPERATORS, QUERY_OPERATORS)
from boto.exception import JSONResponseError
from boto.compat import six, long_type
FakeDynamoDBConnection = mock.create_autospec(DynamoDBConnection)
class SchemaFieldsTestCase(unittest.TestCase):
def test_hash_key(self):
hash_key = HashKey('hello')
self.assertEqual(hash_key.name, 'hello')
self.assertEqual(hash_key.data_type, STRING)
self.assertEqual(hash_key.attr_type, 'HASH')
self.assertEqual(hash_key.definition(), {
'AttributeName': 'hello',
'AttributeType': 'S'
})
self.assertEqual(hash_key.schema(), {
'AttributeName': 'hello',
'KeyType': 'HASH'
})
def test_range_key(self):
range_key = RangeKey('hello')
self.assertEqual(range_key.name, 'hello')
self.assertEqual(range_key.data_type, STRING)
self.assertEqual(range_key.attr_type, 'RANGE')
self.assertEqual(range_key.definition(), {
'AttributeName': 'hello',
'AttributeType': 'S'
})
self.assertEqual(range_key.schema(), {
'AttributeName': 'hello',
'KeyType': 'RANGE'
})
def test_alternate_type(self):
alt_key = HashKey('alt', data_type=NUMBER)
self.assertEqual(alt_key.name, 'alt')
self.assertEqual(alt_key.data_type, NUMBER)
self.assertEqual(alt_key.attr_type, 'HASH')
self.assertEqual(alt_key.definition(), {
'AttributeName': 'alt',
'AttributeType': 'N'
})
self.assertEqual(alt_key.schema(), {
'AttributeName': 'alt',
'KeyType': 'HASH'
})
class IndexFieldTestCase(unittest.TestCase):
def test_all_index(self):
all_index = AllIndex('AllKeys', parts=[
HashKey('username'),
RangeKey('date_joined')
])
self.assertEqual(all_index.name, 'AllKeys')
self.assertEqual([part.attr_type for part in all_index.parts], [
'HASH',
'RANGE'
])
self.assertEqual(all_index.projection_type, 'ALL')
self.assertEqual(all_index.definition(), [
{'AttributeName': 'username', 'AttributeType': 'S'},
{'AttributeName': 'date_joined', 'AttributeType': 'S'}
])
self.assertEqual(all_index.schema(), {
'IndexName': 'AllKeys',
'KeySchema': [
{
'AttributeName': 'username',
'KeyType': 'HASH'
},
{
'AttributeName': 'date_joined',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'ALL'
}
})
def test_keys_only_index(self):
keys_only = KeysOnlyIndex('KeysOnly', parts=[
HashKey('username'),
RangeKey('date_joined')
])
self.assertEqual(keys_only.name, 'KeysOnly')
self.assertEqual([part.attr_type for part in keys_only.parts], [
'HASH',
'RANGE'
])
self.assertEqual(keys_only.projection_type, 'KEYS_ONLY')
self.assertEqual(keys_only.definition(), [
{'AttributeName': 'username', 'AttributeType': 'S'},
{'AttributeName': 'date_joined', 'AttributeType': 'S'}
])
self.assertEqual(keys_only.schema(), {
'IndexName': 'KeysOnly',
'KeySchema': [
{
'AttributeName': 'username',
'KeyType': 'HASH'
},
{
'AttributeName': 'date_joined',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'KEYS_ONLY'
}
})
def test_include_index(self):
include_index = IncludeIndex('IncludeKeys', parts=[
HashKey('username'),
RangeKey('date_joined')
], includes=[
'gender',
'friend_count'
])
self.assertEqual(include_index.name, 'IncludeKeys')
self.assertEqual([part.attr_type for part in include_index.parts], [
'HASH',
'RANGE'
])
self.assertEqual(include_index.projection_type, 'INCLUDE')
self.assertEqual(include_index.definition(), [
{'AttributeName': 'username', 'AttributeType': 'S'},
{'AttributeName': 'date_joined', 'AttributeType': 'S'}
])
self.assertEqual(include_index.schema(), {
'IndexName': 'IncludeKeys',
'KeySchema': [
{
'AttributeName': 'username',
'KeyType': 'HASH'
},
{
'AttributeName': 'date_joined',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'INCLUDE',
'NonKeyAttributes': [
'gender',
'friend_count',
]
}
})
def test_global_all_index(self):
all_index = GlobalAllIndex('AllKeys', parts=[
HashKey('username'),
RangeKey('date_joined')
],
throughput={
'read': 6,
'write': 2,
})
self.assertEqual(all_index.name, 'AllKeys')
self.assertEqual([part.attr_type for part in all_index.parts], [
'HASH',
'RANGE'
])
self.assertEqual(all_index.projection_type, 'ALL')
self.assertEqual(all_index.definition(), [
{'AttributeName': 'username', 'AttributeType': 'S'},
{'AttributeName': 'date_joined', 'AttributeType': 'S'}
])
self.assertEqual(all_index.schema(), {
'IndexName': 'AllKeys',
'KeySchema': [
{
'AttributeName': 'username',
'KeyType': 'HASH'
},
{
'AttributeName': 'date_joined',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'ALL'
},
'ProvisionedThroughput': {
'ReadCapacityUnits': 6,
'WriteCapacityUnits': 2
}
})
def test_global_keys_only_index(self):
keys_only = GlobalKeysOnlyIndex('KeysOnly', parts=[
HashKey('username'),
RangeKey('date_joined')
],
throughput={
'read': 3,
'write': 4,
})
self.assertEqual(keys_only.name, 'KeysOnly')
self.assertEqual([part.attr_type for part in keys_only.parts], [
'HASH',
'RANGE'
])
self.assertEqual(keys_only.projection_type, 'KEYS_ONLY')
self.assertEqual(keys_only.definition(), [
{'AttributeName': 'username', 'AttributeType': 'S'},
{'AttributeName': 'date_joined', 'AttributeType': 'S'}
])
self.assertEqual(keys_only.schema(), {
'IndexName': 'KeysOnly',
'KeySchema': [
{
'AttributeName': 'username',
'KeyType': 'HASH'
},
{
'AttributeName': 'date_joined',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'KEYS_ONLY'
},
'ProvisionedThroughput': {
'ReadCapacityUnits': 3,
'WriteCapacityUnits': 4
}
})
def test_global_include_index(self):
# Lean on the default throughput
include_index = GlobalIncludeIndex('IncludeKeys', parts=[
HashKey('username'),
RangeKey('date_joined')
], includes=[
'gender',
'friend_count'
])
self.assertEqual(include_index.name, 'IncludeKeys')
self.assertEqual([part.attr_type for part in include_index.parts], [
'HASH',
'RANGE'
])
self.assertEqual(include_index.projection_type, 'INCLUDE')
self.assertEqual(include_index.definition(), [
{'AttributeName': 'username', 'AttributeType': 'S'},
{'AttributeName': 'date_joined', 'AttributeType': 'S'}
])
self.assertEqual(include_index.schema(), {
'IndexName': 'IncludeKeys',
'KeySchema': [
{
'AttributeName': 'username',
'KeyType': 'HASH'
},
{
'AttributeName': 'date_joined',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'INCLUDE',
'NonKeyAttributes': [
'gender',
'friend_count',
]
},
'ProvisionedThroughput': {
'ReadCapacityUnits': 5,
'WriteCapacityUnits': 5
}
})
def test_global_include_index_throughput(self):
include_index = GlobalIncludeIndex('IncludeKeys', parts=[
HashKey('username'),
RangeKey('date_joined')
], includes=[
'gender',
'friend_count'
], throughput={
'read': 10,
'write': 8
})
self.assertEqual(include_index.schema(), {
'IndexName': 'IncludeKeys',
'KeySchema': [
{
'AttributeName': 'username',
'KeyType': 'HASH'
},
{
'AttributeName': 'date_joined',
'KeyType': 'RANGE'
}
],
'Projection': {
'ProjectionType': 'INCLUDE',
'NonKeyAttributes': [
'gender',
'friend_count',
]
},
'ProvisionedThroughput': {
'ReadCapacityUnits': 10,
'WriteCapacityUnits': 8
}
})
class ItemTestCase(unittest.TestCase):
if six.PY2:
assertCountEqual = unittest.TestCase.assertItemsEqual
def setUp(self):
super(ItemTestCase, self).setUp()
self.table = Table('whatever', connection=FakeDynamoDBConnection())
self.johndoe = self.create_item({
'username': 'johndoe',
'first_name': 'John',
'date_joined': 12345,
})
def create_item(self, data):
return Item(self.table, data=data)
def test_initialization(self):
empty_item = Item(self.table)
self.assertEqual(empty_item.table, self.table)
self.assertEqual(empty_item._data, {})
full_item = Item(self.table, data={
'username': 'johndoe',
'date_joined': 12345,
})
self.assertEqual(full_item.table, self.table)
self.assertEqual(full_item._data, {
'username': 'johndoe',
'date_joined': 12345,
})
# The next couple methods make use of ``sorted(...)`` so we get consistent
# ordering everywhere & no erroneous failures.
def test_keys(self):
self.assertCountEqual(self.johndoe.keys(), [
'date_joined',
'first_name',
'username',
])
def test_values(self):
self.assertCountEqual(self.johndoe.values(),
[12345, 'John', 'johndoe'])
def test_contains(self):
self.assertIn('username', self.johndoe)
self.assertIn('first_name', self.johndoe)
self.assertIn('date_joined', self.johndoe)
self.assertNotIn('whatever', self.johndoe)
def test_iter(self):
self.assertCountEqual(self.johndoe,
['johndoe', 'John', 12345])
def test_get(self):
self.assertEqual(self.johndoe.get('username'), 'johndoe')
self.assertEqual(self.johndoe.get('first_name'), 'John')
self.assertEqual(self.johndoe.get('date_joined'), 12345)
# Test a missing key. No default yields ``None``.
self.assertEqual(self.johndoe.get('last_name'), None)
# This time with a default.
self.assertEqual(self.johndoe.get('last_name', True), True)
def test_items(self):
self.assertCountEqual(
self.johndoe.items(),
[
('date_joined', 12345),
('first_name', 'John'),
('username', 'johndoe'),
])
def test_attribute_access(self):
self.assertEqual(self.johndoe['username'], 'johndoe')
self.assertEqual(self.johndoe['first_name'], 'John')
self.assertEqual(self.johndoe['date_joined'], 12345)
# Test a missing key.
self.assertEqual(self.johndoe['last_name'], None)
# Set a key.
self.johndoe['last_name'] = 'Doe'
# Test accessing the new key.
self.assertEqual(self.johndoe['last_name'], 'Doe')
# Delete a key.
del self.johndoe['last_name']
# Test the now-missing-again key.
self.assertEqual(self.johndoe['last_name'], None)
def test_needs_save(self):
self.johndoe.mark_clean()
self.assertFalse(self.johndoe.needs_save())
self.johndoe['last_name'] = 'Doe'
self.assertTrue(self.johndoe.needs_save())
def test_needs_save_set_changed(self):
# First, ensure we're clean.
self.johndoe.mark_clean()
self.assertFalse(self.johndoe.needs_save())
# Add a friends collection.
self.johndoe['friends'] = set(['jane', 'alice'])
self.assertTrue(self.johndoe.needs_save())
# Now mark it clean, then change the collection.
# This does NOT call ``__setitem__``, so the item used to be
# incorrectly appearing to be clean, when it had in fact been changed.
self.johndoe.mark_clean()
self.assertFalse(self.johndoe.needs_save())
self.johndoe['friends'].add('bob')
self.assertTrue(self.johndoe.needs_save())
def test_mark_clean(self):
self.johndoe['last_name'] = 'Doe'
self.assertTrue(self.johndoe.needs_save())
self.johndoe.mark_clean()
self.assertFalse(self.johndoe.needs_save())
def test_load(self):
empty_item = Item(self.table)
empty_item.load({
'Item': {
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366056668'},
'friend_count': {'N': '3'},
'friends': {'SS': ['alice', 'bob', 'jane']},
}
})
self.assertEqual(empty_item['username'], 'johndoe')
self.assertEqual(empty_item['date_joined'], 1366056668)
self.assertEqual(sorted(empty_item['friends']), sorted([
'alice',
'bob',
'jane'
]))
def test_get_keys(self):
# Setup the data.
self.table.schema = [
HashKey('username'),
RangeKey('date_joined'),
]
self.assertEqual(self.johndoe.get_keys(), {
'username': 'johndoe',
'date_joined': 12345,
})
def test_get_raw_keys(self):
# Setup the data.
self.table.schema = [
HashKey('username'),
RangeKey('date_joined'),
]
self.assertEqual(self.johndoe.get_raw_keys(), {
'username': {'S': 'johndoe'},
'date_joined': {'N': '12345'},
})
def test_build_expects(self):
# Pristine.
self.assertEqual(self.johndoe.build_expects(), {
'first_name': {
'Exists': False,
},
'username': {
'Exists': False,
},
'date_joined': {
'Exists': False,
},
})
# Without modifications.
self.johndoe.mark_clean()
self.assertEqual(self.johndoe.build_expects(), {
'first_name': {
'Exists': True,
'Value': {
'S': 'John',
},
},
'username': {
'Exists': True,
'Value': {
'S': 'johndoe',
},
},
'date_joined': {
'Exists': True,
'Value': {
'N': '12345',
},
},
})
# Change some data.
self.johndoe['first_name'] = 'Johann'
# Add some data.
self.johndoe['last_name'] = 'Doe'
# Delete some data.
del self.johndoe['date_joined']
# All fields (default).
self.assertEqual(self.johndoe.build_expects(), {
'first_name': {
'Exists': True,
'Value': {
'S': 'John',
},
},
'last_name': {
'Exists': False,
},
'username': {
'Exists': True,
'Value': {
'S': 'johndoe',
},
},
'date_joined': {
'Exists': True,
'Value': {
'N': '12345',
},
},
})
# Only a subset of the fields.
self.assertEqual(self.johndoe.build_expects(fields=[
'first_name',
'last_name',
'date_joined',
]), {
'first_name': {
'Exists': True,
'Value': {
'S': 'John',
},
},
'last_name': {
'Exists': False,
},
'date_joined': {
'Exists': True,
'Value': {
'N': '12345',
},
},
})
def test_prepare_full(self):
self.assertEqual(self.johndoe.prepare_full(), {
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'date_joined': {'N': '12345'}
})
self.johndoe['friends'] = set(['jane', 'alice'])
data = self.johndoe.prepare_full()
self.assertEqual(data['username'], {'S': 'johndoe'})
self.assertEqual(data['first_name'], {'S': 'John'})
self.assertEqual(data['date_joined'], {'N': '12345'})
self.assertCountEqual(data['friends']['SS'],
['jane', 'alice'])
def test_prepare_full_empty_set(self):
self.johndoe['friends'] = set()
self.assertEqual(self.johndoe.prepare_full(), {
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'date_joined': {'N': '12345'}
})
def test_prepare_partial(self):
self.johndoe.mark_clean()
# Change some data.
self.johndoe['first_name'] = 'Johann'
# Add some data.
self.johndoe['last_name'] = 'Doe'
# Delete some data.
del self.johndoe['date_joined']
final_data, fields = self.johndoe.prepare_partial()
self.assertEqual(final_data, {
'date_joined': {
'Action': 'DELETE',
},
'first_name': {
'Action': 'PUT',
'Value': {'S': 'Johann'},
},
'last_name': {
'Action': 'PUT',
'Value': {'S': 'Doe'},
},
})
self.assertEqual(fields, set([
'first_name',
'last_name',
'date_joined'
]))
def test_prepare_partial_empty_set(self):
self.johndoe.mark_clean()
# Change some data.
self.johndoe['first_name'] = 'Johann'
# Add some data.
self.johndoe['last_name'] = 'Doe'
# Delete some data.
del self.johndoe['date_joined']
# Put an empty set on the ``Item``.
self.johndoe['friends'] = set()
final_data, fields = self.johndoe.prepare_partial()
self.assertEqual(final_data, {
'date_joined': {
'Action': 'DELETE',
},
'first_name': {
'Action': 'PUT',
'Value': {'S': 'Johann'},
},
'last_name': {
'Action': 'PUT',
'Value': {'S': 'Doe'},
},
})
self.assertEqual(fields, set([
'first_name',
'last_name',
'date_joined'
]))
def test_save_no_changes(self):
# Unchanged, no save.
with mock.patch.object(self.table, '_put_item', return_value=True) \
as mock_put_item:
# Pretend we loaded it via ``get_item``...
self.johndoe.mark_clean()
self.assertFalse(self.johndoe.save())
self.assertFalse(mock_put_item.called)
def test_save_with_changes(self):
# With changed data.
with mock.patch.object(self.table, '_put_item', return_value=True) \
as mock_put_item:
self.johndoe.mark_clean()
self.johndoe['first_name'] = 'J'
self.johndoe['new_attr'] = 'never_seen_before'
self.assertTrue(self.johndoe.save())
self.assertFalse(self.johndoe.needs_save())
self.assertTrue(mock_put_item.called)
mock_put_item.assert_called_once_with({
'username': {'S': 'johndoe'},
'first_name': {'S': 'J'},
'new_attr': {'S': 'never_seen_before'},
'date_joined': {'N': '12345'}
}, expects={
'username': {
'Value': {
'S': 'johndoe',
},
'Exists': True,
},
'first_name': {
'Value': {
'S': 'John',
},
'Exists': True,
},
'new_attr': {
'Exists': False,
},
'date_joined': {
'Value': {
'N': '12345',
},
'Exists': True,
},
})
def test_save_with_changes_overwrite(self):
# With changed data.
with mock.patch.object(self.table, '_put_item', return_value=True) \
as mock_put_item:
self.johndoe['first_name'] = 'J'
self.johndoe['new_attr'] = 'never_seen_before'
# OVERWRITE ALL THE THINGS
self.assertTrue(self.johndoe.save(overwrite=True))
self.assertFalse(self.johndoe.needs_save())
self.assertTrue(mock_put_item.called)
mock_put_item.assert_called_once_with({
'username': {'S': 'johndoe'},
'first_name': {'S': 'J'},
'new_attr': {'S': 'never_seen_before'},
'date_joined': {'N': '12345'}
}, expects=None)
def test_partial_no_changes(self):
# Unchanged, no save.
with mock.patch.object(self.table, '_update_item', return_value=True) \
as mock_update_item:
# Pretend we loaded it via ``get_item``...
self.johndoe.mark_clean()
self.assertFalse(self.johndoe.partial_save())
self.assertFalse(mock_update_item.called)
def test_partial_with_changes(self):
# Setup the data.
self.table.schema = [
HashKey('username'),
]
# With changed data.
with mock.patch.object(self.table, '_update_item', return_value=True) \
as mock_update_item:
# Pretend we loaded it via ``get_item``...
self.johndoe.mark_clean()
# Now... MODIFY!!!
self.johndoe['first_name'] = 'J'
self.johndoe['last_name'] = 'Doe'
del self.johndoe['date_joined']
self.assertTrue(self.johndoe.partial_save())
self.assertFalse(self.johndoe.needs_save())
self.assertTrue(mock_update_item.called)
mock_update_item.assert_called_once_with({
'username': 'johndoe',
}, {
'first_name': {
'Action': 'PUT',
'Value': {'S': 'J'},
},
'last_name': {
'Action': 'PUT',
'Value': {'S': 'Doe'},
},
'date_joined': {
'Action': 'DELETE',
}
}, expects={
'first_name': {
'Value': {
'S': 'John',
},
'Exists': True
},
'last_name': {
'Exists': False
},
'date_joined': {
'Value': {
'N': '12345',
},
'Exists': True
},
})
def test_delete(self):
# Setup the data.
self.table.schema = [
HashKey('username'),
RangeKey('date_joined'),
]
with mock.patch.object(self.table, 'delete_item', return_value=True) \
as mock_delete_item:
self.johndoe.delete()
self.assertTrue(mock_delete_item.called)
mock_delete_item.assert_called_once_with(
username='johndoe',
date_joined=12345
)
def test_nonzero(self):
self.assertTrue(self.johndoe)
self.assertFalse(self.create_item({}))
class ItemFromItemTestCase(ItemTestCase):
def setUp(self):
super(ItemFromItemTestCase, self).setUp()
self.johndoe = self.create_item(self.johndoe)
def fake_results(name, greeting='hello', exclusive_start_key=None, limit=None):
if exclusive_start_key is None:
exclusive_start_key = -1
if limit == 0:
raise Exception("Web Service Returns '400 Bad Request'")
end_cap = 13
results = []
start_key = exclusive_start_key + 1
for i in range(start_key, start_key + 5):
if i < end_cap:
results.append("%s %s #%s" % (greeting, name, i))
# Don't return more than limit results
if limit < len(results):
results = results[:limit]
retval = {
'results': results,
}
if exclusive_start_key + 5 < end_cap:
retval['last_key'] = exclusive_start_key + 5
return retval
class ResultSetTestCase(unittest.TestCase):
def setUp(self):
super(ResultSetTestCase, self).setUp()
self.results = ResultSet()
self.result_function = mock.MagicMock(side_effect=fake_results)
self.results.to_call(self.result_function, 'john', greeting='Hello', limit=20)
def test_first_key(self):
self.assertEqual(self.results.first_key, 'exclusive_start_key')
def test_max_page_size_fetch_more(self):
self.results = ResultSet(max_page_size=10)
self.results.to_call(self.result_function, 'john', greeting='Hello')
self.results.fetch_more()
self.result_function.assert_called_with('john', greeting='Hello', limit=10)
self.result_function.reset_mock()
def test_max_page_size_and_smaller_limit_fetch_more(self):
self.results = ResultSet(max_page_size=10)
self.results.to_call(self.result_function, 'john', greeting='Hello', limit=5)
self.results.fetch_more()
self.result_function.assert_called_with('john', greeting='Hello', limit=5)
self.result_function.reset_mock()
def test_max_page_size_and_bigger_limit_fetch_more(self):
self.results = ResultSet(max_page_size=10)
self.results.to_call(self.result_function, 'john', greeting='Hello', limit=15)
self.results.fetch_more()
self.result_function.assert_called_with('john', greeting='Hello', limit=10)
self.result_function.reset_mock()
def test_fetch_more(self):
# First "page".
self.results.fetch_more()
self.assertEqual(self.results._results, [
'Hello john #0',
'Hello john #1',
'Hello john #2',
'Hello john #3',
'Hello john #4',
])
self.result_function.assert_called_with('john', greeting='Hello', limit=20)
self.result_function.reset_mock()
# Fake in a last key.
self.results._last_key_seen = 4
# Second "page".
self.results.fetch_more()
self.assertEqual(self.results._results, [
'Hello john #5',
'Hello john #6',
'Hello john #7',
'Hello john #8',
'Hello john #9',
])
self.result_function.assert_called_with('john', greeting='Hello', limit=20, exclusive_start_key=4)
self.result_function.reset_mock()
# Fake in a last key.
self.results._last_key_seen = 9
# Last "page".
self.results.fetch_more()
self.assertEqual(self.results._results, [
'Hello john #10',
'Hello john #11',
'Hello john #12',
])
# Fake in a key outside the range.
self.results._last_key_seen = 15
# Empty "page". Nothing new gets added
self.results.fetch_more()
self.assertEqual(self.results._results, [])
# Make sure we won't check for results in the future.
self.assertFalse(self.results._results_left)
def test_iteration(self):
# First page.
self.assertEqual(next(self.results), 'Hello john #0')
self.assertEqual(next(self.results), 'Hello john #1')
self.assertEqual(next(self.results), 'Hello john #2')
self.assertEqual(next(self.results), 'Hello john #3')
self.assertEqual(next(self.results), 'Hello john #4')
self.assertEqual(self.results._limit, 15)
# Second page.
self.assertEqual(next(self.results), 'Hello john #5')
self.assertEqual(next(self.results), 'Hello john #6')
self.assertEqual(next(self.results), 'Hello john #7')
self.assertEqual(next(self.results), 'Hello john #8')
self.assertEqual(next(self.results), 'Hello john #9')
self.assertEqual(self.results._limit, 10)
# Third page.
self.assertEqual(next(self.results), 'Hello john #10')
self.assertEqual(next(self.results), 'Hello john #11')
self.assertEqual(next(self.results), 'Hello john #12')
self.assertRaises(StopIteration, self.results.next)
self.assertEqual(self.results._limit, 7)
def test_limit_smaller_than_first_page(self):
results = ResultSet()
results.to_call(fake_results, 'john', greeting='Hello', limit=2)
self.assertEqual(next(results), 'Hello john #0')
self.assertEqual(next(results), 'Hello john #1')
self.assertRaises(StopIteration, results.next)
def test_limit_equals_page(self):
results = ResultSet()
results.to_call(fake_results, 'john', greeting='Hello', limit=5)
# First page
self.assertEqual(next(results), 'Hello john #0')
self.assertEqual(next(results), 'Hello john #1')
self.assertEqual(next(results), 'Hello john #2')
self.assertEqual(next(results), 'Hello john #3')
self.assertEqual(next(results), 'Hello john #4')
self.assertRaises(StopIteration, results.next)
def test_limit_greater_than_page(self):
results = ResultSet()
results.to_call(fake_results, 'john', greeting='Hello', limit=6)
# First page
self.assertEqual(next(results), 'Hello john #0')
self.assertEqual(next(results), 'Hello john #1')
self.assertEqual(next(results), 'Hello john #2')
self.assertEqual(next(results), 'Hello john #3')
self.assertEqual(next(results), 'Hello john #4')
# Second page
self.assertEqual(next(results), 'Hello john #5')
self.assertRaises(StopIteration, results.next)
def test_iteration_noresults(self):
def none(limit=10):
return {
'results': [],
}
results = ResultSet()
results.to_call(none, limit=20)
self.assertRaises(StopIteration, results.next)
def test_iteration_sporadic_pages(self):
# Some pages have no/incomplete results but have a ``LastEvaluatedKey``
# (for instance, scans with filters), so we need to accommodate that.
def sporadic():
# A dict, because Python closures have read-only access to the
# reference itself.
count = {'value': -1}
def _wrapper(limit=10, exclusive_start_key=None):
count['value'] = count['value'] + 1
if count['value'] == 0:
# Full page.
return {
'results': [
'Result #0',
'Result #1',
'Result #2',
'Result #3',
],
'last_key': 'page-1'
}
elif count['value'] == 1:
# Empty page but continue.
return {
'results': [],
'last_key': 'page-2'
}
elif count['value'] == 2:
# Final page.
return {
'results': [
'Result #4',
'Result #5',
'Result #6',
],
}
return _wrapper
results = ResultSet()
results.to_call(sporadic(), limit=20)
# First page
self.assertEqual(next(results), 'Result #0')
self.assertEqual(next(results), 'Result #1')
self.assertEqual(next(results), 'Result #2')
self.assertEqual(next(results), 'Result #3')
# Second page (misses!)
# Moves on to the third page
self.assertEqual(next(results), 'Result #4')
self.assertEqual(next(results), 'Result #5')
self.assertEqual(next(results), 'Result #6')
self.assertRaises(StopIteration, results.next)
def test_list(self):
self.assertEqual(list(self.results), [
'Hello john #0',
'Hello john #1',
'Hello john #2',
'Hello john #3',
'Hello john #4',
'Hello john #5',
'Hello john #6',
'Hello john #7',
'Hello john #8',
'Hello john #9',
'Hello john #10',
'Hello john #11',
'Hello john #12'
])
def fake_batch_results(keys):
results = []
simulate_unprocessed = True
if len(keys) and keys[0] == 'johndoe':
simulate_unprocessed = False
for key in keys:
if simulate_unprocessed and key == 'johndoe':
continue
results.append("hello %s" % key)
retval = {
'results': results,
'last_key': None,
}
if simulate_unprocessed:
retval['unprocessed_keys'] = ['johndoe']
return retval
class BatchGetResultSetTestCase(unittest.TestCase):
def setUp(self):
super(BatchGetResultSetTestCase, self).setUp()
self.results = BatchGetResultSet(keys=[
'alice',
'bob',
'jane',
'johndoe',
])
self.results.to_call(fake_batch_results)
def test_fetch_more(self):
# First "page".
self.results.fetch_more()
self.assertEqual(self.results._results, [
'hello alice',
'hello bob',
'hello jane',
])
self.assertEqual(self.results._keys_left, ['johndoe'])
# Second "page".
self.results.fetch_more()
self.assertEqual(self.results._results, [
'hello johndoe',
])
# Empty "page". Nothing new gets added
self.results.fetch_more()
self.assertEqual(self.results._results, [])
# Make sure we won't check for results in the future.
self.assertFalse(self.results._results_left)
def test_fetch_more_empty(self):
self.results.to_call(lambda keys: {'results': [], 'last_key': None})
self.results.fetch_more()
self.assertEqual(self.results._results, [])
self.assertRaises(StopIteration, self.results.next)
def test_iteration(self):
# First page.
self.assertEqual(next(self.results), 'hello alice')
self.assertEqual(next(self.results), 'hello bob')
self.assertEqual(next(self.results), 'hello jane')
self.assertEqual(next(self.results), 'hello johndoe')
self.assertRaises(StopIteration, self.results.next)
class TableTestCase(unittest.TestCase):
def setUp(self):
super(TableTestCase, self).setUp()
self.users = Table('users', connection=FakeDynamoDBConnection())
self.default_connection = DynamoDBConnection(
aws_access_key_id='access_key',
aws_secret_access_key='secret_key'
)
def test__introspect_schema(self):
raw_schema_1 = [
{
"AttributeName": "username",
"KeyType": "HASH"
},
{
"AttributeName": "date_joined",
"KeyType": "RANGE"
}
]
raw_attributes_1 = [
{
'AttributeName': 'username',
'AttributeType': 'S'
},
{
'AttributeName': 'date_joined',
'AttributeType': 'S'
},
]
schema_1 = self.users._introspect_schema(raw_schema_1, raw_attributes_1)
self.assertEqual(len(schema_1), 2)
self.assertTrue(isinstance(schema_1[0], HashKey))
self.assertEqual(schema_1[0].name, 'username')
self.assertTrue(isinstance(schema_1[1], RangeKey))
self.assertEqual(schema_1[1].name, 'date_joined')
raw_schema_2 = [
{
"AttributeName": "username",
"KeyType": "BTREE"
},
]
raw_attributes_2 = [
{
'AttributeName': 'username',
'AttributeType': 'S'
},
]
self.assertRaises(
exceptions.UnknownSchemaFieldError,
self.users._introspect_schema,
raw_schema_2,
raw_attributes_2
)
# Test a complex schema & ensure the types come back correctly.
raw_schema_3 = [
{
"AttributeName": "user_id",
"KeyType": "HASH"
},
{
"AttributeName": "junk",
"KeyType": "RANGE"
}
]
raw_attributes_3 = [
{
'AttributeName': 'user_id',
'AttributeType': 'N'
},
{
'AttributeName': 'junk',
'AttributeType': 'B'
},
]
schema_3 = self.users._introspect_schema(raw_schema_3, raw_attributes_3)
self.assertEqual(len(schema_3), 2)
self.assertTrue(isinstance(schema_3[0], HashKey))
self.assertEqual(schema_3[0].name, 'user_id')
self.assertEqual(schema_3[0].data_type, NUMBER)
self.assertTrue(isinstance(schema_3[1], RangeKey))
self.assertEqual(schema_3[1].name, 'junk')
self.assertEqual(schema_3[1].data_type, BINARY)
def test__introspect_indexes(self):
raw_indexes_1 = [
{
"IndexName": "MostRecentlyJoinedIndex",
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
},
{
"AttributeName": "date_joined",
"KeyType": "RANGE"
}
],
"Projection": {
"ProjectionType": "KEYS_ONLY"
}
},
{
"IndexName": "EverybodyIndex",
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
},
],
"Projection": {
"ProjectionType": "ALL"
}
},
{
"IndexName": "GenderIndex",
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
},
{
"AttributeName": "date_joined",
"KeyType": "RANGE"
}
],
"Projection": {
"ProjectionType": "INCLUDE",
"NonKeyAttributes": [
'gender',
]
}
}
]
indexes_1 = self.users._introspect_indexes(raw_indexes_1)
self.assertEqual(len(indexes_1), 3)
self.assertTrue(isinstance(indexes_1[0], KeysOnlyIndex))
self.assertEqual(indexes_1[0].name, 'MostRecentlyJoinedIndex')
self.assertEqual(len(indexes_1[0].parts), 2)
self.assertTrue(isinstance(indexes_1[1], AllIndex))
self.assertEqual(indexes_1[1].name, 'EverybodyIndex')
self.assertEqual(len(indexes_1[1].parts), 1)
self.assertTrue(isinstance(indexes_1[2], IncludeIndex))
self.assertEqual(indexes_1[2].name, 'GenderIndex')
self.assertEqual(len(indexes_1[2].parts), 2)
self.assertEqual(indexes_1[2].includes_fields, ['gender'])
raw_indexes_2 = [
{
"IndexName": "MostRecentlyJoinedIndex",
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
},
{
"AttributeName": "date_joined",
"KeyType": "RANGE"
}
],
"Projection": {
"ProjectionType": "SOMETHING_CRAZY"
}
},
]
self.assertRaises(
exceptions.UnknownIndexFieldError,
self.users._introspect_indexes,
raw_indexes_2
)
def test_initialization(self):
users = Table('users', connection=self.default_connection)
self.assertEqual(users.table_name, 'users')
self.assertTrue(isinstance(users.connection, DynamoDBConnection))
self.assertEqual(users.throughput['read'], 5)
self.assertEqual(users.throughput['write'], 5)
self.assertEqual(users.schema, None)
self.assertEqual(users.indexes, None)
groups = Table('groups', connection=FakeDynamoDBConnection())
self.assertEqual(groups.table_name, 'groups')
self.assertTrue(hasattr(groups.connection, 'assert_called_once_with'))
def test_create_simple(self):
conn = FakeDynamoDBConnection()
with mock.patch.object(conn, 'create_table', return_value={}) \
as mock_create_table:
retval = Table.create('users', schema=[
HashKey('username'),
RangeKey('date_joined', data_type=NUMBER)
], connection=conn)
self.assertTrue(retval)
self.assertTrue(mock_create_table.called)
mock_create_table.assert_called_once_with(attribute_definitions=[
{
'AttributeName': 'username',
'AttributeType': 'S'
},
{
'AttributeName': 'date_joined',
'AttributeType': 'N'
}
],
table_name='users',
key_schema=[
{
'KeyType': 'HASH',
'AttributeName': 'username'
},
{
'KeyType': 'RANGE',
'AttributeName': 'date_joined'
}
],
provisioned_throughput={
'WriteCapacityUnits': 5,
'ReadCapacityUnits': 5
})
def test_create_full(self):
conn = FakeDynamoDBConnection()
with mock.patch.object(conn, 'create_table', return_value={}) \
as mock_create_table:
retval = Table.create('users', schema=[
HashKey('username'),
RangeKey('date_joined', data_type=NUMBER)
], throughput={
'read':20,
'write': 10,
}, indexes=[
KeysOnlyIndex('FriendCountIndex', parts=[
RangeKey('friend_count')
]),
], global_indexes=[
GlobalKeysOnlyIndex('FullFriendCountIndex', parts=[
RangeKey('friend_count')
], throughput={
'read': 10,
'write': 8,
}),
], connection=conn)
self.assertTrue(retval)
self.assertTrue(mock_create_table.called)
mock_create_table.assert_called_once_with(attribute_definitions=[
{
'AttributeName': 'username',
'AttributeType': 'S'
},
{
'AttributeName': 'date_joined',
'AttributeType': 'N'
},
{
'AttributeName': 'friend_count',
'AttributeType': 'S'
}
],
key_schema=[
{
'KeyType': 'HASH',
'AttributeName': 'username'
},
{
'KeyType': 'RANGE',
'AttributeName': 'date_joined'
}
],
table_name='users',
provisioned_throughput={
'WriteCapacityUnits': 10,
'ReadCapacityUnits': 20
},
global_secondary_indexes=[
{
'KeySchema': [
{
'KeyType': 'RANGE',
'AttributeName': 'friend_count'
}
],
'IndexName': 'FullFriendCountIndex',
'Projection': {
'ProjectionType': 'KEYS_ONLY'
},
'ProvisionedThroughput': {
'WriteCapacityUnits': 8,
'ReadCapacityUnits': 10
}
}
],
local_secondary_indexes=[
{
'KeySchema': [
{
'KeyType': 'RANGE',
'AttributeName': 'friend_count'
}
],
'IndexName': 'FriendCountIndex',
'Projection': {
'ProjectionType': 'KEYS_ONLY'
}
}
])
def test_describe(self):
expected = {
"Table": {
"AttributeDefinitions": [
{
"AttributeName": "username",
"AttributeType": "S"
}
],
"ItemCount": 5,
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
}
],
"LocalSecondaryIndexes": [
{
"IndexName": "UsernameIndex",
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
}
],
"Projection": {
"ProjectionType": "KEYS_ONLY"
}
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 20,
"WriteCapacityUnits": 6
},
"TableName": "Thread",
"TableStatus": "ACTIVE"
}
}
with mock.patch.object(
self.users.connection,
'describe_table',
return_value=expected) as mock_describe:
self.assertEqual(self.users.throughput['read'], 5)
self.assertEqual(self.users.throughput['write'], 5)
self.assertEqual(self.users.schema, None)
self.assertEqual(self.users.indexes, None)
self.users.describe()
self.assertEqual(self.users.throughput['read'], 20)
self.assertEqual(self.users.throughput['write'], 6)
self.assertEqual(len(self.users.schema), 1)
self.assertEqual(isinstance(self.users.schema[0], HashKey), 1)
self.assertEqual(len(self.users.indexes), 1)
mock_describe.assert_called_once_with('users')
def test_update(self):
with mock.patch.object(
self.users.connection,
'update_table',
return_value={}) as mock_update:
self.assertEqual(self.users.throughput['read'], 5)
self.assertEqual(self.users.throughput['write'], 5)
self.users.update(throughput={
'read': 7,
'write': 2,
})
self.assertEqual(self.users.throughput['read'], 7)
self.assertEqual(self.users.throughput['write'], 2)
mock_update.assert_called_once_with(
'users',
global_secondary_index_updates=None,
provisioned_throughput={
'WriteCapacityUnits': 2,
'ReadCapacityUnits': 7
}
)
with mock.patch.object(
self.users.connection,
'update_table',
return_value={}) as mock_update:
self.assertEqual(self.users.throughput['read'], 7)
self.assertEqual(self.users.throughput['write'], 2)
self.users.update(throughput={
'read': 9,
'write': 5,
},
global_indexes={
'WhateverIndex': {
'read': 6,
'write': 1
},
'AnotherIndex': {
'read': 1,
'write': 2
}
})
self.assertEqual(self.users.throughput['read'], 9)
self.assertEqual(self.users.throughput['write'], 5)
args, kwargs = mock_update.call_args
self.assertEqual(args, ('users',))
self.assertEqual(kwargs['provisioned_throughput'], {
'WriteCapacityUnits': 5,
'ReadCapacityUnits': 9,
})
update = kwargs['global_secondary_index_updates'][:]
update.sort(key=lambda x: x['Update']['IndexName'])
self.assertDictEqual(
update[0],
{
'Update': {
'IndexName': 'AnotherIndex',
'ProvisionedThroughput': {
'WriteCapacityUnits': 2,
'ReadCapacityUnits': 1
}
}
})
self.assertDictEqual(
update[1],
{
'Update': {
'IndexName': 'WhateverIndex',
'ProvisionedThroughput': {
'WriteCapacityUnits': 1,
'ReadCapacityUnits': 6
}
}
})
def test_create_global_secondary_index(self):
with mock.patch.object(
self.users.connection,
'update_table',
return_value={}) as mock_update:
self.users.create_global_secondary_index(
global_index=GlobalAllIndex(
'JustCreatedIndex',
parts=[
HashKey('requiredHashKey')
],
throughput={
'read': 2,
'write': 2
}
)
)
mock_update.assert_called_once_with(
'users',
global_secondary_index_updates=[
{
'Create': {
'IndexName': 'JustCreatedIndex',
'KeySchema': [
{
'KeyType': 'HASH',
'AttributeName': 'requiredHashKey'
}
],
'Projection': {
'ProjectionType': 'ALL'
},
'ProvisionedThroughput': {
'WriteCapacityUnits': 2,
'ReadCapacityUnits': 2
}
}
}
],
attribute_definitions=[
{
'AttributeName': 'requiredHashKey',
'AttributeType': 'S'
}
]
)
def test_delete_global_secondary_index(self):
with mock.patch.object(
self.users.connection,
'update_table',
return_value={}) as mock_update:
self.users.delete_global_secondary_index('RandomGSIIndex')
mock_update.assert_called_once_with(
'users',
global_secondary_index_updates=[
{
'Delete': {
'IndexName': 'RandomGSIIndex',
}
}
]
)
def test_update_global_secondary_index(self):
# Updating a single global secondary index
with mock.patch.object(
self.users.connection,
'update_table',
return_value={}) as mock_update:
self.users.update_global_secondary_index(global_indexes={
'A_IndexToBeUpdated': {
'read': 5,
'write': 5
}
})
mock_update.assert_called_once_with(
'users',
global_secondary_index_updates=[
{
'Update': {
'IndexName': 'A_IndexToBeUpdated',
"ProvisionedThroughput": {
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5
},
}
}
]
)
# Updating multiple global secondary indexes
with mock.patch.object(
self.users.connection,
'update_table',
return_value={}) as mock_update:
self.users.update_global_secondary_index(global_indexes={
'A_IndexToBeUpdated': {
'read': 5,
'write': 5
},
'B_IndexToBeUpdated': {
'read': 9,
'write': 9
}
})
args, kwargs = mock_update.call_args
self.assertEqual(args, ('users',))
update = kwargs['global_secondary_index_updates'][:]
update.sort(key=lambda x: x['Update']['IndexName'])
self.assertDictEqual(
update[0],
{
'Update': {
'IndexName': 'A_IndexToBeUpdated',
'ProvisionedThroughput': {
'WriteCapacityUnits': 5,
'ReadCapacityUnits': 5
}
}
})
self.assertDictEqual(
update[1],
{
'Update': {
'IndexName': 'B_IndexToBeUpdated',
'ProvisionedThroughput': {
'WriteCapacityUnits': 9,
'ReadCapacityUnits': 9
}
}
})
def test_delete(self):
with mock.patch.object(
self.users.connection,
'delete_table',
return_value={}) as mock_delete:
self.assertTrue(self.users.delete())
mock_delete.assert_called_once_with('users')
def test_get_item(self):
expected = {
'Item': {
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366056668'},
'friend_count': {'N': '3'},
'friends': {'SS': ['alice', 'bob', 'jane']},
}
}
with mock.patch.object(
self.users.connection,
'get_item',
return_value=expected) as mock_get_item:
item = self.users.get_item(username='johndoe')
self.assertEqual(item['username'], 'johndoe')
self.assertEqual(item['first_name'], 'John')
mock_get_item.assert_called_once_with('users', {
'username': {'S': 'johndoe'}
}, consistent_read=False, attributes_to_get=None)
with mock.patch.object(
self.users.connection,
'get_item',
return_value=expected) as mock_get_item:
item = self.users.get_item(username='johndoe', attributes=[
'username',
'first_name',
])
mock_get_item.assert_called_once_with('users', {
'username': {'S': 'johndoe'}
}, consistent_read=False, attributes_to_get=['username', 'first_name'])
def test_has_item(self):
expected = {
'Item': {
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366056668'},
'friend_count': {'N': '3'},
'friends': {'SS': ['alice', 'bob', 'jane']},
}
}
with mock.patch.object(
self.users.connection,
'get_item',
return_value=expected) as mock_get_item:
found = self.users.has_item(username='johndoe')
self.assertTrue(found)
with mock.patch.object(
self.users.connection,
'get_item') as mock_get_item:
mock_get_item.side_effect = JSONResponseError("Nope.", None, None)
found = self.users.has_item(username='mrsmith')
self.assertFalse(found)
def test_lookup_hash(self):
"""Tests the "lookup" function with just a hash key"""
expected = {
'Item': {
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366056668'},
'friend_count': {'N': '3'},
'friends': {'SS': ['alice', 'bob', 'jane']},
}
}
# Set the Schema
self.users.schema = [
HashKey('username'),
RangeKey('date_joined', data_type=NUMBER),
]
with mock.patch.object(
self.users,
'get_item',
return_value=expected) as mock_get_item:
self.users.lookup('johndoe')
mock_get_item.assert_called_once_with(
username= 'johndoe')
def test_lookup_hash_and_range(self):
"""Test the "lookup" function with a hash and range key"""
expected = {
'Item': {
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366056668'},
'friend_count': {'N': '3'},
'friends': {'SS': ['alice', 'bob', 'jane']},
}
}
# Set the Schema
self.users.schema = [
HashKey('username'),
RangeKey('date_joined', data_type=NUMBER),
]
with mock.patch.object(
self.users,
'get_item',
return_value=expected) as mock_get_item:
self.users.lookup('johndoe', 1366056668)
mock_get_item.assert_called_once_with(
username= 'johndoe',
date_joined= 1366056668)
def test_put_item(self):
with mock.patch.object(
self.users.connection,
'put_item',
return_value={}) as mock_put_item:
self.users.put_item(data={
'username': 'johndoe',
'last_name': 'Doe',
'date_joined': 12345,
})
mock_put_item.assert_called_once_with('users', {
'username': {'S': 'johndoe'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '12345'}
}, expected={
'username': {
'Exists': False,
},
'last_name': {
'Exists': False,
},
'date_joined': {
'Exists': False,
}
})
def test_private_put_item(self):
with mock.patch.object(
self.users.connection,
'put_item',
return_value={}) as mock_put_item:
self.users._put_item({'some': 'data'})
mock_put_item.assert_called_once_with('users', {'some': 'data'})
def test_private_update_item(self):
with mock.patch.object(
self.users.connection,
'update_item',
return_value={}) as mock_update_item:
self.users._update_item({
'username': 'johndoe'
}, {
'some': 'data',
})
mock_update_item.assert_called_once_with('users', {
'username': {'S': 'johndoe'},
}, {
'some': 'data',
})
def test_delete_item(self):
with mock.patch.object(
self.users.connection,
'delete_item',
return_value={}) as mock_delete_item:
self.assertTrue(self.users.delete_item(username='johndoe', date_joined=23456))
mock_delete_item.assert_called_once_with('users', {
'username': {
'S': 'johndoe'
},
'date_joined': {
'N': '23456'
}
}, expected=None, conditional_operator=None)
def test_delete_item_conditionally(self):
with mock.patch.object(
self.users.connection,
'delete_item',
return_value={}) as mock_delete_item:
self.assertTrue(self.users.delete_item(expected={'balance__eq': 0},
username='johndoe', date_joined=23456))
mock_delete_item.assert_called_once_with('users', {
'username': {
'S': 'johndoe'
},
'date_joined': {
'N': '23456'
}
},
expected={
'balance': {
'ComparisonOperator': 'EQ', 'AttributeValueList': [{'N': '0'}]
},
},
conditional_operator=None)
def side_effect(*args, **kwargs):
raise exceptions.ConditionalCheckFailedException(400, '', {})
with mock.patch.object(
self.users.connection,
'delete_item',
side_effect=side_effect) as mock_delete_item:
self.assertFalse(self.users.delete_item(expected={'balance__eq': 0},
username='johndoe', date_joined=23456))
def test_get_key_fields_no_schema_populated(self):
expected = {
"Table": {
"AttributeDefinitions": [
{
"AttributeName": "username",
"AttributeType": "S"
},
{
"AttributeName": "date_joined",
"AttributeType": "N"
}
],
"ItemCount": 5,
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
},
{
"AttributeName": "date_joined",
"KeyType": "RANGE"
}
],
"LocalSecondaryIndexes": [
{
"IndexName": "UsernameIndex",
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
}
],
"Projection": {
"ProjectionType": "KEYS_ONLY"
}
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 20,
"WriteCapacityUnits": 6
},
"TableName": "Thread",
"TableStatus": "ACTIVE"
}
}
with mock.patch.object(
self.users.connection,
'describe_table',
return_value=expected) as mock_describe:
self.assertEqual(self.users.schema, None)
key_fields = self.users.get_key_fields()
self.assertEqual(key_fields, ['username', 'date_joined'])
self.assertEqual(len(self.users.schema), 2)
mock_describe.assert_called_once_with('users')
def test_batch_write_no_writes(self):
with mock.patch.object(
self.users.connection,
'batch_write_item',
return_value={}) as mock_batch:
with self.users.batch_write() as batch:
pass
self.assertFalse(mock_batch.called)
def test_batch_write(self):
with mock.patch.object(
self.users.connection,
'batch_write_item',
return_value={}) as mock_batch:
with self.users.batch_write() as batch:
batch.put_item(data={
'username': 'jane',
'date_joined': 12342547
})
batch.delete_item(username='johndoe')
batch.put_item(data={
'username': 'alice',
'date_joined': 12342888
})
mock_batch.assert_called_once_with({
'users': [
{
'PutRequest': {
'Item': {
'username': {'S': 'jane'},
'date_joined': {'N': '12342547'}
}
}
},
{
'PutRequest': {
'Item': {
'username': {'S': 'alice'},
'date_joined': {'N': '12342888'}
}
}
},
{
'DeleteRequest': {
'Key': {
'username': {'S': 'johndoe'},
}
}
},
]
})
def test_batch_write_dont_swallow_exceptions(self):
with mock.patch.object(
self.users.connection,
'batch_write_item',
return_value={}) as mock_batch:
try:
with self.users.batch_write() as batch:
raise Exception('OH NOES')
except Exception as e:
self.assertEqual(str(e), 'OH NOES')
self.assertFalse(mock_batch.called)
def test_batch_write_flushing(self):
with mock.patch.object(
self.users.connection,
'batch_write_item',
return_value={}) as mock_batch:
with self.users.batch_write() as batch:
batch.put_item(data={
'username': 'jane',
'date_joined': 12342547
})
# This would only be enough for one batch.
batch.delete_item(username='johndoe1')
batch.delete_item(username='johndoe2')
batch.delete_item(username='johndoe3')
batch.delete_item(username='johndoe4')
batch.delete_item(username='johndoe5')
batch.delete_item(username='johndoe6')
batch.delete_item(username='johndoe7')
batch.delete_item(username='johndoe8')
batch.delete_item(username='johndoe9')
batch.delete_item(username='johndoe10')
batch.delete_item(username='johndoe11')
batch.delete_item(username='johndoe12')
batch.delete_item(username='johndoe13')
batch.delete_item(username='johndoe14')
batch.delete_item(username='johndoe15')
batch.delete_item(username='johndoe16')
batch.delete_item(username='johndoe17')
batch.delete_item(username='johndoe18')
batch.delete_item(username='johndoe19')
batch.delete_item(username='johndoe20')
batch.delete_item(username='johndoe21')
batch.delete_item(username='johndoe22')
batch.delete_item(username='johndoe23')
# We're only at 24 items. No flushing yet.
self.assertEqual(mock_batch.call_count, 0)
# This pushes it over the edge. A flush happens then we start
# queuing objects again.
batch.delete_item(username='johndoe24')
self.assertEqual(mock_batch.call_count, 1)
# Since we add another, there's enough for a second call to
# flush.
batch.delete_item(username='johndoe25')
self.assertEqual(mock_batch.call_count, 2)
def test_batch_write_unprocessed_items(self):
unprocessed = {
'UnprocessedItems': {
'users': [
{
'PutRequest': {
'username': {
'S': 'jane',
},
'date_joined': {
'N': 12342547
}
},
},
],
},
}
# Test enqueuing the unprocessed bits.
with mock.patch.object(
self.users.connection,
'batch_write_item',
return_value=unprocessed) as mock_batch:
with self.users.batch_write() as batch:
self.assertEqual(len(batch._unprocessed), 0)
# Trash the ``resend_unprocessed`` method so that we don't
# infinite loop forever here.
batch.resend_unprocessed = lambda: True
batch.put_item(data={
'username': 'jane',
'date_joined': 12342547
})
batch.delete_item(username='johndoe')
batch.put_item(data={
'username': 'alice',
'date_joined': 12342888
})
self.assertEqual(len(batch._unprocessed), 1)
# Now test resending those unprocessed items.
with mock.patch.object(
self.users.connection,
'batch_write_item',
return_value={}) as mock_batch:
with self.users.batch_write() as batch:
self.assertEqual(len(batch._unprocessed), 0)
# Toss in faked unprocessed items, as though a previous batch
# had failed.
batch._unprocessed = [
{
'PutRequest': {
'username': {
'S': 'jane',
},
'date_joined': {
'N': 12342547
}
},
},
]
batch.put_item(data={
'username': 'jane',
'date_joined': 12342547
})
batch.delete_item(username='johndoe')
batch.put_item(data={
'username': 'alice',
'date_joined': 12342888
})
# Flush, to make sure everything has been processed.
# Unprocessed items should still be hanging around.
batch.flush()
self.assertEqual(len(batch._unprocessed), 1)
# Post-exit, this should be emptied.
self.assertEqual(len(batch._unprocessed), 0)
def test__build_filters(self):
filters = self.users._build_filters({
'username__eq': 'johndoe',
'date_joined__gte': 1234567,
'age__in': [30, 31, 32, 33],
'last_name__between': ['danzig', 'only'],
'first_name__null': False,
'gender__null': True,
}, using=FILTER_OPERATORS)
self.assertEqual(filters, {
'username': {
'AttributeValueList': [
{
'S': 'johndoe',
},
],
'ComparisonOperator': 'EQ',
},
'date_joined': {
'AttributeValueList': [
{
'N': '1234567',
},
],
'ComparisonOperator': 'GE',
},
'age': {
'AttributeValueList': [
{'N': '30'},
{'N': '31'},
{'N': '32'},
{'N': '33'},
],
'ComparisonOperator': 'IN',
},
'last_name': {
'AttributeValueList': [{'S': 'danzig'}, {'S': 'only'}],
'ComparisonOperator': 'BETWEEN',
},
'first_name': {
'ComparisonOperator': 'NOT_NULL'
},
'gender': {
'ComparisonOperator': 'NULL'
},
})
self.assertRaises(exceptions.UnknownFilterTypeError,
self.users._build_filters,
{
'darling__die': True,
}
)
q_filters = self.users._build_filters({
'username__eq': 'johndoe',
'date_joined__gte': 1234567,
'last_name__between': ['danzig', 'only'],
'gender__beginswith': 'm',
}, using=QUERY_OPERATORS)
self.assertEqual(q_filters, {
'username': {
'AttributeValueList': [
{
'S': 'johndoe',
},
],
'ComparisonOperator': 'EQ',
},
'date_joined': {
'AttributeValueList': [
{
'N': '1234567',
},
],
'ComparisonOperator': 'GE',
},
'last_name': {
'AttributeValueList': [{'S': 'danzig'}, {'S': 'only'}],
'ComparisonOperator': 'BETWEEN',
},
'gender': {
'AttributeValueList': [{'S': 'm'}],
'ComparisonOperator': 'BEGINS_WITH',
},
})
self.assertRaises(exceptions.UnknownFilterTypeError,
self.users._build_filters,
{
'darling__die': True,
},
using=QUERY_OPERATORS
)
self.assertRaises(exceptions.UnknownFilterTypeError,
self.users._build_filters,
{
'first_name__null': True,
},
using=QUERY_OPERATORS
)
def test_private_query(self):
expected = {
"ConsumedCapacity": {
"CapacityUnits": 0.5,
"TableName": "users"
},
"Count": 4,
"Items": [
{
'username': {'S': 'johndoe'},
'first_name': {'S': 'John'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366056668'},
'friend_count': {'N': '3'},
'friends': {'SS': ['alice', 'bob', 'jane']},
},
{
'username': {'S': 'jane'},
'first_name': {'S': 'Jane'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366057777'},
'friend_count': {'N': '2'},
'friends': {'SS': ['alice', 'johndoe']},
},
{
'username': {'S': 'alice'},
'first_name': {'S': 'Alice'},
'last_name': {'S': 'Expert'},
'date_joined': {'N': '1366056680'},
'friend_count': {'N': '1'},
'friends': {'SS': ['jane']},
},
{
'username': {'S': 'bob'},
'first_name': {'S': 'Bob'},
'last_name': {'S': 'Smith'},
'date_joined': {'N': '1366056888'},
'friend_count': {'N': '1'},
'friends': {'SS': ['johndoe']},
},
],
"ScannedCount": 4
}
with mock.patch.object(
self.users.connection,
'query',
return_value=expected) as mock_query:
results = self.users._query(
limit=4,
reverse=True,
username__between=['aaa', 'mmm']
)
usernames = [res['username'] for res in results['results']]
self.assertEqual(usernames, ['johndoe', 'jane', 'alice', 'bob'])
self.assertEqual(len(results['results']), 4)
self.assertEqual(results['last_key'], None)
mock_query.assert_called_once_with('users',
consistent_read=False,
scan_index_forward=False,
index_name=None,
attributes_to_get=None,
limit=4,
key_conditions={
'username': {
'AttributeValueList': [{'S': 'aaa'}, {'S': 'mmm'}],
'ComparisonOperator': 'BETWEEN',
}
},
select=None,
query_filter=None,
conditional_operator=None
)
# Now alter the expected.
expected['LastEvaluatedKey'] = {
'username': {
'S': 'johndoe',
},
}
with mock.patch.object(
self.users.connection,
'query',
return_value=expected) as mock_query_2:
results = self.users._query(
limit=4,
reverse=True,
username__between=['aaa', 'mmm'],
exclusive_start_key={
'username': 'adam',
},
consistent=True,
query_filter=None,
conditional_operator='AND'
)
usernames = [res['username'] for res in results['results']]
self.assertEqual(usernames, ['johndoe', 'jane', 'alice', 'bob'])
self.assertEqual(len(results['results']), 4)
self.assertEqual(results['last_key'], {'username': 'johndoe'})
mock_query_2.assert_called_once_with('users',
key_conditions={
'username': {
'AttributeValueList': [{'S': 'aaa'}, {'S': 'mmm'}],
'ComparisonOperator': 'BETWEEN',
}
},
index_name=None,
attributes_to_get=None,
scan_index_forward=False,
limit=4,
exclusive_start_key={
'username': {
'S': 'adam',
},
},
consistent_read=True,
select=None,
query_filter=None,
conditional_operator='AND'
)
def test_private_scan(self):
expected = {
"ConsumedCapacity": {
"CapacityUnits": 0.5,
"TableName": "users"
},
"Count": 4,
"Items": [
{
'username': {'S': 'alice'},
'first_name': {'S': 'Alice'},
'last_name': {'S': 'Expert'},
'date_joined': {'N': '1366056680'},
'friend_count': {'N': '1'},
'friends': {'SS': ['jane']},
},
{
'username': {'S': 'bob'},
'first_name': {'S': 'Bob'},
'last_name': {'S': 'Smith'},
'date_joined': {'N': '1366056888'},
'friend_count': {'N': '1'},
'friends': {'SS': ['johndoe']},
},
{
'username': {'S': 'jane'},
'first_name': {'S': 'Jane'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366057777'},
'friend_count': {'N': '2'},
'friends': {'SS': ['alice', 'johndoe']},
},
],
"ScannedCount": 4
}
with mock.patch.object(
self.users.connection,
'scan',
return_value=expected) as mock_scan:
results = self.users._scan(
limit=2,
friend_count__lte=2
)
usernames = [res['username'] for res in results['results']]
self.assertEqual(usernames, ['alice', 'bob', 'jane'])
self.assertEqual(len(results['results']), 3)
self.assertEqual(results['last_key'], None)
mock_scan.assert_called_once_with('users',
scan_filter={
'friend_count': {
'AttributeValueList': [{'N': '2'}],
'ComparisonOperator': 'LE',
}
},
limit=2,
segment=None,
attributes_to_get=None,
total_segments=None,
conditional_operator=None
)
# Now alter the expected.
expected['LastEvaluatedKey'] = {
'username': {
'S': 'jane',
},
}
with mock.patch.object(
self.users.connection,
'scan',
return_value=expected) as mock_scan_2:
results = self.users._scan(
limit=3,
friend_count__lte=2,
exclusive_start_key={
'username': 'adam',
},
segment=None,
total_segments=None
)
usernames = [res['username'] for res in results['results']]
self.assertEqual(usernames, ['alice', 'bob', 'jane'])
self.assertEqual(len(results['results']), 3)
self.assertEqual(results['last_key'], {'username': 'jane'})
mock_scan_2.assert_called_once_with('users',
scan_filter={
'friend_count': {
'AttributeValueList': [{'N': '2'}],
'ComparisonOperator': 'LE',
}
},
limit=3,
exclusive_start_key={
'username': {
'S': 'adam',
},
},
segment=None,
attributes_to_get=None,
total_segments=None,
conditional_operator=None
)
def test_query(self):
items_1 = {
'results': [
Item(self.users, data={
'username': 'johndoe',
'first_name': 'John',
'last_name': 'Doe',
}),
Item(self.users, data={
'username': 'jane',
'first_name': 'Jane',
'last_name': 'Doe',
}),
],
'last_key': 'jane',
}
results = self.users.query_2(last_name__eq='Doe')
self.assertTrue(isinstance(results, ResultSet))
self.assertEqual(len(results._results), 0)
self.assertEqual(results.the_callable, self.users._query)
with mock.patch.object(
results,
'the_callable',
return_value=items_1) as mock_query:
res_1 = next(results)
# Now it should be populated.
self.assertEqual(len(results._results), 2)
self.assertEqual(res_1['username'], 'johndoe')
res_2 = next(results)
self.assertEqual(res_2['username'], 'jane')
self.assertEqual(mock_query.call_count, 1)
items_2 = {
'results': [
Item(self.users, data={
'username': 'foodoe',
'first_name': 'Foo',
'last_name': 'Doe',
}),
],
}
with mock.patch.object(
results,
'the_callable',
return_value=items_2) as mock_query_2:
res_3 = next(results)
# New results should have been found.
self.assertEqual(len(results._results), 1)
self.assertEqual(res_3['username'], 'foodoe')
self.assertRaises(StopIteration, results.next)
self.assertEqual(mock_query_2.call_count, 1)
def test_query_with_specific_attributes(self):
items_1 = {
'results': [
Item(self.users, data={
'username': 'johndoe',
}),
Item(self.users, data={
'username': 'jane',
}),
],
'last_key': 'jane',
}
results = self.users.query_2(last_name__eq='Doe',
attributes=['username'])
self.assertTrue(isinstance(results, ResultSet))
self.assertEqual(len(results._results), 0)
self.assertEqual(results.the_callable, self.users._query)
with mock.patch.object(
results,
'the_callable',
return_value=items_1) as mock_query:
res_1 = next(results)
# Now it should be populated.
self.assertEqual(len(results._results), 2)
self.assertEqual(res_1['username'], 'johndoe')
self.assertEqual(list(res_1.keys()), ['username'])
res_2 = next(results)
self.assertEqual(res_2['username'], 'jane')
self.assertEqual(mock_query.call_count, 1)
def test_scan(self):
items_1 = {
'results': [
Item(self.users, data={
'username': 'johndoe',
'first_name': 'John',
'last_name': 'Doe',
}),
Item(self.users, data={
'username': 'jane',
'first_name': 'Jane',
'last_name': 'Doe',
}),
],
'last_key': 'jane',
}
results = self.users.scan(last_name__eq='Doe')
self.assertTrue(isinstance(results, ResultSet))
self.assertEqual(len(results._results), 0)
self.assertEqual(results.the_callable, self.users._scan)
with mock.patch.object(
results,
'the_callable',
return_value=items_1) as mock_scan:
res_1 = next(results)
# Now it should be populated.
self.assertEqual(len(results._results), 2)
self.assertEqual(res_1['username'], 'johndoe')
res_2 = next(results)
self.assertEqual(res_2['username'], 'jane')
self.assertEqual(mock_scan.call_count, 1)
items_2 = {
'results': [
Item(self.users, data={
'username': 'zoeydoe',
'first_name': 'Zoey',
'last_name': 'Doe',
}),
],
}
with mock.patch.object(
results,
'the_callable',
return_value=items_2) as mock_scan_2:
res_3 = next(results)
# New results should have been found.
self.assertEqual(len(results._results), 1)
self.assertEqual(res_3['username'], 'zoeydoe')
self.assertRaises(StopIteration, results.next)
self.assertEqual(mock_scan_2.call_count, 1)
def test_scan_with_specific_attributes(self):
items_1 = {
'results': [
Item(self.users, data={
'username': 'johndoe',
}),
Item(self.users, data={
'username': 'jane',
}),
],
'last_key': 'jane',
}
results = self.users.scan(attributes=['username'])
self.assertTrue(isinstance(results, ResultSet))
self.assertEqual(len(results._results), 0)
self.assertEqual(results.the_callable, self.users._scan)
with mock.patch.object(
results,
'the_callable',
return_value=items_1) as mock_query:
res_1 = next(results)
# Now it should be populated.
self.assertEqual(len(results._results), 2)
self.assertEqual(res_1['username'], 'johndoe')
self.assertEqual(list(res_1.keys()), ['username'])
res_2 = next(results)
self.assertEqual(res_2['username'], 'jane')
self.assertEqual(mock_query.call_count, 1)
def test_count(self):
expected = {
"Table": {
"AttributeDefinitions": [
{
"AttributeName": "username",
"AttributeType": "S"
}
],
"ItemCount": 5,
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
}
],
"LocalSecondaryIndexes": [
{
"IndexName": "UsernameIndex",
"KeySchema": [
{
"AttributeName": "username",
"KeyType": "HASH"
}
],
"Projection": {
"ProjectionType": "KEYS_ONLY"
}
}
],
"ProvisionedThroughput": {
"ReadCapacityUnits": 20,
"WriteCapacityUnits": 6
},
"TableName": "Thread",
"TableStatus": "ACTIVE"
}
}
with mock.patch.object(
self.users,
'describe',
return_value=expected) as mock_count:
self.assertEqual(self.users.count(), 5)
def test_query_count_simple(self):
expected_0 = {
'Count': 0.0,
}
expected_1 = {
'Count': 10.0,
}
with mock.patch.object(
self.users.connection,
'query',
return_value=expected_0) as mock_query:
results = self.users.query_count(username__eq='notmyname')
self.assertTrue(isinstance(results, int))
self.assertEqual(results, 0)
self.assertEqual(mock_query.call_count, 1)
self.assertIn('scan_index_forward', mock_query.call_args[1])
self.assertEqual(True, mock_query.call_args[1]['scan_index_forward'])
self.assertIn('limit', mock_query.call_args[1])
self.assertEqual(None, mock_query.call_args[1]['limit'])
with mock.patch.object(
self.users.connection,
'query',
return_value=expected_1) as mock_query:
results = self.users.query_count(username__gt='somename', consistent=True, scan_index_forward=False, limit=10)
self.assertTrue(isinstance(results, int))
self.assertEqual(results, 10)
self.assertEqual(mock_query.call_count, 1)
self.assertIn('scan_index_forward', mock_query.call_args[1])
self.assertEqual(False, mock_query.call_args[1]['scan_index_forward'])
self.assertIn('limit', mock_query.call_args[1])
self.assertEqual(10, mock_query.call_args[1]['limit'])
def test_query_count_paginated(self):
def return_side_effect(*args, **kwargs):
if kwargs.get('exclusive_start_key'):
return {'Count': 10, 'LastEvaluatedKey': None}
else:
return {
'Count': 20,
'LastEvaluatedKey': {
'username': {
'S': 'johndoe'
},
'date_joined': {
'N': '4118642633'
}
}
}
with mock.patch.object(
self.users.connection,
'query',
side_effect=return_side_effect
) as mock_query:
count = self.users.query_count(username__eq='johndoe')
self.assertTrue(isinstance(count, int))
self.assertEqual(30, count)
self.assertEqual(mock_query.call_count, 2)
def test_private_batch_get(self):
expected = {
"ConsumedCapacity": {
"CapacityUnits": 0.5,
"TableName": "users"
},
'Responses': {
'users': [
{
'username': {'S': 'alice'},
'first_name': {'S': 'Alice'},
'last_name': {'S': 'Expert'},
'date_joined': {'N': '1366056680'},
'friend_count': {'N': '1'},
'friends': {'SS': ['jane']},
},
{
'username': {'S': 'bob'},
'first_name': {'S': 'Bob'},
'last_name': {'S': 'Smith'},
'date_joined': {'N': '1366056888'},
'friend_count': {'N': '1'},
'friends': {'SS': ['johndoe']},
},
{
'username': {'S': 'jane'},
'first_name': {'S': 'Jane'},
'last_name': {'S': 'Doe'},
'date_joined': {'N': '1366057777'},
'friend_count': {'N': '2'},
'friends': {'SS': ['alice', 'johndoe']},
},
],
},
"UnprocessedKeys": {
},
}
with mock.patch.object(
self.users.connection,
'batch_get_item',
return_value=expected) as mock_batch_get:
results = self.users._batch_get(keys=[
{'username': 'alice', 'friend_count': 1},
{'username': 'bob', 'friend_count': 1},
{'username': 'jane'},
])
usernames = [res['username'] for res in results['results']]
self.assertEqual(usernames, ['alice', 'bob', 'jane'])
self.assertEqual(len(results['results']), 3)
self.assertEqual(results['last_key'], None)
self.assertEqual(results['unprocessed_keys'], [])
mock_batch_get.assert_called_once_with(request_items={
'users': {
'Keys': [
{
'username': {'S': 'alice'},
'friend_count': {'N': '1'}
},
{
'username': {'S': 'bob'},
'friend_count': {'N': '1'}
}, {
'username': {'S': 'jane'},
}
]
}
})
# Now alter the expected.
del expected['Responses']['users'][2]
expected['UnprocessedKeys'] = {
'Keys': [
{'username': {'S': 'jane',}},
],
}
with mock.patch.object(
self.users.connection,
'batch_get_item',
return_value=expected) as mock_batch_get_2:
results = self.users._batch_get(keys=[
{'username': 'alice', 'friend_count': 1},
{'username': 'bob', 'friend_count': 1},
{'username': 'jane'},
])
usernames = [res['username'] for res in results['results']]
self.assertEqual(usernames, ['alice', 'bob'])
self.assertEqual(len(results['results']), 2)
self.assertEqual(results['last_key'], None)
self.assertEqual(results['unprocessed_keys'], [
{'username': 'jane'}
])
mock_batch_get_2.assert_called_once_with(request_items={
'users': {
'Keys': [
{
'username': {'S': 'alice'},
'friend_count': {'N': '1'}
},
{
'username': {'S': 'bob'},
'friend_count': {'N': '1'}
}, {
'username': {'S': 'jane'},
}
]
}
})
def test_private_batch_get_attributes(self):
# test if AttributesToGet parameter is passed to DynamoDB API
expected = {
"ConsumedCapacity": {
"CapacityUnits": 0.5,
"TableName": "users"
},
'Responses': {
'users': [
{
'username': {'S': 'alice'},
'first_name': {'S': 'Alice'},
},
{
'username': {'S': 'bob'},
'first_name': {'S': 'Bob'},
},
],
},
"UnprocessedKeys": {},
}
with mock.patch.object(
self.users.connection,
'batch_get_item',
return_value=expected) as mock_batch_get_attr:
results = self.users._batch_get(keys=[
{'username': 'alice'},
{'username': 'bob'},
], attributes=['username', 'first_name'])
usernames = [res['username'] for res in results['results']]
first_names = [res['first_name'] for res in results['results']]
self.assertEqual(usernames, ['alice', 'bob'])
self.assertEqual(first_names, ['Alice', 'Bob'])
self.assertEqual(len(results['results']), 2)
self.assertEqual(results['last_key'], None)
self.assertEqual(results['unprocessed_keys'], [])
mock_batch_get_attr.assert_called_once_with(request_items={
'users': {
'Keys': [ { 'username': {'S': 'alice'} },
{ 'username': {'S': 'bob'} }, ],
'AttributesToGet': ['username', 'first_name'],
},
})
def test_batch_get(self):
items_1 = {
'results': [
Item(self.users, data={
'username': 'johndoe',
'first_name': 'John',
'last_name': 'Doe',
}),
Item(self.users, data={
'username': 'jane',
'first_name': 'Jane',
'last_name': 'Doe',
}),
],
'last_key': None,
'unprocessed_keys': [
'zoeydoe',
]
}
results = self.users.batch_get(keys=[
{'username': 'johndoe'},
{'username': 'jane'},
{'username': 'zoeydoe'},
])
self.assertTrue(isinstance(results, BatchGetResultSet))
self.assertEqual(len(results._results), 0)
self.assertEqual(results.the_callable, self.users._batch_get)
with mock.patch.object(
results,
'the_callable',
return_value=items_1) as mock_batch_get:
res_1 = next(results)
# Now it should be populated.
self.assertEqual(len(results._results), 2)
self.assertEqual(res_1['username'], 'johndoe')
res_2 = next(results)
self.assertEqual(res_2['username'], 'jane')
self.assertEqual(mock_batch_get.call_count, 1)
self.assertEqual(results._keys_left, ['zoeydoe'])
items_2 = {
'results': [
Item(self.users, data={
'username': 'zoeydoe',
'first_name': 'Zoey',
'last_name': 'Doe',
}),
],
}
with mock.patch.object(
results,
'the_callable',
return_value=items_2) as mock_batch_get_2:
res_3 = next(results)
# New results should have been found.
self.assertEqual(len(results._results), 1)
self.assertEqual(res_3['username'], 'zoeydoe')
self.assertRaises(StopIteration, results.next)
self.assertEqual(mock_batch_get_2.call_count, 1)
self.assertEqual(results._keys_left, [])