diff options
author | Antonio Russo <aerusso@aerusso.net> | 2023-09-27 18:16:24 -0600 |
---|---|---|
committer | Antonio Russo <aerusso@aerusso.net> | 2023-09-27 18:16:24 -0600 |
commit | b230e2110fd27e03a5452ce95dbbe8ea8dd87037 (patch) | |
tree | 6fb976a0de5fa566a805537ade0ddae5502ae138 /synapse/storage/controllers | |
parent | 44b86bfd0ddbeaf838f80677985bce34ec04d72f (diff) |
New upstream version 1.93.0
Diffstat (limited to 'synapse/storage/controllers')
-rw-r--r-- | synapse/storage/controllers/persist_events.py | 17 |
1 files changed, 7 insertions, 10 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index abd1d149..f39ae2d6 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -19,6 +19,7 @@ import logging from collections import deque from typing import ( TYPE_CHECKING, + AbstractSet, Any, Awaitable, Callable, @@ -154,12 +155,13 @@ class _UpdateCurrentStateTask: _EventPersistQueueTask = Union[_PersistEventsTask, _UpdateCurrentStateTask] +_PersistResult = TypeVar("_PersistResult") @attr.s(auto_attribs=True, slots=True) -class _EventPersistQueueItem: +class _EventPersistQueueItem(Generic[_PersistResult]): task: _EventPersistQueueTask - deferred: ObservableDeferred + deferred: ObservableDeferred[_PersistResult] parent_opentracing_span_contexts: List = attr.ib(factory=list) """A list of opentracing spans waiting for this batch""" @@ -168,9 +170,6 @@ class _EventPersistQueueItem: """The opentracing span under which the persistence actually happened""" -_PersistResult = TypeVar("_PersistResult") - - class _EventPeristenceQueue(Generic[_PersistResult]): """Queues up tasks so that they can be processed with only one concurrent transaction per room. @@ -620,7 +619,7 @@ class EventsPersistenceStorageController: ) for room_id, ev_ctx_rm in events_by_room.items(): - latest_event_ids = set( + latest_event_ids = ( await self.main_store.get_latest_event_ids_in_room(room_id) ) new_latest_event_ids = await self._calculate_new_extremities( @@ -742,7 +741,7 @@ class EventsPersistenceStorageController: self, room_id: str, event_contexts: List[Tuple[EventBase, EventContext]], - latest_event_ids: Collection[str], + latest_event_ids: AbstractSet[str], ) -> Set[str]: """Calculates the new forward extremities for a room given events to persist. @@ -760,8 +759,6 @@ class EventsPersistenceStorageController: and not event.internal_metadata.is_soft_failed() ] - latest_event_ids = set(latest_event_ids) - # start with the existing forward extremities result = set(latest_event_ids) @@ -800,7 +797,7 @@ class EventsPersistenceStorageController: self, room_id: str, events_context: List[Tuple[EventBase, EventContext]], - old_latest_event_ids: Set[str], + old_latest_event_ids: AbstractSet[str], new_latest_event_ids: Set[str], ) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]: """Calculate the current state dict after adding some new events to |