path: root/synapse/replication/tcp/streams/events.py
author    Andrej Shadura <andrewsh@debian.org>  2020-05-25 11:54:03 +0200
committer Andrej Shadura <andrewsh@debian.org>  2020-05-25 11:54:03 +0200
commit    da7f96aa2a3b1485dafa016f38aac1d4376b64e7 (patch)
tree      2c4cdc1096370cff409af72b9e97d3edf5f84c59 /synapse/replication/tcp/streams/events.py
parent    bef77b10b264d8c082a3604656e186ab162e5d64 (diff)
New upstream version 1.13.0
Diffstat (limited to 'synapse/replication/tcp/streams/events.py')
-rw-r--r--  synapse/replication/tcp/streams/events.py | 114
1 file changed, 97 insertions(+), 17 deletions(-)
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index b3afabb8..890e75d8 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,11 +15,12 @@
 # limitations under the License.
 import heapq
-from typing import Tuple, Type
+from collections.abc import Iterable
+from typing import List, Tuple, Type
 
 import attr
 
-from ._base import Stream
+from ._base import Stream, StreamUpdateResult, Token
 
 """Handling of the 'events' replication stream
@@ -116,28 +117,107 @@ class EventsStream(Stream):
 
     def __init__(self, hs):
         self._store = hs.get_datastore()
-        self.current_token = self._store.get_current_events_token  # type: ignore
+        super().__init__(
+            hs.get_instance_name(),
+            self._store.get_current_events_token,
+            self._update_function,
+        )
 
-        super(EventsStream, self).__init__(hs)
+    async def _update_function(
+        self,
+        instance_name: str,
+        from_token: Token,
+        current_token: Token,
+        target_row_count: int,
+    ) -> StreamUpdateResult:
+
+        # the events stream merges together three separate sources:
+        #  * new events
+        #  * current_state changes
+        #  * events which were previously outliers, but have now been de-outliered.
+        #
+        # The merge operation is complicated by the fact that we only have a single
+        # "stream token" which is supposed to indicate how far we have got through
+        # all three streams. It's therefore no good to return rows 1-1000 from the
+        # "new events" table if the state_deltas are limited to rows 1-100 by the
+        # target_row_count.
+        #
+        # In other words: we must pick a new upper limit, and must return *all* rows
+        # up to that point for each of the three sources.
+        #
+        # Start by trying to split the target_row_count up. We expect to have a
+        # negligible number of ex-outliers, and a rough approximation based on recent
+        # traffic on sw1v.org shows that there are approximately the same number of
+        # event rows between a given pair of stream ids as there are state
+        # updates, so let's split our target_row_count among those two types. The target
+        # is only an approximation - it doesn't matter if we end up going a bit over it.
+
+        target_row_count //= 2
+
+        # now we fetch up to that many rows from the events table
 
-    async def update_function(self, from_token, current_token, limit=None):
         event_rows = await self._store.get_all_new_forward_event_rows(
-            from_token, current_token, limit
-        )
-        event_updates = (
-            (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows
+            from_token, current_token, target_row_count
+        )  # type: List[Tuple]
+
+        # we rely on get_all_new_forward_event_rows strictly honouring the limit, so
+        # that we know it is safe to just take upper_limit = event_rows[-1][0].
+        assert (
+            len(event_rows) <= target_row_count
+        ), "get_all_new_forward_event_rows did not honour row limit"
+
+        # if we hit the limit on event_updates, there's no point in going beyond the
+        # last stream_id in the batch for the other sources.
+
+        if len(event_rows) == target_row_count:
+            limited = True
+            upper_limit = event_rows[-1][0]  # type: int
+        else:
+            limited = False
+            upper_limit = current_token
+
+        # next up is the state delta table.
+        (
+            state_rows,
+            upper_limit,
+            state_rows_limited,
+        ) = await self._store.get_all_updated_current_state_deltas(
+            from_token, upper_limit, target_row_count
         )
 
-        state_rows = await self._store.get_all_updated_current_state_deltas(
-            from_token, current_token, limit
-        )
-        state_updates = (
-            (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) for row in state_rows
-        )
+        limited = limited or state_rows_limited
 
-        all_updates = heapq.merge(event_updates, state_updates)
+        # finally, fetch the ex-outliers rows. We assume there are few enough of these
+        # not to bother with the limit.
 
-        return all_updates
+        ex_outliers_rows = await self._store.get_ex_outlier_stream_rows(
+            from_token, upper_limit
+        )  # type: List[Tuple]
+
+        # we now need to turn the raw database rows returned into tuples suitable
+        # for the replication protocol (basically, we add an identifier to
+        # distinguish the row type). At the same time, we can limit the event_rows
+        # to the max stream_id from state_rows.
+
+        event_updates = (
+            (stream_id, (EventsStreamEventRow.TypeId, rest))
+            for (stream_id, *rest) in event_rows
+            if stream_id <= upper_limit
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        state_updates = (
+            (stream_id, (EventsStreamCurrentStateRow.TypeId, rest))
+            for (stream_id, *rest) in state_rows
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        ex_outliers_updates = (
+            (stream_id, (EventsStreamEventRow.TypeId, rest))
+            for (stream_id, *rest) in ex_outliers_rows
+        )  # type: Iterable[Tuple[int, Tuple]]
+
+        # we need to return a sorted list, so merge them together.
+        updates = list(heapq.merge(event_updates, state_updates, ex_outliers_updates))
+        return updates, upper_limit, limited
 
     @classmethod
     def parse_row(cls, row):
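
The core of this change is a general pattern: fetch from several stream-ordered sources under a shared row budget, clamp them all to a common upper bound so that the single stream token stays valid for every source, then merge the sorted results with heapq.merge. The sketch below is a minimal, self-contained illustration of that pattern only; the fetch() helper, the sample data and the get_updates() signature are invented for the example and are not Synapse APIs. The real code uses the datastore queries shown in the diff and returns the same (updates, new_token, limited) triple that StreamUpdateResult names.

# Illustrative sketch of the merge/limit pattern above. All names and data
# here are invented for the example; only the technique mirrors the patch.
import heapq
from typing import List, Tuple

# each source yields (stream_id, row) pairs, ordered by stream_id
EVENTS = [(1, "ev1"), (2, "ev2"), (3, "ev3"), (4, "ev4")]
STATE_DELTAS = [(2, "state2"), (5, "state5")]
EX_OUTLIERS = [(3, "exo3")]


def fetch(source: List[Tuple[int, str]], from_token: int, to_token: int, limit: int):
    """Return rows with from_token < stream_id <= to_token, at most `limit` of them."""
    rows = [r for r in source if from_token < r[0] <= to_token]
    return rows[:limit]


def get_updates(from_token: int, current_token: int, target_row_count: int):
    # split the row budget between the two potentially-large sources
    per_source = target_row_count // 2

    event_rows = fetch(EVENTS, from_token, current_token, per_source)

    # if the events source hit its limit, clamp the upper bound to the last
    # stream id it returned, so that every source is complete up to the same
    # point and the single returned token is valid for all of them
    if len(event_rows) == per_source:
        limited = True
        upper_limit = event_rows[-1][0]
    else:
        limited = False
        upper_limit = current_token

    state_rows = fetch(STATE_DELTAS, from_token, upper_limit, per_source)
    if len(state_rows) == per_source:
        limited = True
        upper_limit = state_rows[-1][0]
        # re-clamp the events to the (possibly lower) new bound
        event_rows = [r for r in event_rows if r[0] <= upper_limit]

    # assume there are few enough ex-outliers that no limit is needed
    ex_outlier_rows = fetch(EX_OUTLIERS, from_token, upper_limit, len(EX_OUTLIERS))

    # all three inputs are sorted by stream_id, so heapq.merge yields a single
    # sorted stream of updates without re-sorting
    updates = list(heapq.merge(event_rows, state_rows, ex_outlier_rows))
    return updates, upper_limit, limited


if __name__ == "__main__":
    print(get_updates(0, 10, 4))
    # ([(1, 'ev1'), (2, 'ev2'), (2, 'state2')], 2, True)

Note how the returned token is 2, not 10: the caller must be told only how far *all* sources have been fully delivered, which is exactly why the patched _update_function returns upper_limit rather than current_token whenever any source was limited.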