summaryrefslogtreecommitdiff
path: root/synapse/storage/databases/main/purge_events.py
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2021-06-16 10:28:18 +0200
committerAndrej Shadura <andrewsh@debian.org>2021-06-16 10:28:18 +0200
commit219af4a8aef838c5e3689a2aa71cf72f2fd75aa2 (patch)
tree3183d9a61335f862a9ddd3b3de2c804aaa93a6bf /synapse/storage/databases/main/purge_events.py
parent396a9dfc77fc34b77d7ef552048f22ecb94e91ea (diff)
New upstream version 1.36.0
Diffstat (limited to 'synapse/storage/databases/main/purge_events.py')
-rw-r--r--synapse/storage/databases/main/purge_events.py26
1 files changed, 21 insertions, 5 deletions
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 8f83748b..7fb7780d 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -16,14 +16,14 @@ import logging
from typing import Any, List, Set, Tuple
from synapse.api.errors import SynapseError
-from synapse.storage._base import SQLBaseStore
+from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
from synapse.types import RoomStreamToken
logger = logging.getLogger(__name__)
-class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
+class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
) -> Set[int]:
@@ -203,8 +203,6 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
"DELETE FROM event_to_state_groups "
"WHERE event_id IN (SELECT event_id from events_to_purge)"
)
- for event_id, _ in event_rows:
- txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
# Delete all remote non-state events
for table in (
@@ -283,6 +281,20 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
# so make sure to keep this actually last.
txn.execute("DROP TABLE events_to_purge")
+ for event_id, should_delete in event_rows:
+ self._invalidate_cache_and_stream(
+ txn, self._get_state_group_for_event, (event_id,)
+ )
+
+ # XXX: This is racy, since have_seen_events could be called between the
+ # transaction completing and the invalidation running. On the other hand,
+ # that's no different to calling `have_seen_events` just before the
+ # event is deleted from the database.
+ if should_delete:
+ self._invalidate_cache_and_stream(
+ txn, self.have_seen_event, (room_id, event_id)
+ )
+
logger.info("[purge] done")
return referenced_state_groups
@@ -422,7 +434,11 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
# index on them. In any case we should be clearing out 'stream' tables
# periodically anyway (#5888)
- # TODO: we could probably usefully do a bunch of cache invalidation here
+ # TODO: we could probably usefully do a bunch more cache invalidation here
+
+ # XXX: as with purge_history, this is racy, but no worse than other races
+ # that already exist.
+ self._invalidate_cache_and_stream(txn, self.have_seen_event, (room_id,))
logger.info("[purge] done")