Diffstat (limited to 'synapse/util/caches')
-rw-r--r--  synapse/util/caches/cached_call.py          |  2
-rw-r--r--  synapse/util/caches/deferred_cache.py       | 11
-rw-r--r--  synapse/util/caches/lrucache.py             | 57
-rw-r--r--  synapse/util/caches/response_cache.py       |  6
-rw-r--r--  synapse/util/caches/stream_change_cache.py  |  6
-rw-r--r--  synapse/util/caches/ttlcache.py             | 12
6 files changed, 45 insertions, 49 deletions
diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py
index e58dd91e..470f4f91 100644
--- a/synapse/util/caches/cached_call.py
+++ b/synapse/util/caches/cached_call.py
@@ -85,7 +85,7 @@ class CachedCall(Generic[TV]):
# result in the deferred, since `awaiting` a deferred destroys its result.
# (Also, if it's a Failure, GCing the deferred would log a critical error
# about unhandled Failures)
- def got_result(r):
+ def got_result(r: Union[TV, Failure]) -> None:
self._result = r
self._deferred.addBoth(got_result)
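The comment above captures a real Twisted footgun: once a Deferred's result has been consumed by `await`, it is gone, and a garbage-collected Deferred still holding an unhandled Failure logs a critical error. A minimal sketch of the stash-via-addBoth pattern, with illustrative names that are not Synapse's:

    from typing import Optional, Union

    from twisted.internet import defer
    from twisted.python.failure import Failure

    class OnceCell:
        """Stashes the first result (or Failure) delivered to a Deferred."""

        def __init__(self, deferred: "defer.Deferred[int]") -> None:
            self._result: Optional[Union[int, Failure]] = None
            # addBoth fires on success and failure alike, so a Failure is
            # always "handled" and never logged as unhandled at GC time.
            deferred.addBoth(self._got_result)

        def _got_result(self, r: Union[int, Failure]) -> Union[int, Failure]:
            self._result = r
            return r  # pass the value through to any later callbacks

    d: "defer.Deferred[int]" = defer.Deferred()
    cell = OnceCell(d)
    d.callback(42)
    assert cell._result == 42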
diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py
index 6262efe0..da502aec 100644
--- a/synapse/util/caches/deferred_cache.py
+++ b/synapse/util/caches/deferred_cache.py
@@ -31,6 +31,7 @@ from prometheus_client import Gauge
from twisted.internet import defer
from twisted.python import failure
+from twisted.python.failure import Failure
from synapse.util.async_helpers import ObservableDeferred
from synapse.util.caches.lrucache import LruCache
@@ -112,7 +113,7 @@ class DeferredCache(Generic[KT, VT]):
self.thread: Optional[threading.Thread] = None
@property
- def max_entries(self):
+ def max_entries(self) -> int:
return self.cache.max_size
def check_thread(self) -> None:
@@ -258,7 +259,7 @@ class DeferredCache(Generic[KT, VT]):
return False
- def cb(result) -> None:
+ def cb(result: VT) -> None:
if compare_and_pop():
self.cache.set(key, result, entry.callbacks)
else:
@@ -270,7 +271,7 @@ class DeferredCache(Generic[KT, VT]):
# not have been. Either way, let's double-check now.
entry.invalidate()
- def eb(_fail) -> None:
+ def eb(_fail: Failure) -> None:
compare_and_pop()
entry.invalidate()
@@ -284,11 +285,11 @@ class DeferredCache(Generic[KT, VT]):
def prefill(
self, key: KT, value: VT, callback: Optional[Callable[[], None]] = None
- ):
+ ) -> None:
callbacks = [callback] if callback else []
self.cache.set(key, value, callbacks=callbacks)
- def invalidate(self, key):
+ def invalidate(self, key) -> None:
"""Delete a key, or tree of entries
If the cache is backed by a regular dict, then "key" must be of
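The `cb`/`eb` pair typed above implements a compare-and-pop: a pending result is promoted into the backing cache only if this entry is still the one registered for the key, since an invalidation may have raced with the computation. A hedged, self-contained sketch of that idea (`pending` and `published` are illustrative stand-ins, not Synapse structures):

    from typing import Any, Callable, Dict

    pending: Dict[str, object] = {}
    published: Dict[str, Any] = {}

    def set_pending(key: str, entry: object) -> Callable[[Any], None]:
        pending[key] = entry

        def cb(result: Any) -> None:
            # The entry may have been invalidated (and possibly replaced)
            # while the computation ran; publish only if still current.
            if pending.get(key) is entry:
                del pending[key]
                published[key] = result

        return cb

    entry = object()
    cb = set_pending("key", entry)
    cb("result")
    assert published["key"] == "result"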
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 4ff62b40..a0a7a9de 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -52,7 +52,7 @@ logger = logging.getLogger(__name__)
try:
from pympler.asizeof import Asizer
- def _get_size_of(val: Any, *, recurse=True) -> int:
+ def _get_size_of(val: Any, *, recurse: bool = True) -> int:
"""Get an estimate of the size in bytes of the object.
Args:
@@ -71,7 +71,7 @@ try:
except ImportError:
- def _get_size_of(val: Any, *, recurse=True) -> int:
+ def _get_size_of(val: Any, *, recurse: bool = True) -> int:
return 0
@@ -85,15 +85,6 @@ VT = TypeVar("VT")
# a general type var, distinct from either KT or VT
T = TypeVar("T")
-
-def enumerate_leaves(node, depth):
- if depth == 0:
- yield node
- else:
- for n in node.values():
- yield from enumerate_leaves(n, depth - 1)
-
-
P = TypeVar("P")
@@ -102,7 +93,7 @@ class _TimedListNode(ListNode[P]):
__slots__ = ["last_access_ts_secs"]
- def update_last_access(self, clock: Clock):
+ def update_last_access(self, clock: Clock) -> None:
self.last_access_ts_secs = int(clock.time())
@@ -115,7 +106,7 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node()
@wrap_as_background_process("LruCache._expire_old_entries")
-async def _expire_old_entries(clock: Clock, expiry_seconds: int):
+async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None:
"""Walks the global cache list to find cache entries that haven't been
accessed in the given number of seconds.
"""
@@ -163,7 +154,7 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int):
logger.info("Dropped %d items from caches", i)
-def setup_expire_lru_cache_entries(hs: "HomeServer"):
+def setup_expire_lru_cache_entries(hs: "HomeServer") -> None:
"""Start a background job that expires all cache entries if they have not
been accessed for the given number of seconds.
"""
@@ -183,7 +174,7 @@ def setup_expire_lru_cache_entries(hs: "HomeServer"):
)
-class _Node:
+class _Node(Generic[KT, VT]):
__slots__ = [
"_list_node",
"_global_list_node",
@@ -197,8 +188,8 @@ class _Node:
def __init__(
self,
root: "ListNode[_Node]",
- key,
- value,
+ key: KT,
+ value: VT,
cache: "weakref.ReferenceType[LruCache]",
clock: Clock,
callbacks: Collection[Callable[[], None]] = (),
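Making `_Node` generic is what lets the annotations on `key` and `value` mean something to a type checker; a stripped-down illustration (not the Synapse class) of a slotted, parameterised node:

    from typing import Generic, TypeVar

    KT = TypeVar("KT")
    VT = TypeVar("VT")

    class Node(Generic[KT, VT]):
        __slots__ = ["key", "value"]

        def __init__(self, key: KT, value: VT) -> None:
            self.key = key
            self.value = value

    node: "Node[str, int]" = Node("room_id", 42)
    # a type checker now sees node.value as int, not Any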
@@ -409,7 +400,7 @@ class LruCache(Generic[KT, VT]):
def synchronized(f: FT) -> FT:
@wraps(f)
- def inner(*args, **kwargs):
+ def inner(*args: Any, **kwargs: Any) -> Any:
with lock:
return f(*args, **kwargs)
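Typing a wrapping decorator as `FT -> FT` preserves the decorated function's precise signature for callers, while `inner` itself accepts `*args: Any` because it must forward whatever `f` takes. The hunk does not show the return statement; the `cast` below is the usual way to satisfy the declared return type and is an assumption here:

    import threading
    from functools import wraps
    from typing import Any, Callable, TypeVar, cast

    FT = TypeVar("FT", bound=Callable[..., Any])
    lock = threading.Lock()

    def synchronized(f: FT) -> FT:
        @wraps(f)
        def inner(*args: Any, **kwargs: Any) -> Any:
            with lock:
                return f(*args, **kwargs)

        return cast(FT, inner)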
@@ -418,17 +409,19 @@ class LruCache(Generic[KT, VT]):
cached_cache_len = [0]
if size_callback is not None:
- def cache_len():
+ def cache_len() -> int:
return cached_cache_len[0]
else:
- def cache_len():
+ def cache_len() -> int:
return len(cache)
self.len = synchronized(cache_len)
- def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()):
+ def add_node(
+ key: KT, value: VT, callbacks: Collection[Callable[[], None]] = ()
+ ) -> None:
node = _Node(
list_root,
key,
@@ -446,7 +439,7 @@ class LruCache(Generic[KT, VT]):
if caches.TRACK_MEMORY_USAGE and metrics:
metrics.inc_memory_usage(node.memory)
- def move_node_to_front(node: _Node):
+ def move_node_to_front(node: _Node) -> None:
node.move_to_front(real_clock, list_root)
def delete_node(node: _Node) -> int:
@@ -488,7 +481,7 @@ class LruCache(Generic[KT, VT]):
default: Optional[T] = None,
callbacks: Collection[Callable[[], None]] = (),
update_metrics: bool = True,
- ):
+ ) -> Union[None, T, VT]:
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
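The `Union[None, T, VT]` return type reads directly off the control flow: a hit returns the cached `VT`, a miss returns the caller's `default`, which is a `T` or, when omitted, `None`. The same shape in isolation (illustrative names):

    from typing import Dict, Optional, TypeVar, Union

    T = TypeVar("T")
    VT = TypeVar("VT")

    def cache_get(
        cache: Dict[str, VT], key: str, default: Optional[T] = None
    ) -> Union[None, T, VT]:
        if key in cache:
            return cache[key]
        return default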
@@ -502,7 +495,9 @@ class LruCache(Generic[KT, VT]):
return default
@synchronized
- def cache_set(key: KT, value: VT, callbacks: Iterable[Callable[[], None]] = ()):
+ def cache_set(
+ key: KT, value: VT, callbacks: Iterable[Callable[[], None]] = ()
+ ) -> None:
node = cache.get(key, None)
if node is not None:
# We sometimes store large objects, e.g. dicts, which cause
@@ -547,7 +542,7 @@ class LruCache(Generic[KT, VT]):
...
@synchronized
- def cache_pop(key: KT, default: Optional[T] = None):
+ def cache_pop(key: KT, default: Optional[T] = None) -> Union[None, T, VT]:
node = cache.get(key, None)
if node:
delete_node(node)
@@ -612,25 +607,25 @@ class LruCache(Generic[KT, VT]):
self.contains = cache_contains
self.clear = cache_clear
- def __getitem__(self, key):
+ def __getitem__(self, key: KT) -> VT:
result = self.get(key, self.sentinel)
if result is self.sentinel:
raise KeyError()
else:
- return result
+ return cast(VT, result)
- def __setitem__(self, key, value):
+ def __setitem__(self, key: KT, value: VT) -> None:
self.set(key, value)
- def __delitem__(self, key, value):
+ def __delitem__(self, key: KT) -> None:
result = self.pop(key, self.sentinel)
if result is self.sentinel:
raise KeyError()
- def __len__(self):
+ def __len__(self) -> int:
return self.len()
- def __contains__(self, key):
+ def __contains__(self, key: KT) -> bool:
return self.contains(key)
def set_cache_factor(self, factor: float) -> bool:
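The `cast(VT, result)` in `__getitem__` is the usual cost of a sentinel-based miss check: `get` is typed as possibly returning the default, so a confirmed hit has to be narrowed back to `VT` by hand. The idiom in isolation (illustrative names):

    from typing import Any, Dict, TypeVar, cast

    VT = TypeVar("VT")
    _SENTINEL: Any = object()

    def getitem(cache: Dict[str, VT], key: str) -> VT:
        result = cache.get(key, _SENTINEL)
        if result is _SENTINEL:
            raise KeyError(key)
        return cast(VT, result)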
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index ed720433..88ccf443 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -104,8 +104,8 @@ class ResponseCache(Generic[KV]):
return None
def _set(
- self, context: ResponseCacheContext[KV], deferred: defer.Deferred
- ) -> defer.Deferred:
+ self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]"
+ ) -> "defer.Deferred[RV]":
"""Set the entry for the given key to the given deferred.
*deferred* should run its callbacks in the sentinel logcontext (ie,
@@ -126,7 +126,7 @@ class ResponseCache(Generic[KV]):
key = context.cache_key
self.pending_result_cache[key] = result
- def on_complete(r):
+ def on_complete(r: RV) -> RV:
# if this cache has a non-zero timeout, and the callback has not cleared
# the should_cache bit, we leave it in the cache for now and schedule
# its removal later.
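The quotes around "defer.Deferred[RV]" are most likely there because, on the Twisted versions then supported, `Deferred` was not subscriptable at runtime; a string annotation keeps the generic form for the type checker without evaluating it at import time. A sketch of the same shape (hypothetical `passthrough`, not Synapse's `_set`):

    from typing import TypeVar

    from twisted.internet import defer

    RV = TypeVar("RV")

    def passthrough(d: "defer.Deferred[RV]") -> "defer.Deferred[RV]":
        def on_complete(r: RV) -> RV:
            # a real implementation would schedule cache eviction here
            return r  # hand the result on unchanged

        d.addCallback(on_complete)
        return d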
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 27b1da23..330709b8 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -40,10 +40,10 @@ class StreamChangeCache:
self,
name: str,
current_stream_pos: int,
- max_size=10000,
+ max_size: int = 10000,
prefilled_cache: Optional[Mapping[EntityType, int]] = None,
- ):
- self._original_max_size = max_size
+ ) -> None:
+ self._original_max_size: int = max_size
self._max_size = math.floor(max_size)
self._entity_to_key: Dict[EntityType, int] = {}
diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py
index 46afe3f9..0b9ac26b 100644
--- a/synapse/util/caches/ttlcache.py
+++ b/synapse/util/caches/ttlcache.py
@@ -159,12 +159,12 @@ class TTLCache(Generic[KT, VT]):
del self._expiry_list[0]
-@attr.s(frozen=True, slots=True)
-class _CacheEntry:
+@attr.s(frozen=True, slots=True, auto_attribs=True)
+class _CacheEntry: # Should be Generic[KT, VT]. See python-attrs/attrs#313
"""TTLCache entry"""
# expiry_time is the first attribute, so that entries are sorted by expiry.
- expiry_time = attr.ib(type=float)
- ttl = attr.ib(type=float)
- key = attr.ib()
- value = attr.ib()
+ expiry_time: float
+ ttl: float
+ key: Any # should be KT
+ value: Any # should be VT
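The two spellings are equivalent: with `auto_attribs=True`, attrs reads the class's type annotations instead of explicit `attr.ib(type=...)` calls, and declaration order is preserved, so `expiry_time` still sorts first. A minimal illustration (not the Synapse class):

    from typing import Any

    import attr

    @attr.s(frozen=True, slots=True, auto_attribs=True)
    class CacheEntry:
        expiry_time: float  # first attribute: entries sort by expiry
        ttl: float
        key: Any
        value: Any

    entry = CacheEntry(expiry_time=1.0, ttl=60.0, key="k", value="v")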