summaryrefslogtreecommitdiff
path: root/synapse/storage/controllers
diff options
context:
space:
mode:
authorAntonio Russo <aerusso@aerusso.net>2023-09-27 18:16:24 -0600
committerAntonio Russo <aerusso@aerusso.net>2023-09-27 18:16:24 -0600
commitb230e2110fd27e03a5452ce95dbbe8ea8dd87037 (patch)
tree6fb976a0de5fa566a805537ade0ddae5502ae138 /synapse/storage/controllers
parent44b86bfd0ddbeaf838f80677985bce34ec04d72f (diff)
New upstream version 1.93.0
Diffstat (limited to 'synapse/storage/controllers')
-rw-r--r--synapse/storage/controllers/persist_events.py17
1 files changed, 7 insertions, 10 deletions
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index abd1d149..f39ae2d6 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -19,6 +19,7 @@ import logging
from collections import deque
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Any,
Awaitable,
Callable,
@@ -154,12 +155,13 @@ class _UpdateCurrentStateTask:
_EventPersistQueueTask = Union[_PersistEventsTask, _UpdateCurrentStateTask]
+_PersistResult = TypeVar("_PersistResult")
@attr.s(auto_attribs=True, slots=True)
-class _EventPersistQueueItem:
+class _EventPersistQueueItem(Generic[_PersistResult]):
task: _EventPersistQueueTask
- deferred: ObservableDeferred
+ deferred: ObservableDeferred[_PersistResult]
parent_opentracing_span_contexts: List = attr.ib(factory=list)
"""A list of opentracing spans waiting for this batch"""
@@ -168,9 +170,6 @@ class _EventPersistQueueItem:
"""The opentracing span under which the persistence actually happened"""
-_PersistResult = TypeVar("_PersistResult")
-
-
class _EventPeristenceQueue(Generic[_PersistResult]):
"""Queues up tasks so that they can be processed with only one concurrent
transaction per room.
@@ -620,7 +619,7 @@ class EventsPersistenceStorageController:
)
for room_id, ev_ctx_rm in events_by_room.items():
- latest_event_ids = set(
+ latest_event_ids = (
await self.main_store.get_latest_event_ids_in_room(room_id)
)
new_latest_event_ids = await self._calculate_new_extremities(
@@ -742,7 +741,7 @@ class EventsPersistenceStorageController:
self,
room_id: str,
event_contexts: List[Tuple[EventBase, EventContext]],
- latest_event_ids: Collection[str],
+ latest_event_ids: AbstractSet[str],
) -> Set[str]:
"""Calculates the new forward extremities for a room given events to
persist.
@@ -760,8 +759,6 @@ class EventsPersistenceStorageController:
and not event.internal_metadata.is_soft_failed()
]
- latest_event_ids = set(latest_event_ids)
-
# start with the existing forward extremities
result = set(latest_event_ids)
@@ -800,7 +797,7 @@ class EventsPersistenceStorageController:
self,
room_id: str,
events_context: List[Tuple[EventBase, EventContext]],
- old_latest_event_ids: Set[str],
+ old_latest_event_ids: AbstractSet[str],
new_latest_event_ids: Set[str],
) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]:
"""Calculate the current state dict after adding some new events to