summaryrefslogtreecommitdiff
path: root/synapse/storage/_base.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2016-08-24 15:05:56 +0100
committerErik Johnston <erikj@matrix.org>2016-08-24 15:05:56 +0100
commitc1c15ad12f8bda0d65778bd03543ad1f14a1cfc2 (patch)
tree1c843a49d3d5168ff998a54f50d30cdc3814f104 /synapse/storage/_base.py
parentcfb5c3f91265d2b9423b47cec2b555b39c46bc4b (diff)
Imported Upstream version 0.17.1
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r--synapse/storage/_base.py74
1 files changed, 66 insertions, 8 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 0117fdc6..49fa8614 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,6 +19,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
from synapse.util.caches import intern_dict
+from synapse.storage.engines import PostgresEngine
import synapse.metrics
@@ -165,7 +166,7 @@ class SQLBaseStore(object):
self._txn_perf_counters = PerformanceCounters()
self._get_event_counters = PerformanceCounters()
- self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
+ self._get_event_cache = Cache("*getEvent*", keylen=3,
max_entries=hs.config.event_cache_size)
self._state_group_cache = DictionaryCache(
@@ -305,13 +306,14 @@ class SQLBaseStore(object):
func, *args, **kwargs
)
- with PreserveLoggingContext():
- result = yield self._db_pool.runWithConnection(
- inner_func, *args, **kwargs
- )
-
- for after_callback, after_args in after_callbacks:
- after_callback(*after_args)
+ try:
+ with PreserveLoggingContext():
+ result = yield self._db_pool.runWithConnection(
+ inner_func, *args, **kwargs
+ )
+ finally:
+ for after_callback, after_args in after_callbacks:
+ after_callback(*after_args)
defer.returnValue(result)
@defer.inlineCallbacks
@@ -860,6 +862,62 @@ class SQLBaseStore(object):
return cache, min_val
+ def _invalidate_cache_and_stream(self, txn, cache_func, keys):
+ """Invalidates the cache and adds it to the cache stream so slaves
+ will know to invalidate their caches.
+
+ This should only be used to invalidate caches where slaves won't
+ otherwise know from other replication streams that the cache should
+ be invalidated.
+ """
+ txn.call_after(cache_func.invalidate, keys)
+
+ if isinstance(self.database_engine, PostgresEngine):
+ # get_next() returns a context manager which is designed to wrap
+ # the transaction. However, we want to only get an ID when we want
+ # to use it, here, so we need to call __enter__ manually, and have
+ # __exit__ called after the transaction finishes.
+ ctx = self._cache_id_gen.get_next()
+ stream_id = ctx.__enter__()
+ txn.call_after(ctx.__exit__, None, None, None)
+ txn.call_after(self.hs.get_notifier().on_new_replication_data)
+
+ self._simple_insert_txn(
+ txn,
+ table="cache_invalidation_stream",
+ values={
+ "stream_id": stream_id,
+ "cache_func": cache_func.__name__,
+ "keys": list(keys),
+ "invalidation_ts": self.clock.time_msec(),
+ }
+ )
+
+ def get_all_updated_caches(self, last_id, current_id, limit):
+ if last_id == current_id:
+ return defer.succeed([])
+
+ def get_all_updated_caches_txn(txn):
+ # We purposefully don't bound by the current token, as we want to
+ # send across cache invalidations as quickly as possible. Cache
+ # invalidations are idempotent, so duplicates are fine.
+ sql = (
+ "SELECT stream_id, cache_func, keys, invalidation_ts"
+ " FROM cache_invalidation_stream"
+ " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
+ )
+ txn.execute(sql, (last_id, limit,))
+ return txn.fetchall()
+ return self.runInteraction(
+ "get_all_updated_caches", get_all_updated_caches_txn
+ )
+
+ def get_cache_stream_token(self):
+ if self._cache_id_gen:
+ return self._cache_id_gen.get_current_token()
+ else:
+ return 0
+
class _RollbackButIsFineException(Exception):
""" This exception is used to rollback a transaction without implying