diff options
author | Andrej Shadura <andrewsh@debian.org> | 2022-07-20 14:17:32 +0200 |
---|---|---|
committer | Andrej Shadura <andrewsh@debian.org> | 2022-07-20 14:17:32 +0200 |
commit | 4df717ab4138a59602fbce16d855794d3f611e60 (patch) | |
tree | 1a3db57b438b236b1d4dcfdbf4b49d320d8e748e /synapse/storage/databases | |
parent | cdbe14e39a29d617fd8358639042fe57e08ac978 (diff) |
New upstream version 1.63.0
Diffstat (limited to 'synapse/storage/databases')
18 files changed, 941 insertions, 370 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 11d9d16c..a3d31d37 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -45,7 +45,6 @@ from .event_push_actions import EventPushActionsStore from .events_bg_updates import EventsBackgroundUpdatesStore from .events_forward_extremities import EventForwardExtremitiesStore from .filtering import FilteringStore -from .group_server import GroupServerStore from .keys import KeyStore from .lock import LockStore from .media_repository import MediaRepositoryStore @@ -88,7 +87,6 @@ class DataStore( RoomStore, RoomBatchStore, RegistrationStore, - StreamWorkerStore, ProfileStore, PresenceStore, TransactionWorkerStore, @@ -105,19 +103,20 @@ class DataStore( PusherStore, PushRuleStore, ApplicationServiceTransactionStore, + EventPushActionsStore, + ServerMetricsStore, ReceiptsStore, EndToEndKeyStore, EndToEndRoomKeyStore, SearchStore, TagsStore, AccountDataStore, - EventPushActionsStore, + StreamWorkerStore, OpenIdStore, ClientIpWorkerStore, DeviceStore, DeviceInboxStore, UserDirectoryStore, - GroupServerStore, UserErasureStore, MonthlyActiveUsersWorkerStore, StatsStore, @@ -126,7 +125,6 @@ class DataStore( UIAuthStore, EventForwardExtremitiesStore, CacheInvalidationWorkerStore, - ServerMetricsStore, LockStore, SessionStore, ): @@ -197,6 +195,7 @@ class DataStore( self._min_stream_order_on_start = self.get_room_min_stream_ordering() def get_device_stream_token(self) -> int: + # TODO: shouldn't this be moved to `DeviceWorkerStore`? return self._device_list_id_gen.get_current_token() async def get_users(self) -> List[JsonDict]: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 599b4183..422e0e65 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -834,8 +834,6 @@ class DeviceInboxWorkerStore(SQLBaseStore): class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" - REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox" - REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox" REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox" def __init__( @@ -857,15 +855,6 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore): self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox ) - # Used to be a background update that deletes all device_inboxes for deleted - # devices. - self.db_pool.updates.register_noop_background_update( - self.REMOVE_DELETED_DEVICES - ) - # Used to be a background update that deletes all device_inboxes for hidden - # devices. - self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES) - self.db_pool.updates.register_background_update_handler( self.REMOVE_DEAD_DEVICES_FROM_INBOX, self._remove_dead_devices_from_device_inbox, diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index d900064c..adde5d09 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -28,6 +28,8 @@ from typing import ( cast, ) +from typing_extensions import Literal + from synapse.api.constants import EduTypes from synapse.api.errors import Codes, StoreError from synapse.logging.opentracing import ( @@ -44,6 +46,8 @@ from synapse.storage.database import ( LoggingTransaction, make_tuple_comparison_clause, ) +from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore +from synapse.storage.types import Cursor from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList @@ -65,7 +69,7 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = ( BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes" -class DeviceWorkerStore(SQLBaseStore): +class DeviceWorkerStore(EndToEndKeyWorkerStore): def __init__( self, database: DatabasePool, @@ -74,7 +78,9 @@ class DeviceWorkerStore(SQLBaseStore): ): super().__init__(database, db_conn, hs) - device_list_max = self._device_list_id_gen.get_current_token() + # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a + # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker). + device_list_max = self._device_list_id_gen.get_current_token() # type: ignore[attr-defined] device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict( db_conn, "device_lists_stream", @@ -339,8 +345,9 @@ class DeviceWorkerStore(SQLBaseStore): # following this stream later. last_processed_stream_id = from_stream_id - query_map = {} - cross_signing_keys_by_user = {} + # A map of (user ID, device ID) to (stream ID, context). + query_map: Dict[Tuple[str, str], Tuple[int, Optional[str]]] = {} + cross_signing_keys_by_user: Dict[str, Dict[str, object]] = {} for user_id, device_id, update_stream_id, update_context in updates: # Calculate the remaining length budget. # Note that, for now, each entry in `cross_signing_keys_by_user` @@ -596,7 +603,7 @@ class DeviceWorkerStore(SQLBaseStore): txn=txn, table="device_lists_outbound_last_success", key_names=("destination", "user_id"), - key_values=((destination, user_id) for user_id, _ in rows), + key_values=[(destination, user_id) for user_id, _ in rows], value_names=("stream_id",), value_values=((stream_id,) for _, stream_id in rows), ) @@ -621,7 +628,9 @@ class DeviceWorkerStore(SQLBaseStore): The new stream ID. """ - async with self._device_list_id_gen.get_next() as stream_id: + # TODO: this looks like it's _writing_. Should this be on DeviceStore rather + # than DeviceWorkerStore? + async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined] await self.db_pool.runInteraction( "add_user_sig_change_to_streams", self._add_user_signature_change_txn, @@ -686,7 +695,7 @@ class DeviceWorkerStore(SQLBaseStore): } - users_needing_resync user_ids_not_in_cache = user_ids - user_ids_in_cache - results = {} + results: Dict[str, Dict[str, JsonDict]] = {} for user_id, device_id in query_list: if user_id not in user_ids_in_cache: continue @@ -727,7 +736,7 @@ class DeviceWorkerStore(SQLBaseStore): def get_cached_device_list_changes( self, from_key: int, - ) -> Optional[Set[str]]: + ) -> Optional[List[str]]: """Get set of users whose devices have changed since `from_key`, or None if that information is not in our cache. """ @@ -737,7 +746,7 @@ class DeviceWorkerStore(SQLBaseStore): async def get_users_whose_devices_changed( self, from_key: int, - user_ids: Optional[Iterable[str]] = None, + user_ids: Optional[Collection[str]] = None, to_key: Optional[int] = None, ) -> Set[str]: """Get set of users whose devices have changed since `from_key` that @@ -757,6 +766,7 @@ class DeviceWorkerStore(SQLBaseStore): """ # Get set of users who *may* have changed. Users not in the returned # list have definitely not changed. + user_ids_to_check: Optional[Collection[str]] if user_ids is None: # Get set of all users that have had device list changes since 'from_key' user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed( @@ -772,7 +782,7 @@ class DeviceWorkerStore(SQLBaseStore): return set() def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]: - changes = set() + changes: Set[str] = set() stream_id_where_clause = "stream_id > ?" sql_args = [from_key] @@ -788,6 +798,9 @@ class DeviceWorkerStore(SQLBaseStore): """ # Query device changes with a batch of users at a time + # Assertion for mypy's benefit; see also + # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions + assert user_ids_to_check is not None for chunk in batch_iter(user_ids_to_check, 100): clause, args = make_in_list_sql_clause( txn.database_engine, "user_id", chunk @@ -854,7 +867,9 @@ class DeviceWorkerStore(SQLBaseStore): if last_id == current_id: return [], current_id, False - def _get_all_device_list_changes_for_remotes(txn): + def _get_all_device_list_changes_for_remotes( + txn: Cursor, + ) -> Tuple[List[Tuple[int, tuple]], int, bool]: # This query Does The Right Thing where it'll correctly apply the # bounds to the inner queries. sql = """ @@ -913,7 +928,7 @@ class DeviceWorkerStore(SQLBaseStore): desc="get_device_list_last_stream_id_for_remotes", ) - results = {user_id: None for user_id in user_ids} + results: Dict[str, Optional[str]] = {user_id: None for user_id in user_ids} results.update({row["user_id"]: row["stream_id"] for row in rows}) return results @@ -1193,6 +1208,65 @@ class DeviceWorkerStore(SQLBaseStore): return devices + @cached() + async def _get_min_device_lists_changes_in_room(self) -> int: + """Returns the minimum stream ID that we have entries for + `device_lists_changes_in_room` + """ + + return await self.db_pool.simple_select_one_onecol( + table="device_lists_changes_in_room", + keyvalues={}, + retcol="COALESCE(MIN(stream_id), 0)", + desc="get_min_device_lists_changes_in_room", + ) + + async def get_device_list_changes_in_rooms( + self, room_ids: Collection[str], from_id: int + ) -> Optional[Set[str]]: + """Return the set of users whose devices have changed in the given rooms + since the given stream ID. + + Returns None if the given stream ID is too old. + """ + + if not room_ids: + return set() + + min_stream_id = await self._get_min_device_lists_changes_in_room() + + if min_stream_id > from_id: + return None + + sql = """ + SELECT DISTINCT user_id FROM device_lists_changes_in_room + WHERE {clause} AND stream_id >= ? + """ + + def _get_device_list_changes_in_rooms_txn( + txn: LoggingTransaction, + clause: str, + args: List[Any], + ) -> Set[str]: + txn.execute(sql.format(clause=clause), args) + return {user_id for user_id, in txn} + + changes = set() + for chunk in batch_iter(room_ids, 1000): + clause, args = make_in_list_sql_clause( + self.database_engine, "room_id", chunk + ) + args.append(from_id) + + changes |= await self.db_pool.runInteraction( + "get_device_list_changes_in_rooms", + _get_device_list_changes_in_rooms_txn, + clause, + args, + ) + + return changes + class DeviceBackgroundUpdateStore(SQLBaseStore): def __init__( @@ -1240,15 +1314,6 @@ class DeviceBackgroundUpdateStore(SQLBaseStore): self._remove_duplicate_outbound_pokes, ) - # a pair of background updates that were added during the 1.14 release cycle, - # but replaced with 58/06dlols_unique_idx.py - self.db_pool.updates.register_noop_background_update( - "device_lists_outbound_last_success_unique_idx", - ) - self.db_pool.updates.register_noop_background_update( - "drop_device_lists_outbound_last_success_non_unique_idx", - ) - async def _drop_device_list_streams_non_unique_indexes( self, progress: JsonDict, batch_size: int ) -> int: @@ -1346,9 +1411,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): # Map of (user_id, device_id) -> bool. If there is an entry that implies # the device exists. - self.device_id_exists_cache = LruCache( - cache_name="device_id_exists", max_size=10000 - ) + self.device_id_exists_cache: LruCache[ + Tuple[str, str], Literal[True] + ] = LruCache(cache_name="device_id_exists", max_size=10000) async def store_device( self, @@ -1433,16 +1498,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): ) raise StoreError(500, "Problem storing device.") - async def delete_device(self, user_id: str, device_id: str) -> None: - """Delete a device and its device_inbox. - - Args: - user_id: The ID of the user which owns the device - device_id: The ID of the device to delete - """ - - await self.delete_devices(user_id, [device_id]) - async def delete_devices(self, user_id: str, device_ids: List[str]) -> None: """Deletes several devices. @@ -1670,7 +1725,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): context, ) - async with self._device_list_id_gen.get_next_mult( + async with self._device_list_id_gen.get_next_mult( # type: ignore[attr-defined] len(device_ids) ) as stream_ids: await self.db_pool.runInteraction( @@ -1723,7 +1778,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): device_ids: Iterable[str], hosts: Collection[str], stream_ids: List[int], - context: Dict[str, str], + context: Optional[Dict[str, str]], ) -> None: for host in hosts: txn.call_after( @@ -1894,7 +1949,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): [], ) - async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: + async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: # type: ignore[attr-defined] return await self.db_pool.runInteraction( "add_device_list_outbound_pokes", add_device_list_outbound_pokes_txn, diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index b0199793..dd262703 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union, cast import attr +from synapse.api.constants import ReceiptTypes from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import ( @@ -24,6 +25,8 @@ from synapse.storage.database import ( LoggingDatabaseConnection, LoggingTransaction, ) +from synapse.storage.databases.main.receipts import ReceiptsWorkerStore +from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.util import json_encoder from synapse.util.caches.descriptors import cached @@ -79,15 +82,15 @@ class UserPushAction(EmailPushAction): profile_tag: str -@attr.s(slots=True, frozen=True, auto_attribs=True) +@attr.s(slots=True, auto_attribs=True) class NotifCounts: """ The per-user, per-room count of notifications. Used by sync and push. """ - notify_count: int - unread_count: int - highlight_count: int + notify_count: int = 0 + unread_count: int = 0 + highlight_count: int = 0 def _serialize_action(actions: List[Union[dict, str]], is_highlight: bool) -> str: @@ -119,7 +122,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st return DEFAULT_NOTIF_ACTION -class EventPushActionsWorkerStore(SQLBaseStore): +class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBaseStore): def __init__( self, database: DatabasePool, @@ -140,20 +143,27 @@ class EventPushActionsWorkerStore(SQLBaseStore): self._find_stream_orderings_for_times, 10 * 60 * 1000 ) - self._rotate_delay = 3 self._rotate_count = 10000 self._doing_notif_rotation = False if hs.config.worker.run_background_tasks: self._rotate_notif_loop = self._clock.looping_call( - self._rotate_notifs, 30 * 60 * 1000 + self._rotate_notifs, 30 * 1000 ) - @cached(num_args=3, tree=True, max_entries=5000) + self.db_pool.updates.register_background_index_update( + "event_push_summary_unique_index", + index_name="event_push_summary_unique_index", + table="event_push_summary", + columns=["user_id", "room_id"], + unique=True, + replaces_index="event_push_summary_user_rm", + ) + + @cached(tree=True, max_entries=5000) async def get_unread_event_push_actions_by_room_for_user( self, room_id: str, user_id: str, - last_read_event_id: Optional[str], ) -> NotifCounts: """Get the notification count, the highlight count and the unread message count for a given user in a given room after the given read receipt. @@ -165,8 +175,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): Args: room_id: The room to retrieve the counts in. user_id: The user to retrieve the counts for. - last_read_event_id: The event associated with the latest read receipt for - this user in this room. None if no receipt for this user in this room. Returns A dict containing the counts mentioned earlier in this docstring, @@ -178,7 +186,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): self._get_unread_counts_by_receipt_txn, room_id, user_id, - last_read_event_id, ) def _get_unread_counts_by_receipt_txn( @@ -186,16 +193,17 @@ class EventPushActionsWorkerStore(SQLBaseStore): txn: LoggingTransaction, room_id: str, user_id: str, - last_read_event_id: Optional[str], ) -> NotifCounts: - stream_ordering = None + result = self.get_last_receipt_for_user_txn( + txn, + user_id, + room_id, + receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), + ) - if last_read_event_id is not None: - stream_ordering = self.get_stream_id_for_event_txn( # type: ignore[attr-defined] - txn, - last_read_event_id, - allow_none=True, - ) + stream_ordering = None + if result: + _, stream_ordering = result if stream_ordering is None: # Either last_read_event_id is None, or it's an event we don't have (e.g. @@ -209,7 +217,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): retcol="event_id", ) - stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) # type: ignore[attr-defined] + stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) return self._get_unread_counts_by_pos_txn( txn, room_id, user_id, stream_ordering @@ -218,49 +226,121 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_unread_counts_by_pos_txn( self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int ) -> NotifCounts: - sql = ( - "SELECT" - " COUNT(CASE WHEN notif = 1 THEN 1 END)," - " COUNT(CASE WHEN highlight = 1 THEN 1 END)," - " COUNT(CASE WHEN unread = 1 THEN 1 END)" - " FROM event_push_actions ea" - " WHERE user_id = ?" - " AND room_id = ?" - " AND stream_ordering > ?" - ) - - txn.execute(sql, (user_id, room_id, stream_ordering)) - row = txn.fetchone() + """Get the number of unread messages for a user/room that have happened + since the given stream ordering. + """ - (notif_count, highlight_count, unread_count) = (0, 0, 0) - - if row: - (notif_count, highlight_count, unread_count) = row + counts = NotifCounts() + # First we pull the counts from the summary table. + # + # We check that `last_receipt_stream_ordering` matches the stream + # ordering given. If it doesn't match then a new read receipt has arrived and + # we haven't yet updated the counts in `event_push_summary` to reflect + # that; in that case we simply ignore `event_push_summary` counts + # and do a manual count of all of the rows in the `event_push_actions` table + # for this user/room. + # + # If `last_receipt_stream_ordering` is null then that means it's up to + # date (as the row was written by an older version of Synapse that + # updated `event_push_summary` synchronously when persisting a new read + # receipt). txn.execute( """ - SELECT notif_count, unread_count FROM event_push_summary - WHERE room_id = ? AND user_id = ? AND stream_ordering > ? + SELECT stream_ordering, notif_count, COALESCE(unread_count, 0) + FROM event_push_summary + WHERE room_id = ? AND user_id = ? + AND ( + (last_receipt_stream_ordering IS NULL AND stream_ordering > ?) + OR last_receipt_stream_ordering = ? + ) """, - (room_id, user_id, stream_ordering), + (room_id, user_id, stream_ordering, stream_ordering), ) row = txn.fetchone() + summary_stream_ordering = 0 if row: - notif_count += row[0] - - if row[1] is not None: - # The unread_count column of event_push_summary is NULLable, so we need - # to make sure we don't try increasing the unread counts if it's NULL - # for this row. - unread_count += row[1] - - return NotifCounts( - notify_count=notif_count, - unread_count=unread_count, - highlight_count=highlight_count, + summary_stream_ordering = row[0] + counts.notify_count += row[1] + counts.unread_count += row[2] + + # Next we need to count highlights, which aren't summarized + sql = """ + SELECT COUNT(*) FROM event_push_actions + WHERE user_id = ? + AND room_id = ? + AND stream_ordering > ? + AND highlight = 1 + """ + txn.execute(sql, (user_id, room_id, stream_ordering)) + row = txn.fetchone() + if row: + counts.highlight_count += row[0] + + # Finally we need to count push actions that aren't included in the + # summary returned above, e.g. recent events that haven't been + # summarized yet, or the summary is empty due to a recent read receipt. + stream_ordering = max(stream_ordering, summary_stream_ordering) + notify_count, unread_count = self._get_notif_unread_count_for_user_room( + txn, room_id, user_id, stream_ordering ) + counts.notify_count += notify_count + counts.unread_count += unread_count + + return counts + + def _get_notif_unread_count_for_user_room( + self, + txn: LoggingTransaction, + room_id: str, + user_id: str, + stream_ordering: int, + max_stream_ordering: Optional[int] = None, + ) -> Tuple[int, int]: + """Returns the notify and unread counts from `event_push_actions` for + the given user/room in the given range. + + Does not consult `event_push_summary` table, which may include push + actions that have been deleted from `event_push_actions` table. + """ + + # If there have been no events in the room since the stream ordering, + # there can't be any push actions either. + if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering): + return 0, 0 + + clause = "" + args = [user_id, room_id, stream_ordering] + if max_stream_ordering is not None: + clause = "AND ea.stream_ordering <= ?" + args.append(max_stream_ordering) + + # If the max stream ordering is less than the min stream ordering, + # then obviously there are zero push actions in that range. + if max_stream_ordering <= stream_ordering: + return 0, 0 + + sql = f""" + SELECT + COUNT(CASE WHEN notif = 1 THEN 1 END), + COUNT(CASE WHEN unread = 1 THEN 1 END) + FROM event_push_actions ea + WHERE user_id = ? + AND room_id = ? + AND ea.stream_ordering > ? + {clause} + """ + + txn.execute(sql, args) + row = txn.fetchone() + + if row: + return cast(Tuple[int, int], row) + + return 0, 0 + async def get_push_action_users_in_range( self, min_stream_ordering: int, max_stream_ordering: int ) -> List[str]: @@ -745,6 +825,19 @@ class EventPushActionsWorkerStore(SQLBaseStore): self._doing_notif_rotation = True try: + # First we recalculate push summaries and delete stale push actions + # for rooms/users with new receipts. + while True: + logger.debug("Handling new receipts") + + caught_up = await self.db_pool.runInteraction( + "_handle_new_receipts_for_notifs_txn", + self._handle_new_receipts_for_notifs_txn, + ) + if caught_up: + break + + # Then we update the event push summaries for any new events while True: logger.info("Rotating notifications") @@ -753,10 +846,115 @@ class EventPushActionsWorkerStore(SQLBaseStore): ) if caught_up: break - await self.hs.get_clock().sleep(self._rotate_delay) + + # Finally we clear out old event push actions. + await self._remove_old_push_actions_that_have_rotated() finally: self._doing_notif_rotation = False + def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: + """Check for new read receipts and delete from event push actions. + + Any push actions which predate the user's most recent read receipt are + now redundant, so we can remove them from `event_push_actions` and + update `event_push_summary`. + """ + + limit = 100 + + min_receipts_stream_id = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_last_receipt_stream_id", + keyvalues={}, + retcol="stream_id", + ) + + max_receipts_stream_id = self._receipts_id_gen.get_current_token() + + sql = """ + SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering + FROM receipts_linearized AS r + INNER JOIN events AS e USING (event_id) + WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ? + ORDER BY r.stream_id ASC + LIMIT ? + """ + + # We only want local users, so we add a dodgy filter to the above query + # and recheck it below. + user_filter = "%:" + self.hs.hostname + + txn.execute( + sql, + ( + min_receipts_stream_id, + max_receipts_stream_id, + user_filter, + limit, + ), + ) + rows = txn.fetchall() + + old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + # For each new read receipt we delete push actions from before it and + # recalculate the summary. + for _, room_id, user_id, stream_ordering in rows: + # Only handle our own read receipts. + if not self.hs.is_mine_id(user_id): + continue + + txn.execute( + """ + DELETE FROM event_push_actions + WHERE room_id = ? + AND user_id = ? + AND stream_ordering <= ? + AND highlight = 0 + """, + (room_id, user_id, stream_ordering), + ) + + notif_count, unread_count = self._get_notif_unread_count_for_user_room( + txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering + ) + + self.db_pool.simple_upsert_txn( + txn, + table="event_push_summary", + keyvalues={"room_id": room_id, "user_id": user_id}, + values={ + "notif_count": notif_count, + "unread_count": unread_count, + "stream_ordering": old_rotate_stream_ordering, + "last_receipt_stream_ordering": stream_ordering, + }, + ) + + # We always update `event_push_summary_last_receipt_stream_id` to + # ensure that we don't rescan the same receipts for remote users. + + upper_limit = max_receipts_stream_id + if len(rows) >= limit: + # If we pulled out a limited number of rows we only update the + # position to the last receipt we processed, so we continue + # processing the rest next iteration. + upper_limit = rows[-1][0] + + self.db_pool.simple_update_txn( + txn, + table="event_push_summary_last_receipt_stream_id", + keyvalues={}, + updatevalues={"stream_id": upper_limit}, + ) + + return len(rows) < limit + def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool: """Archives older notifications into event_push_summary. Returns whether the archiving process has caught up or not. @@ -782,20 +980,21 @@ class EventPushActionsWorkerStore(SQLBaseStore): stream_row = txn.fetchone() if stream_row: (offset_stream_ordering,) = stream_row - assert self.stream_ordering_day_ago is not None + + # We need to bound by the current token to ensure that we handle + # out-of-order writes correctly. rotate_to_stream_ordering = min( - self.stream_ordering_day_ago, offset_stream_ordering + offset_stream_ordering, self._stream_id_gen.get_current_token() ) - caught_up = offset_stream_ordering >= self.stream_ordering_day_ago + caught_up = False else: - rotate_to_stream_ordering = self.stream_ordering_day_ago + rotate_to_stream_ordering = self._stream_id_gen.get_current_token() caught_up = True logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering) self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering) - # We have caught up iff we were limited by `stream_ordering_day_ago` return caught_up def _rotate_notifs_before_txn( @@ -812,14 +1011,17 @@ class EventPushActionsWorkerStore(SQLBaseStore): sql = """ SELECT user_id, room_id, coalesce(old.%s, 0) + upd.cnt, - upd.stream_ordering, - old.user_id + upd.stream_ordering FROM ( SELECT user_id, room_id, count(*) as cnt, - max(stream_ordering) as stream_ordering - FROM event_push_actions - WHERE ? <= stream_ordering AND stream_ordering < ? - AND highlight = 0 + max(ea.stream_ordering) as stream_ordering + FROM event_push_actions AS ea + LEFT JOIN event_push_summary AS old USING (user_id, room_id) + WHERE ? < ea.stream_ordering AND ea.stream_ordering <= ? + AND ( + old.last_receipt_stream_ordering IS NULL + OR old.last_receipt_stream_ordering < ea.stream_ordering + ) AND %s = 1 GROUP BY user_id, room_id ) AS upd @@ -842,7 +1044,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): summaries[(row[0], row[1])] = _EventPushSummary( unread_count=row[2], stream_ordering=row[3], - old_user_id=row[4], notif_count=0, ) @@ -863,115 +1064,95 @@ class EventPushActionsWorkerStore(SQLBaseStore): summaries[(row[0], row[1])] = _EventPushSummary( unread_count=0, stream_ordering=row[3], - old_user_id=row[4], notif_count=row[2], ) logger.info("Rotating notifications, handling %d rows", len(summaries)) - # If the `old.user_id` above is NULL then we know there isn't already an - # entry in the table, so we simply insert it. Otherwise we update the - # existing table. - self.db_pool.simple_insert_many_txn( + self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", - keys=( - "user_id", - "room_id", - "notif_count", - "unread_count", - "stream_ordering", - ), - values=[ + key_names=("user_id", "room_id"), + key_values=[(user_id, room_id) for user_id, room_id in summaries], + value_names=("notif_count", "unread_count", "stream_ordering"), + value_values=[ ( - user_id, - room_id, summary.notif_count, summary.unread_count, summary.stream_ordering, ) - for ((user_id, room_id), summary) in summaries.items() - if summary.old_user_id is None + for summary in summaries.values() ], ) - txn.execute_batch( - """ - UPDATE event_push_summary - SET notif_count = ?, unread_count = ?, stream_ordering = ? - WHERE user_id = ? AND room_id = ? - """, - ( - ( - summary.notif_count, - summary.unread_count, - summary.stream_ordering, - user_id, - room_id, - ) - for ((user_id, room_id), summary) in summaries.items() - if summary.old_user_id is not None - ), - ) - - txn.execute( - "DELETE FROM event_push_actions" - " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0", - (old_rotate_stream_ordering, rotate_to_stream_ordering), - ) - - logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) - txn.execute( "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", (rotate_to_stream_ordering,), ) - def _remove_old_push_actions_before_txn( - self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int + async def _remove_old_push_actions_that_have_rotated( + self, ) -> None: - """ - Purges old push actions for a user and room before a given - stream_ordering. + """Clear out old push actions that have been summarized.""" - We however keep a months worth of highlighted notifications, so that - users can still get a list of recent highlights. - - Args: - txn: The transaction - room_id: Room ID to delete from - user_id: user ID to delete for - stream_ordering: The lowest stream ordering which will - not be deleted. - """ - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate, - (room_id, user_id), + # We want to clear out anything that older than a day that *has* already + # been rotated. + rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol( + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", ) - # We need to join on the events table to get the received_ts for - # event_push_actions and sqlite won't let us use a join in a delete so - # we can't just delete where received_ts < x. Furthermore we can - # only identify event_push_actions by a tuple of room_id, event_id - # we we can't use a subquery. - # Instead, we look up the stream ordering for the last event in that - # room received before the threshold time and delete event_push_actions - # in the room with a stream_odering before that. - txn.execute( - "DELETE FROM event_push_actions " - " WHERE user_id = ? AND room_id = ? AND " - " stream_ordering <= ?" - " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", - (user_id, room_id, stream_ordering, self.stream_ordering_month_ago), + max_stream_ordering_to_delete = min( + rotated_upto_stream_ordering, self.stream_ordering_day_ago ) - txn.execute( - """ - DELETE FROM event_push_summary - WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? - """, - (room_id, user_id, stream_ordering), - ) + def remove_old_push_actions_that_have_rotated_txn( + txn: LoggingTransaction, + ) -> bool: + # We don't want to clear out too much at a time, so we bound our + # deletes. + batch_size = self._rotate_count + + txn.execute( + """ + SELECT stream_ordering FROM event_push_actions + WHERE stream_ordering <= ? AND highlight = 0 + ORDER BY stream_ordering ASC LIMIT 1 OFFSET ? + """, + ( + max_stream_ordering_to_delete, + batch_size, + ), + ) + stream_row = txn.fetchone() + + if stream_row: + (stream_ordering,) = stream_row + else: + stream_ordering = max_stream_ordering_to_delete + + # We need to use a inclusive bound here to handle the case where a + # single stream ordering has more than `batch_size` rows. + txn.execute( + """ + DELETE FROM event_push_actions + WHERE stream_ordering <= ? AND highlight = 0 + """, + (stream_ordering,), + ) + + logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) + + return txn.rowcount < batch_size + + while True: + done = await self.db_pool.runInteraction( + "_remove_old_push_actions_that_have_rotated", + remove_old_push_actions_that_have_rotated_txn, + ) + if done: + break class EventPushActionsStore(EventPushActionsWorkerStore): @@ -1000,6 +1181,16 @@ class EventPushActionsStore(EventPushActionsWorkerStore): where_clause="highlight=1", ) + # Add index to make deleting old push actions faster. + self.db_pool.updates.register_background_index_update( + "event_push_actions_stream_highlight_index", + index_name="event_push_actions_stream_highlight_index", + table="event_push_actions", + columns=["highlight", "stream_ordering"], + where_clause="highlight=0", + psql_only=True, + ) + async def get_push_actions_for_user( self, user_id: str, @@ -1075,5 +1266,4 @@ class _EventPushSummary: unread_count: int stream_ordering: int - old_user_id: str notif_count: int diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 17e35cf6..eb4efbb9 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -16,6 +16,7 @@ import itertools import logging from collections import OrderedDict +from http import HTTPStatus from typing import ( TYPE_CHECKING, Any, @@ -35,6 +36,7 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EventContentFields, EventTypes, RelationTypes +from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext @@ -46,7 +48,7 @@ from synapse.storage.database import ( ) from synapse.storage.databases.main.events_worker import EventCacheEntry from synapse.storage.databases.main.search import SearchEntry -from synapse.storage.engines.postgres import PostgresEngine +from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import AbstractStreamIdGenerator from synapse.storage.util.sequence import SequenceGenerator from synapse.types import JsonDict, StateMap, get_domain_from_id @@ -69,6 +71,24 @@ event_counter = Counter( ) +class PartialStateConflictError(SynapseError): + """An internal error raised when attempting to persist an event with partial state + after the room containing the event has been un-partial stated. + + This error should be handled by recomputing the event context and trying again. + + This error has an HTTP status code so that it can be transported over replication. + It should not be exposed to clients. + """ + + def __init__(self) -> None: + super().__init__( + HTTPStatus.CONFLICT, + msg="Cannot persist partial state event in un-partial stated room", + errcode=Codes.UNKNOWN, + ) + + @attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -154,6 +174,10 @@ class PersistEventsStore: Returns: Resolves when the events have been persisted + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ # We want to calculate the stream orderings as late as possible, as @@ -354,6 +378,9 @@ class PersistEventsStore: For each room, a list of the event ids which are the forward extremities. + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ state_delta_for_room = state_delta_for_room or {} new_forward_extremities = new_forward_extremities or {} @@ -980,16 +1007,16 @@ class PersistEventsStore: self, room_id: str, state_delta: DeltaState, - stream_id: int, ) -> None: """Update the current state stored in the datatabase for the given room""" - await self.db_pool.runInteraction( - "update_current_state", - self._update_current_state_txn, - state_delta_by_room={room_id: state_delta}, - stream_id=stream_id, - ) + async with self._stream_id_gen.get_next() as stream_ordering: + await self.db_pool.runInteraction( + "update_current_state", + self._update_current_state_txn, + state_delta_by_room={room_id: state_delta}, + stream_id=stream_ordering, + ) def _update_current_state_txn( self, @@ -1304,6 +1331,10 @@ class PersistEventsStore: Returns: new list, without events which are already in the events table. + + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ txn.execute( "SELECT event_id, outlier FROM events WHERE event_id in (%s)" @@ -1766,6 +1797,18 @@ class PersistEventsStore: self.store.get_invited_rooms_for_local_user.invalidate, (event.state_key,), ) + txn.call_after( + self.store.get_local_users_in_room.invalidate, + (event.room_id,), + ) + txn.call_after( + self.store.get_number_joined_users_in_room.invalidate, + (event.room_id,), + ) + txn.call_after( + self.store.get_user_in_room_with_profile.invalidate, + (event.room_id, event.state_key), + ) # The `_get_membership_from_event_id` is immutable, except for the # case where we look up an event *before* persisting it. @@ -2215,6 +2258,11 @@ class PersistEventsStore: txn: LoggingTransaction, events_and_contexts: Collection[Tuple[EventBase, EventContext]], ) -> None: + """ + Raises: + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. + """ state_groups = {} for event, context in events_and_contexts: if event.internal_metadata.is_outlier(): @@ -2239,19 +2287,37 @@ class PersistEventsStore: # if we have partial state for these events, record the fact. (This happens # here rather than in _store_event_txn because it also needs to happen when # we de-outlier an event.) - self.db_pool.simple_insert_many_txn( - txn, - table="partial_state_events", - keys=("room_id", "event_id"), - values=[ - ( - event.room_id, - event.event_id, - ) - for event, ctx in events_and_contexts - if ctx.partial_state - ], - ) + try: + self.db_pool.simple_insert_many_txn( + txn, + table="partial_state_events", + keys=("room_id", "event_id"), + values=[ + ( + event.room_id, + event.event_id, + ) + for event, ctx in events_and_contexts + if ctx.partial_state + ], + ) + except self.db_pool.engine.module.IntegrityError: + logger.info( + "Cannot persist events %s in rooms %s: room has been un-partial stated", + [ + event.event_id + for event, ctx in events_and_contexts + if ctx.partial_state + ], + list( + { + event.room_id + for event, ctx in events_and_contexts + if ctx.partial_state + } + ), + ) + raise PartialStateConflictError() self.db_pool.simple_upsert_many_txn( txn, @@ -2296,11 +2362,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_edges", - keys=("event_id", "prev_event_id", "room_id", "is_state"), + keys=("event_id", "prev_event_id"), values=[ - (ev.event_id, e_id, ev.room_id, False) - for ev in events - for e_id in ev.prev_event_ids() + (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids() ], ) diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index d5f00596..eeca85fc 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1,4 +1,4 @@ -# Copyright 2019-2021 The Matrix.org Foundation C.I.C. +# Copyright 2019-2022 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -64,6 +64,9 @@ class _BackgroundUpdates: INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts" REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" + EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows" + EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index" + @attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: @@ -177,11 +180,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self._purged_chain_cover_index, ) - # The event_thread_relation background update was replaced with the - # event_arbitrary_relations one, which handles any relation to avoid - # needed to potentially crawl the entire events table in the future. - self.db_pool.updates.register_noop_background_update("event_thread_relation") - self.db_pool.updates.register_background_update_handler( "event_arbitrary_relations", self._event_arbitrary_relations, @@ -240,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ################################################################################ + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS, + self._background_drop_invalid_event_edges_rows, + ) + + self.db_pool.updates.register_background_index_update( + _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX, + index_name="event_edges_event_id_prev_event_id_idx", + table="event_edges", + columns=["event_id", "prev_event_id"], + unique=True, + # the old index which just covered event_id is now redundant. + replaces_index="ev_edges_id", + ) + async def _background_reindex_fields_sender( self, progress: JsonDict, batch_size: int ) -> int: @@ -1290,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): ) return 0 + + async def _background_drop_invalid_event_edges_rows( + self, progress: JsonDict, batch_size: int + ) -> int: + """Drop invalid rows from event_edges + + This only runs for postgres. For SQLite, it all happens synchronously. + + Firstly, drop any rows with is_state=True. These may have been added a long time + ago, but they are no longer used. + + We also drop rows that do not correspond to entries in `events`, and add a + foreign key. + """ + + last_event_id = progress.get("last_event_id", "") + + def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool: + """Returns True if we're done.""" + + # first we need to find an endpoint. + txn.execute( + """ + SELECT event_id FROM event_edges + WHERE event_id > ? + ORDER BY event_id + LIMIT 1 OFFSET ? + """, + (last_event_id, batch_size), + ) + + endpoint = None + row = txn.fetchone() + + if row: + endpoint = row[0] + + where_clause = "ee.event_id > ?" + args = [last_event_id] + if endpoint: + where_clause += " AND ee.event_id <= ?" + args.append(endpoint) + + # now delete any that: + # - have is_state=TRUE, or + # - do not correspond to a row in `events` + txn.execute( + f""" + DELETE FROM event_edges + WHERE event_id IN ( + SELECT ee.event_id + FROM event_edges ee + LEFT JOIN events ev USING (event_id) + WHERE ({where_clause}) AND + (is_state OR ev.event_id IS NULL) + )""", + args, + ) + + logger.info( + "cleaned up event_edges up to %s: removed %i/%i rows", + endpoint, + txn.rowcount, + batch_size, + ) + + if endpoint is not None: + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS, + {"last_event_id": endpoint}, + ) + return False + + # if that was the final batch, we validate the foreign key. + # + # The constraint should have been in place and enforced for new rows since + # before we started deleting invalid rows, so there's no chance for any + # invalid rows to have snuck in the meantime. In other words, this really + # ought to succeed. + logger.info("cleaned up event_edges; enabling foreign key") + txn.execute( + "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey" + ) + return True + + done = await self.db_pool.runInteraction( + desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn + ) + + if done: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS + ) + + return batch_size diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py deleted file mode 100644 index c15a7136..00000000 --- a/synapse/storage/databases/main/group_server.py +++ /dev/null @@ -1,34 +0,0 @@ -# Copyright 2017 Vector Creations Ltd -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import TYPE_CHECKING - -from synapse.storage._base import SQLBaseStore -from synapse.storage.database import DatabasePool, LoggingDatabaseConnection - -if TYPE_CHECKING: - from synapse.server import HomeServer - - -class GroupServerStore(SQLBaseStore): - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - # Register a legacy groups background update as a no-op. - database.updates.register_noop_background_update("local_group_updates_index") - super().__init__(database, db_conn, hs) diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index d028be16..9b172a64 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -37,9 +37,6 @@ from synapse.types import JsonDict, UserID if TYPE_CHECKING: from synapse.server import HomeServer -BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD = ( - "media_repository_drop_index_wo_method" -) BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = ( "media_repository_drop_index_wo_method_2" ) @@ -111,13 +108,6 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore): unique=True, ) - # the original impl of _drop_media_index_without_method was broken (see - # https://github.com/matrix-org/synapse/issues/8649), so we replace the original - # impl with a no-op and run the fixed migration as - # media_repository_drop_index_wo_method_2. - self.db_pool.updates.register_noop_background_update( - BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD - ) self.db_pool.updates.register_background_update_handler( BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2, self._drop_media_index_without_method, diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index ba385f9f..87b0d090 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -214,10 +214,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # Delete all remote non-state events for table in ( + "event_edges", "events", "event_json", "event_auth", - "event_edges", "event_forward_extremities", "event_relations", "event_search", diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index d5aefe02..86649c1e 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -110,9 +110,9 @@ def _load_rules( # the abstract methods being implemented. class PushRulesWorkerStore( ApplicationServiceWorkerStore, - ReceiptsWorkerStore, PusherWorkerStore, RoomMemberWorkerStore, + ReceiptsWorkerStore, EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta, diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 21e954cc..0090c9f2 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -26,7 +26,7 @@ from typing import ( cast, ) -from synapse.api.constants import EduTypes, ReceiptTypes +from synapse.api.constants import EduTypes from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import ReceiptsStream from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -36,6 +36,7 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.engines import PostgresEngine +from synapse.storage.engines._base import IsolationLevel from synapse.storage.util.id_generators import ( AbstractStreamIdTracker, MultiWriterIdGenerator, @@ -117,7 +118,7 @@ class ReceiptsWorkerStore(SQLBaseStore): return self._receipts_id_gen.get_current_token() async def get_last_receipt_event_id_for_user( - self, user_id: str, room_id: str, receipt_types: Iterable[str] + self, user_id: str, room_id: str, receipt_types: Collection[str] ) -> Optional[str]: """ Fetch the event ID for the latest receipt in a room with one of the given receipt types. @@ -125,58 +126,63 @@ class ReceiptsWorkerStore(SQLBaseStore): Args: user_id: The user to fetch receipts for. room_id: The room ID to fetch the receipt for. - receipt_type: The receipt types to fetch. Earlier receipt types - are given priority if multiple receipts point to the same event. + receipt_type: The receipt types to fetch. Returns: The latest receipt, if one exists. """ - latest_event_id: Optional[str] = None - latest_stream_ordering = 0 - for receipt_type in receipt_types: - result = await self._get_last_receipt_event_id_for_user( - user_id, room_id, receipt_type - ) - if result is None: - continue - event_id, stream_ordering = result - - if latest_event_id is None or latest_stream_ordering < stream_ordering: - latest_event_id = event_id - latest_stream_ordering = stream_ordering + result = await self.db_pool.runInteraction( + "get_last_receipt_event_id_for_user", + self.get_last_receipt_for_user_txn, + user_id, + room_id, + receipt_types, + ) + if not result: + return None - return latest_event_id + event_id, _ = result + return event_id - @cached() - async def _get_last_receipt_event_id_for_user( - self, user_id: str, room_id: str, receipt_type: str + def get_last_receipt_for_user_txn( + self, + txn: LoggingTransaction, + user_id: str, + room_id: str, + receipt_types: Collection[str], ) -> Optional[Tuple[str, int]]: """ - Fetch the event ID and stream ordering for the latest receipt. + Fetch the event ID and stream_ordering for the latest receipt in a room + with one of the given receipt types. Args: user_id: The user to fetch receipts for. room_id: The room ID to fetch the receipt for. - receipt_type: The receipt type to fetch. + receipt_type: The receipt types to fetch. Returns: - The event ID and stream ordering of the latest receipt, if one exists; - otherwise `None`. + The latest receipt, if one exists. """ - sql = """ + + clause, args = make_in_list_sql_clause( + self.database_engine, "receipt_type", receipt_types + ) + + sql = f""" SELECT event_id, stream_ordering FROM receipts_linearized INNER JOIN events USING (room_id, event_id) - WHERE user_id = ? + WHERE {clause} + AND user_id = ? AND room_id = ? - AND receipt_type = ? + ORDER BY stream_ordering DESC + LIMIT 1 """ - def f(txn: LoggingTransaction) -> Optional[Tuple[str, int]]: - txn.execute(sql, (user_id, room_id, receipt_type)) - return cast(Optional[Tuple[str, int]], txn.fetchone()) + args.extend((user_id, room_id)) + txn.execute(sql, args) - return await self.db_pool.runInteraction("get_own_receipt_for_user", f) + return cast(Optional[Tuple[str, int]], txn.fetchone()) async def get_receipts_for_user( self, user_id: str, receipt_types: Iterable[str] @@ -576,8 +582,11 @@ class ReceiptsWorkerStore(SQLBaseStore): ) -> None: self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type)) self._get_linearized_receipts_for_room.invalidate((room_id,)) - self._get_last_receipt_event_id_for_user.invalidate( - (user_id, room_id, receipt_type) + + # We use this method to invalidate so that we don't end up with circular + # dependencies between the receipts and push action stores. + self._attempt_to_invalidate_cache( + "get_unread_event_push_actions_by_room_for_user", (room_id,) ) def process_replication_rows( @@ -673,17 +682,6 @@ class ReceiptsWorkerStore(SQLBaseStore): lock=False, ) - # When updating a local users read receipt, remove any push actions - # which resulted from the receipt's event and all earlier events. - if ( - self.hs.is_mine_id(user_id) - and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE) - and stream_ordering is not None - ): - self._remove_old_push_actions_before_txn( # type: ignore[attr-defined] - txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering - ) - return rx_ts def _graph_to_linear( @@ -764,6 +762,10 @@ class ReceiptsWorkerStore(SQLBaseStore): linearized_event_id, data, stream_id=stream_id, + # Read committed is actually beneficial here because we check for a receipt with + # greater stream order, and checking the very latest data at select time is better + # than the data at transaction start time. + isolation_level=IsolationLevel.READ_COMMITTED, ) # If the receipt was older than the currently persisted one, nothing to do. diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 4991360b..cb63cd9b 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1805,21 +1805,10 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore): columns=["creation_ts"], ) - # we no longer use refresh tokens, but it's possible that some people - # might have a background update queued to build this index. Just - # clear the background update. - self.db_pool.updates.register_noop_background_update( - "refresh_tokens_device_index" - ) - self.db_pool.updates.register_background_update_handler( "users_set_deactivated_flag", self._background_update_set_deactivated_flag ) - self.db_pool.updates.register_noop_background_update( - "user_threepids_grandfather" - ) - self.db_pool.updates.register_background_index_update( "user_external_ids_user_id_idx", index_name="user_external_ids_user_id_idx", diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 68d4fc2e..13d6a1d5 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -32,12 +32,17 @@ from typing import ( import attr -from synapse.api.constants import EventContentFields, EventTypes, JoinRules +from synapse.api.constants import ( + EventContentFields, + EventTypes, + JoinRules, + PublicRoomsFilterFields, +) from synapse.api.errors import StoreError from synapse.api.room_versions import RoomVersion, RoomVersions from synapse.config.homeserver import HomeServerConfig from synapse.events import EventBase -from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause from synapse.storage.database import ( DatabasePool, LoggingDatabaseConnection, @@ -199,10 +204,29 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): desc="get_public_room_ids", ) + def _construct_room_type_where_clause( + self, room_types: Union[List[Union[str, None]], None] + ) -> Tuple[Union[str, None], List[str]]: + if not room_types or not self.config.experimental.msc3827_enabled: + return None, [] + else: + # We use None when we want get rooms without a type + is_null_clause = "" + if None in room_types: + is_null_clause = "OR room_type IS NULL" + room_types = [value for value in room_types if value is not None] + + list_clause, args = make_in_list_sql_clause( + self.database_engine, "room_type", room_types + ) + + return f"({list_clause} {is_null_clause})", args + async def count_public_rooms( self, network_tuple: Optional[ThirdPartyInstanceID], ignore_non_federatable: bool, + search_filter: Optional[dict], ) -> int: """Counts the number of public rooms as tracked in the room_stats_current and room_stats_state table. @@ -210,11 +234,20 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): Args: network_tuple ignore_non_federatable: If true filters out non-federatable rooms + search_filter """ def _count_public_rooms_txn(txn: LoggingTransaction) -> int: query_args = [] + room_type_clause, args = self._construct_room_type_where_clause( + search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None) + if search_filter + else None + ) + room_type_clause = f" AND {room_type_clause}" if room_type_clause else "" + query_args += args + if network_tuple: if network_tuple.appservice_id: published_sql = """ @@ -249,6 +282,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): OR join_rules = '{JoinRules.KNOCK_RESTRICTED}' OR history_visibility = 'world_readable' ) + {room_type_clause} AND joined_members > 0 """ @@ -347,8 +381,12 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): if ignore_non_federatable: where_clauses.append("is_federatable") - if search_filter and search_filter.get("generic_search_term", None): - search_term = "%" + search_filter["generic_search_term"] + "%" + if search_filter and search_filter.get( + PublicRoomsFilterFields.GENERIC_SEARCH_TERM, None + ): + search_term = ( + "%" + search_filter[PublicRoomsFilterFields.GENERIC_SEARCH_TERM] + "%" + ) where_clauses.append( """ @@ -365,6 +403,15 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): search_term.lower(), ] + room_type_clause, args = self._construct_room_type_where_clause( + search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None) + if search_filter + else None + ) + if room_type_clause: + where_clauses.append(room_type_clause) + query_args += args + where_clause = "" if where_clauses: where_clause = " AND " + " AND ".join(where_clauses) @@ -373,7 +420,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): sql = f""" SELECT room_id, name, topic, canonical_alias, joined_members, - avatar, history_visibility, guest_access, join_rules + avatar, history_visibility, guest_access, join_rules, room_type FROM ( {published_sql} ) published @@ -1109,17 +1156,25 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): return room_servers async def clear_partial_state_room(self, room_id: str) -> bool: - # this can race with incoming events, so we watch out for FK errors. - # TODO(faster_joins): this still doesn't completely fix the race, since the persist process - # is not atomic. I fear we need an application-level lock. + """Clears the partial state flag for a room. + + Args: + room_id: The room whose partial state flag is to be cleared. + + Returns: + `True` if the partial state flag has been cleared successfully. + + `False` if the partial state flag could not be cleared because the room + still contains events with partial state. + """ try: await self.db_pool.runInteraction( "clear_partial_state_room", self._clear_partial_state_room_txn, room_id ) return True - except self.db_pool.engine.module.DatabaseError as e: - # TODO(faster_joins): how do we distinguish between FK errors and other errors? - logger.warning( + except self.db_pool.engine.module.IntegrityError as e: + # Assume that any `IntegrityError`s are due to partial state events. + logger.info( "Exception while clearing lazy partial-state-room %s, retrying: %s", room_id, e, @@ -1164,6 +1219,7 @@ class _BackgroundUpdates: POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2" REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth" POPULATE_ROOMS_CREATOR_COLUMN = "populate_rooms_creator_column" + ADD_ROOM_TYPE_COLUMN = "add_room_type_column" _REPLACE_ROOM_DEPTH_SQL_COMMANDS = ( @@ -1198,6 +1254,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore): self._background_add_rooms_room_version_column, ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN, + self._background_add_room_type_column, + ) + # BG updates to change the type of room_depth.min_depth self.db_pool.updates.register_background_update_handler( _BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2, @@ -1567,6 +1628,69 @@ class RoomBackgroundUpdateStore(SQLBaseStore): return batch_size + async def _background_add_room_type_column( + self, progress: JsonDict, batch_size: int + ) -> int: + """Background update to go and add room_type information to `room_stats_state` + table from `event_json` table. + """ + + last_room_id = progress.get("room_id", "") + + def _background_add_room_type_column_txn( + txn: LoggingTransaction, + ) -> bool: + sql = """ + SELECT state.room_id, json FROM event_json + INNER JOIN current_state_events AS state USING (event_id) + WHERE state.room_id > ? AND type = 'm.room.create' + ORDER BY state.room_id + LIMIT ? + """ + + txn.execute(sql, (last_room_id, batch_size)) + room_id_to_create_event_results = txn.fetchall() + + new_last_room_id = None + for room_id, event_json in room_id_to_create_event_results: + event_dict = db_to_json(event_json) + + room_type = event_dict.get("content", {}).get( + EventContentFields.ROOM_TYPE, None + ) + if isinstance(room_type, str): + self.db_pool.simple_update_txn( + txn, + table="room_stats_state", + keyvalues={"room_id": room_id}, + updatevalues={"room_type": room_type}, + ) + + new_last_room_id = room_id + + if new_last_room_id is None: + return True + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN, + {"room_id": new_last_room_id}, + ) + + return False + + end = await self.db_pool.runInteraction( + "_background_add_room_type_column", + _background_add_room_type_column_txn, + ) + + if end: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN + ) + + return batch_size + class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): def __init__( diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index 31bc8c56..0b5e4e42 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -212,6 +212,60 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql, (room_id, Membership.JOIN)) return [r[0] for r in txn] + @cached() + def get_user_in_room_with_profile( + self, room_id: str, user_id: str + ) -> Dict[str, ProfileInfo]: + raise NotImplementedError() + + @cachedList( + cached_method_name="get_user_in_room_with_profile", list_name="user_ids" + ) + async def get_subset_users_in_room_with_profiles( + self, room_id: str, user_ids: Collection[str] + ) -> Dict[str, ProfileInfo]: + """Get a mapping from user ID to profile information for a list of users + in a given room. + + The profile information comes directly from this room's `m.room.member` + events, and so may be specific to this room rather than part of a user's + global profile. To avoid privacy leaks, the profile data should only be + revealed to users who are already in this room. + + Args: + room_id: The ID of the room to retrieve the users of. + user_ids: a list of users in the room to run the query for + + Returns: + A mapping from user ID to ProfileInfo. + """ + + def _get_subset_users_in_room_with_profiles( + txn: LoggingTransaction, + ) -> Dict[str, ProfileInfo]: + clause, ids = make_in_list_sql_clause( + self.database_engine, "m.user_id", user_ids + ) + + sql = """ + SELECT state_key, display_name, avatar_url FROM room_memberships as m + INNER JOIN current_state_events as c + ON m.event_id = c.event_id + AND m.room_id = c.room_id + AND m.user_id = c.state_key + WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? AND %s + """ % ( + clause, + ) + txn.execute(sql, (room_id, Membership.JOIN, *ids)) + + return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn} + + return await self.db_pool.runInteraction( + "get_subset_users_in_room_with_profiles", + _get_subset_users_in_room_with_profiles, + ) + @cached(max_entries=100000, iterable=True) async def get_users_in_room_with_profiles( self, room_id: str @@ -338,6 +392,15 @@ class RoomMemberWorkerStore(EventsWorkerStore): ) @cached() + async def get_number_joined_users_in_room(self, room_id: str) -> int: + return await self.db_pool.simple_select_one_onecol( + table="current_state_events", + keyvalues={"room_id": room_id, "membership": Membership.JOIN}, + retcol="COUNT(*)", + desc="get_number_joined_users_in_room", + ) + + @cached() async def get_invited_rooms_for_local_user( self, user_id: str ) -> List[RoomsForUser]: @@ -416,6 +479,17 @@ class RoomMemberWorkerStore(EventsWorkerStore): user_id: str, membership_list: List[str], ) -> List[RoomsForUser]: + """Get all the rooms for this *local* user where the membership for this user + matches one in the membership list. + + Args: + user_id: The user ID. + membership_list: A list of synapse.api.constants.Membership + values which the user must be in. + + Returns: + The RoomsForUser that the user matches the membership types. + """ # Paranoia check. if not self.hs.is_mine_id(user_id): raise Exception( @@ -444,6 +518,18 @@ class RoomMemberWorkerStore(EventsWorkerStore): return results + @cached(iterable=True) + async def get_local_users_in_room(self, room_id: str) -> List[str]: + """ + Retrieves a list of the current roommembers who are local to the server. + """ + return await self.db_pool.simple_select_onecol( + table="local_current_membership", + keyvalues={"room_id": room_id, "membership": Membership.JOIN}, + retcol="user_id", + desc="get_local_users_in_room", + ) + async def get_local_current_membership_for_user_in_room( self, user_id: str, room_id: str ) -> Tuple[Optional[str], Optional[str]]: diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index 78e0773b..f6e24b68 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -113,7 +113,6 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): EVENT_SEARCH_UPDATE_NAME = "event_search" EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order" - EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist" EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin" EVENT_SEARCH_DELETE_NON_STRINGS = "event_search_sqlite_delete_non_strings" @@ -132,15 +131,6 @@ class SearchBackgroundUpdateStore(SearchWorkerStore): self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order ) - # we used to have a background update to turn the GIN index into a - # GIST one; we no longer do that (obviously) because we actually want - # a GIN index. However, it's possible that some people might still have - # the background update queued, so we register a handler to clear the - # background update. - self.db_pool.updates.register_noop_background_update( - self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME - ) - self.db_pool.updates.register_background_update_handler( self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search ) diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index bdd00273..9674c4a7 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -127,13 +127,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): NotFoundError: if the room is unknown """ - # First we try looking up room version from the database, but for old - # rooms we might not have added the room version to it yet so we fall - # back to previous behaviour and look in current state events. - # # We really should have an entry in the rooms table for every room we - # care about, but let's be a bit paranoid (at least while the background - # update is happening) to avoid breaking existing rooms. + # care about, but let's be a bit paranoid. room_version = self.db_pool.simple_select_one_onecol_txn( txn, table="rooms", @@ -440,6 +435,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore): ) # TODO(faster_joins): need to do something about workers here + # https://github.com/matrix-org/synapse/issues/12994 txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,)) txn.call_after( self._get_state_group_for_event.prefill, diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index b95dbef6..b4c652ac 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -16,7 +16,7 @@ import logging from enum import Enum from itertools import chain -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast from typing_extensions import Counter @@ -120,11 +120,6 @@ class StatsStore(StateDeltasStore): self.db_pool.updates.register_background_update_handler( "populate_stats_process_users", self._populate_stats_process_users ) - # we no longer need to perform clean-up, but we will give ourselves - # the potential to reintroduce it in the future – so documentation - # will still encourage the use of this no-op handler. - self.db_pool.updates.register_noop_background_update("populate_stats_cleanup") - self.db_pool.updates.register_noop_background_update("populate_stats_prepare") async def _populate_stats_process_users( self, progress: JsonDict, batch_size: int @@ -243,6 +238,7 @@ class StatsStore(StateDeltasStore): * avatar * canonical_alias * guest_access + * room_type A is_federatable key can also be included with a boolean value. @@ -268,6 +264,7 @@ class StatsStore(StateDeltasStore): "avatar", "canonical_alias", "guest_access", + "room_type", ): field = fields.get(col, sentinel) if field is not sentinel and (not isinstance(field, str) or "\0" in field): @@ -300,6 +297,7 @@ class StatsStore(StateDeltasStore): keyvalues={id_col: id}, retcol="completed_delta_stream_id", allow_none=True, + desc="get_earliest_token_for_stats", ) async def bulk_update_stats_delta( @@ -576,7 +574,7 @@ class StatsStore(StateDeltasStore): state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined] - room_state = { + room_state: Dict[str, Union[None, bool, str]] = { "join_rules": None, "history_visibility": None, "encryption": None, @@ -585,6 +583,7 @@ class StatsStore(StateDeltasStore): "avatar": None, "canonical_alias": None, "is_federatable": True, + "room_type": None, } for event in state_event_map.values(): @@ -608,6 +607,9 @@ class StatsStore(StateDeltasStore): room_state["is_federatable"] = ( event.content.get(EventContentFields.FEDERATE, True) is True ) + room_type = event.content.get(EventContentFields.ROOM_TYPE) + if isinstance(room_type, str): + room_state["room_type"] = room_type await self.update_room_state(room_id, room_state) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 8e88784d..3a1df777 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -46,10 +46,12 @@ from typing import ( Set, Tuple, cast, + overload, ) import attr from frozendict import frozendict +from typing_extensions import Literal from twisted.internet import defer @@ -795,6 +797,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): ) return RoomStreamToken(topo, stream_ordering) + @overload + def get_stream_id_for_event_txn( + self, + txn: LoggingTransaction, + event_id: str, + allow_none: Literal[False] = False, + ) -> int: + ... + + @overload + def get_stream_id_for_event_txn( + self, + txn: LoggingTransaction, + event_id: str, + allow_none: bool = False, + ) -> Optional[int]: + ... + def get_stream_id_for_event_txn( self, txn: LoggingTransaction, |