blob: 5856e71654985a15f7ea2d52214d19861e952cc6 [file] [edit]
from collections.abc import Container, Iterable
from typing import Any, Protocol
import cachetools
class _TestCaseProtocol(Protocol):
def assertEqual(self, first: Any, second: Any, msg: Any = None) -> None: ...
def assertNotEqual(self, first: Any, second: Any, msg: Any = None) -> None: ...
def assertTrue(self, expr: Any, msg: Any = None) -> None: ...
def assertFalse(self, expr: Any, msg: Any = None) -> None: ...
def assertIs(self, expr1: object, expr2: object, msg: Any = None) -> None: ...
def assertIsNot(self, expr1: object, expr2: object, msg: Any = None) -> None: ...
def assertIsNone(self, obj: object, msg: Any = None) -> None: ...
def assertIn(
self, member: Any, container: Iterable[Any] | Container[Any], msg: Any = None
) -> None: ...
def assertNotIn(
self, member: Any, container: Iterable[Any] | Container[Any], msg: Any = None
) -> None: ...
def assertRaises(self, expected_exception: type[BaseException]) -> Any: ...
def assertRaisesRegex(
self, expected_exception: type[BaseException], expected_regex: str
) -> Any: ...
class CacheTestMixin(_TestCaseProtocol):
    """Reusable test cases for a mapping-style cache class.

    Every test builds its caches through ``self.Cache(...)``, so the class
    under test is selected by the ``Cache`` attribute.  The assertion
    helpers called here are the ones declared by ``_TestCaseProtocol``.
    """

    # The cache class exercised by the tests.  No value is assigned here;
    # it is expected to be supplied by the class that uses this mixin.
    Cache: type[cachetools.Cache]

    def test_defaults(self):
        # A freshly built cache is empty, reports the maxsize it was given,
        # sizes every value as 1 (no getsizeof passed), and has a repr that
        # starts with its class name.
        cache = self.Cache(maxsize=1)
        self.assertEqual(0, len(cache))
        self.assertEqual(1, cache.maxsize)
        self.assertEqual(0, cache.currsize)
        self.assertEqual(1, cache.getsizeof(None))
        self.assertEqual(1, cache.getsizeof(""))
        self.assertEqual(1, cache.getsizeof(0))
        self.assertTrue(repr(cache).startswith(cache.__class__.__name__))

    def test_insert(self):
        # Inserting beyond maxsize keeps the length at maxsize and the newly
        # inserted key is always retrievable.
        cache = self.Cache(maxsize=2)
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        cache[3] = 3
        self.assertEqual(2, len(cache))
        self.assertEqual(3, cache[3])
        # Only "some earlier key survived" is asserted, not which one --
        # presumably because the evicted key depends on the concrete cache.
        self.assertTrue(1 in cache or 2 in cache)
        cache[4] = 4
        self.assertEqual(2, len(cache))
        self.assertEqual(4, cache[4])
        self.assertTrue(1 in cache or 2 in cache or 3 in cache)

    def test_update(self):
        # update() with keys that are already present neither grows the
        # cache nor loses entries; new values replace the old ones.
        cache = self.Cache(maxsize=2)
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        # same keys, same values
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        # same keys, different values
        cache.update({1: "a", 2: "b"})
        self.assertEqual(2, len(cache))
        self.assertEqual("a", cache[1])
        self.assertEqual("b", cache[2])

    def test_delete(self):
        # ``del cache[key]`` removes exactly that key; deleting a key that
        # is no longer present raises KeyError and leaves the cache empty.
        cache = self.Cache(maxsize=2)
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        del cache[2]
        self.assertEqual(1, len(cache))
        self.assertEqual(1, cache[1])
        self.assertNotIn(2, cache)
        del cache[1]
        self.assertEqual(0, len(cache))
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        with self.assertRaises(KeyError):
            del cache[1]
        self.assertEqual(0, len(cache))
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)

    def test_pop(self):
        # pop() returns the stored value and removes the key.  For a missing
        # key it raises KeyError unless a default is passed, in which case
        # the default is returned.
        cache = self.Cache(maxsize=2)
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, cache.pop(2))
        self.assertEqual(1, len(cache))
        self.assertEqual(1, cache.pop(1))
        self.assertEqual(0, len(cache))
        with self.assertRaises(KeyError):
            cache.pop(2)
        with self.assertRaises(KeyError):
            cache.pop(1)
        with self.assertRaises(KeyError):
            cache.pop(0)
        self.assertEqual(None, cache.pop(2, None))
        self.assertEqual(None, cache.pop(1, None))
        self.assertEqual(None, cache.pop(0, None))

    def test_popitem(self):
        # popitem() hands back one stored (key, value) pair per call and
        # shrinks the cache; on an empty cache it raises KeyError.  Only
        # membership of the key in {1, 2} is checked, not the order.
        cache = self.Cache(maxsize=2)
        cache.update({1: 1, 2: 2})
        key, _ = cache.popitem()
        self.assertIn(key, {1, 2})
        self.assertEqual(1, len(cache))
        key, _ = cache.popitem()
        self.assertIn(key, {1, 2})
        self.assertEqual(0, len(cache))
        with self.assertRaises(KeyError):
            cache.popitem()

    def test_popitem_exception_context(self):
        # since Python 3.7, MutableMapping.popitem() suppresses
        # exception context as implementation detail
        with self.assertRaises(KeyError) as cm:
            self.Cache(maxsize=2).popitem()
        e = cm.exception
        # The KeyError must look the same here: no explicit cause and the
        # implicit context suppressed.
        self.assertIsNone(e.__cause__)
        self.assertTrue(e.__suppress_context__)

    def test_missing(self):
        # Exercises a subclass whose __missing__() stores the looked-up key
        # as its own value and returns it.
        class DefaultCache(self.Cache):
            def __missing__(self, key):
                self[key] = key
                return key

        cache = DefaultCache(maxsize=2)
        self.assertEqual(0, cache.currsize)
        self.assertEqual(2, cache.maxsize)
        self.assertEqual(0, len(cache))
        # Subscript lookups of absent keys go through __missing__() and so
        # populate the cache.
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        self.assertEqual(2, len(cache))
        self.assertTrue(1 in cache and 2 in cache)
        # A third key is stored too; exactly one of the first two must
        # have been dropped to stay within maxsize.
        self.assertEqual(3, cache[3])
        self.assertEqual(2, len(cache))
        self.assertTrue(3 in cache)
        self.assertTrue(1 in cache or 2 in cache)
        self.assertTrue(1 not in cache or 2 not in cache)
        self.assertEqual(4, cache[4])
        self.assertEqual(2, len(cache))
        self.assertTrue(4 in cache)
        self.assertTrue(1 in cache or 2 in cache or 3 in cache)
        # verify __missing__() is *not* called for any operations
        # besides __getitem__()
        self.assertEqual(4, cache.get(4))
        self.assertEqual(None, cache.get(5))
        self.assertEqual(5 * 5, cache.get(5, 5 * 5))
        self.assertEqual(2, len(cache))
        self.assertEqual(4, cache.pop(4))
        with self.assertRaises(KeyError):
            cache.pop(5)
        self.assertEqual(None, cache.pop(5, None))
        self.assertEqual(5 * 5, cache.pop(5, 5 * 5))
        self.assertEqual(1, len(cache))
        # setdefault(): an existing key keeps its value whatever default is
        # passed; an absent key is stored with the default (None when no
        # default is given) rather than with what __missing__() would return.
        cache.clear()
        cache[1] = 1 + 1
        self.assertEqual(1 + 1, cache.setdefault(1))
        self.assertEqual(1 + 1, cache.setdefault(1, 1))
        self.assertEqual(1 + 1, cache[1])
        self.assertEqual(2 + 2, cache.setdefault(2, 2 + 2))
        self.assertEqual(2 + 2, cache.setdefault(2, None))
        self.assertEqual(2 + 2, cache.setdefault(2))
        self.assertEqual(2 + 2, cache[2])
        self.assertEqual(2, len(cache))
        self.assertTrue(1 in cache and 2 in cache)
        self.assertEqual(None, cache.setdefault(3))
        self.assertEqual(2, len(cache))
        self.assertTrue(3 in cache)
        self.assertTrue(1 in cache or 2 in cache)
        self.assertTrue(1 not in cache or 2 not in cache)

    def test_missing_getsizeof(self):
        # Same idea as test_missing, but with getsizeof returning the value
        # itself, so each entry's size equals its value and maxsize=2 bounds
        # the sum of the stored values.
        class DefaultCache(self.Cache):
            def __missing__(self, key):
                try:
                    self[key] = key
                except ValueError:
                    pass  # not stored
                return key

        cache = DefaultCache(maxsize=2, getsizeof=lambda x: x)
        self.assertEqual(0, cache.currsize)
        self.assertEqual(2, cache.maxsize)
        self.assertEqual(1, cache[1])
        self.assertEqual(1, len(cache))
        self.assertEqual(1, cache.currsize)
        self.assertIn(1, cache)
        # Storing 2 (size 2) leaves no room for 1.
        self.assertEqual(2, cache[2])
        self.assertEqual(1, len(cache))
        self.assertEqual(2, cache.currsize)
        self.assertNotIn(1, cache)
        self.assertIn(2, cache)
        # The lookup still returns the key even though nothing was added.
        self.assertEqual(3, cache[3])  # not stored
        self.assertEqual(1, len(cache))
        self.assertEqual(2, cache.currsize)
        # Looking up 1 again stores it; afterwards it is the only entry.
        self.assertEqual(1, cache[1])
        self.assertEqual(1, len(cache))
        self.assertEqual(1, cache.currsize)
        self.assertEqual((1, 1), cache.popitem())

    def _test_getsizeof(self, cache):
        # Shared body for the getsizeof tests below.  *cache* must be empty,
        # have maxsize 3 and size each value as the value itself (checked by
        # the first assertions).
        self.assertEqual(0, cache.currsize)
        self.assertEqual(3, cache.maxsize)
        self.assertEqual(1, cache.getsizeof(1))
        self.assertEqual(2, cache.getsizeof(2))
        self.assertEqual(3, cache.getsizeof(3))
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, len(cache))
        self.assertEqual(3, cache.currsize)
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        # Growing key 1 from size 1 to size 2 leaves key 1 as the only entry.
        cache[1] = 2
        self.assertEqual(1, len(cache))
        self.assertEqual(2, cache.currsize)
        self.assertEqual(2, cache[1])
        self.assertNotIn(2, cache)
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, len(cache))
        self.assertEqual(3, cache.currsize)
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        # A value of size 3 fills the whole cache on its own.
        cache[3] = 3
        self.assertEqual(1, len(cache))
        self.assertEqual(3, cache.currsize)
        self.assertEqual(3, cache[3])
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        # A value of size 4 exceeds maxsize 3: ValueError is raised and the
        # existing content is left untouched, for an existing key ...
        with self.assertRaises(ValueError):
            cache[3] = 4
        self.assertEqual(1, len(cache))
        self.assertEqual(3, cache.currsize)
        self.assertEqual(3, cache[3])
        # ... and for a new key alike.
        with self.assertRaises(ValueError):
            cache[4] = 4
        self.assertEqual(1, len(cache))
        self.assertEqual(3, cache.currsize)
        self.assertEqual(3, cache[3])

    def test_getsizeof_param(self):
        # getsizeof supplied as a constructor argument.
        self._test_getsizeof(self.Cache(maxsize=3, getsizeof=lambda x: x))

    def test_getsizeof_subclass(self):
        # getsizeof supplied by overriding it as a static method in a subclass.
        class Cache(self.Cache):
            @staticmethod
            def getsizeof(value):
                return value

        self._test_getsizeof(Cache(maxsize=3))

    def test_clear(self):
        # clear() empties the cache and resets currsize to 0.
        cache = self.Cache(maxsize=2)
        cache.update({1: 1, 2: 2})
        self.assertEqual(2, len(cache))
        self.assertEqual(2, cache.currsize)
        cache.clear()
        self.assertEqual(0, len(cache))
        self.assertEqual(0, cache.currsize)
        self.assertNotIn(1, cache)
        self.assertNotIn(2, cache)
        # verify cache can be reused after clear
        cache[3] = 3
        cache[4] = 4
        self.assertEqual(2, len(cache))
        self.assertEqual(3, cache[3])
        self.assertEqual(4, cache[4])
        # verify eviction still works after clear
        cache[5] = 5
        self.assertEqual(2, len(cache))
        self.assertIn(5, cache)

    def test_clear_empty(self):
        # clear() on a cache that never held anything.
        cache = self.Cache(maxsize=2)
        cache.clear()  # should not raise
        self.assertEqual(0, len(cache))
        self.assertEqual(0, cache.currsize)

    def test_clear_getsizeof(self):
        # clear() with a custom getsizeof: currsize (1 + 2 + 3 = 6 before)
        # must drop back to 0.
        cache = self.Cache(maxsize=10, getsizeof=lambda x: x)
        cache[1] = 1
        cache[2] = 2
        cache[3] = 3
        self.assertEqual(3, len(cache))
        self.assertEqual(6, cache.currsize)
        cache.clear()
        self.assertEqual(0, len(cache))
        self.assertEqual(0, cache.currsize)
        # verify cache can be reused with getsizeof after clear
        cache[4] = 4
        self.assertEqual(1, len(cache))
        self.assertEqual(4, cache.currsize)

    def test_pickle(self):
        # A cache survives a pickle round trip: the copy compares equal to
        # the source and keeps enforcing maxsize on later inserts.
        import pickle

        source = self.Cache(maxsize=2)
        source.update({1: 1, 2: 2})
        cache = pickle.loads(pickle.dumps(source))
        self.assertEqual(source, cache)
        self.assertEqual(2, len(cache))
        self.assertEqual(1, cache[1])
        self.assertEqual(2, cache[2])
        cache[3] = 3
        self.assertEqual(2, len(cache))
        self.assertEqual(3, cache[3])
        self.assertTrue(1 in cache or 2 in cache)
        cache[4] = 4
        self.assertEqual(2, len(cache))
        self.assertEqual(4, cache[4])
        self.assertTrue(1 in cache or 2 in cache or 3 in cache)
        # The modified copy must itself round-trip.
        self.assertEqual(cache, pickle.loads(pickle.dumps(cache)))

    def test_pickle_maxsize(self):
        # Pickle round trip of completely filled caches of several sizes.
        import pickle
        import sys

        # test empty cache, single element, large cache (recursion limit)
        for n in [0, 1, sys.getrecursionlimit() * 2]:
            source = self.Cache(maxsize=n)
            source.update((i, i) for i in range(n))
            cache = pickle.loads(pickle.dumps(source))
            self.assertEqual(n, len(cache))
            self.assertEqual(source, cache)
class CountedLock:
    """Lock stand-in for tests that records how many times it was entered.

    It performs no locking at all: entering bumps ``count`` and leaving
    does nothing, so exceptions raised inside the ``with`` body propagate.
    """

    def __init__(self):
        # number of times a ``with`` block has been entered so far
        self.count = 0

    def __enter__(self):
        # Nothing is returned, so ``with lock as x`` binds ``x`` to None.
        self.count = self.count + 1

    def __exit__(self, *exc):
        # Returning None (falsy) means exceptions are never suppressed.
        return None
class CountedCondition(CountedLock):
    """Condition stand-in for tests that counts waits and notifications.

    Inherits the entry counter from ``CountedLock`` and adds
    ``wait_count`` and ``notify_count``.  Nothing ever blocks.
    """

    def __init__(self):
        super().__init__()
        self.wait_count = self.notify_count = 0

    # only wait_for() and notify_all() are currently used, but wait()
    # and notify() are also required according to the docs...

    def wait(self, timeout=None):
        """Record one wait and report success immediately; *timeout* is unused."""
        self.wait_count = self.wait_count + 1
        return True

    def wait_for(self, predicate, timeout=None):
        """Call *predicate* exactly once, record one wait, return its result.

        The counter is bumped only after the predicate has returned, so a
        predicate that raises leaves ``wait_count`` unchanged.
        """
        outcome = predicate()
        self.wait_count = self.wait_count + 1
        return outcome

    def notify(self, n=1):
        """Record one notification, whatever the value of *n*."""
        self.notify_count = self.notify_count + 1

    def notify_all(self):
        """Record one notification."""
        self.notify_count = self.notify_count + 1