diff options
author | Antonio Russo <aerusso@aerusso.net> | 2023-08-23 23:23:20 -0600 |
---|---|---|
committer | Antonio Russo <aerusso@aerusso.net> | 2023-08-23 23:23:20 -0600 |
commit | 7d14b344339cd9e55ae85d2802fbae1e781d0042 (patch) | |
tree | 56b482d87cb5f15312e036f91d2da714b4103375 /synapse/storage/controllers | |
parent | 6027905201d37bb95ac4855e5d19abcce0cec062 (diff) |
New upstream version 1.90.0
Diffstat (limited to 'synapse/storage/controllers')
-rw-r--r-- | synapse/storage/controllers/__init__.py | 2 | ||||
-rw-r--r-- | synapse/storage/controllers/persist_events.py | 32 | ||||
-rw-r--r-- | synapse/storage/controllers/purge_events.py | 22 | ||||
-rw-r--r-- | synapse/storage/controllers/state.py | 187 | ||||
-rw-r--r-- | synapse/storage/controllers/stats.py | 112 |
5 files changed, 324 insertions, 31 deletions
diff --git a/synapse/storage/controllers/__init__.py b/synapse/storage/controllers/__init__.py index 45101cda..0ef86026 100644 --- a/synapse/storage/controllers/__init__.py +++ b/synapse/storage/controllers/__init__.py @@ -19,6 +19,7 @@ from synapse.storage.controllers.persist_events import ( ) from synapse.storage.controllers.purge_events import PurgeEventsStorageController from synapse.storage.controllers.state import StateStorageController +from synapse.storage.controllers.stats import StatsController from synapse.storage.databases import Databases from synapse.storage.databases.main import DataStore @@ -40,6 +41,7 @@ class StorageControllers: self.purge_events = PurgeEventsStorageController(hs, stores) self.state = StateStorageController(hs, stores) + self.stats = StatsController(hs, stores) self.persistence = None if stores.persist_events: diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index f1d2c71c..35cd1089 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -45,6 +45,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.opentracing import ( SynapseTags, @@ -338,6 +339,7 @@ class EventsPersistenceStorageController: ) self._state_resolution_handler = hs.get_state_resolution_handler() self._state_controller = state_controller + self.hs = hs async def _process_event_persist_queue_task( self, @@ -350,15 +352,22 @@ class EventsPersistenceStorageController: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. """ - if isinstance(task, _PersistEventsTask): - return await self._persist_event_batch(room_id, task) - elif isinstance(task, _UpdateCurrentStateTask): - await self._update_current_state(room_id, task) - return {} - else: - raise AssertionError( - f"Found an unexpected task type in event persistence queue: {task}" - ) + + # Ensure that the room can't be deleted while we're persisting events to + # it. We might already have taken out the lock, but since this is just a + # "read" lock its inherently reentrant. + async with self.hs.get_worker_locks_handler().acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + if isinstance(task, _PersistEventsTask): + return await self._persist_event_batch(room_id, task) + elif isinstance(task, _UpdateCurrentStateTask): + await self._update_current_state(room_id, task) + return {} + else: + raise AssertionError( + f"Found an unexpected task type in event persistence queue: {task}" + ) @trace async def persist_events( @@ -839,9 +848,8 @@ class EventsPersistenceStorageController: "group" % (ev.event_id,) ) continue - - if ctx.prev_group: - state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids + if ctx.state_group_deltas: + state_group_deltas.update(ctx.state_group_deltas) # We need to map the event_ids to their state groups. First, let's # check if the event is one we're persisting, in which case we can diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py index 9ca50d6a..c599397b 100644 --- a/synapse/storage/controllers/purge_events.py +++ b/synapse/storage/controllers/purge_events.py @@ -16,6 +16,7 @@ import itertools import logging from typing import TYPE_CHECKING, Set +from synapse.logging.context import nested_logging_context from synapse.storage.databases import Databases if TYPE_CHECKING: @@ -33,8 +34,9 @@ class PurgeEventsStorageController: async def purge_room(self, room_id: str) -> None: """Deletes all record of a room""" - state_groups_to_delete = await self.stores.main.purge_room(room_id) - await self.stores.state.purge_room_state(room_id, state_groups_to_delete) + with nested_logging_context(room_id): + state_groups_to_delete = await self.stores.main.purge_room(room_id) + await self.stores.state.purge_room_state(room_id, state_groups_to_delete) async def purge_history( self, room_id: str, token: str, delete_local_events: bool @@ -51,15 +53,17 @@ class PurgeEventsStorageController: (instead of just marking them as outliers and deleting their state groups). """ - state_groups = await self.stores.main.purge_history( - room_id, token, delete_local_events - ) - - logger.info("[purge] finding state groups that can be deleted") + with nested_logging_context(room_id): + state_groups = await self.stores.main.purge_history( + room_id, token, delete_local_events + ) - sg_to_delete = await self._find_unreferenced_groups(state_groups) + logger.info("[purge] finding state groups that can be deleted") + sg_to_delete = await self._find_unreferenced_groups(state_groups) - await self.stores.state.purge_unreferenced_state_groups(room_id, sg_to_delete) + await self.stores.state.purge_unreferenced_state_groups( + room_id, sg_to_delete + ) async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]: """Used when purging history to figure out which state groups can be diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 9d7a8a79..278c7832 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -12,22 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from itertools import chain from typing import ( TYPE_CHECKING, AbstractSet, Any, - Awaitable, Callable, Collection, Dict, + FrozenSet, Iterable, List, Mapping, Optional, Tuple, + Union, ) -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.logging.opentracing import tag_args, trace from synapse.storage.roommember import ProfileInfo @@ -35,14 +37,20 @@ from synapse.storage.util.partial_state_events_tracker import ( PartialCurrentStateTracker, PartialStateEventsTracker, ) -from synapse.types import MutableStateMap, StateMap +from synapse.types import MutableStateMap, StateMap, get_domain_from_id from synapse.types.state import StateFilter +from synapse.util.async_helpers import Linearizer +from synapse.util.caches import intern_string +from synapse.util.caches.descriptors import cached from synapse.util.cancellation import cancellable +from synapse.util.metrics import Measure if TYPE_CHECKING: from synapse.server import HomeServer + from synapse.state import _StateCacheEntry from synapse.storage.databases import Databases + logger = logging.getLogger(__name__) @@ -53,10 +61,15 @@ class StateStorageController: def __init__(self, hs: "HomeServer", stores: "Databases"): self._is_mine_id = hs.is_mine_id + self._clock = hs.get_clock() self.stores = stores self._partial_state_events_tracker = PartialStateEventsTracker(stores.main) self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main) + # Used by `_get_joined_hosts` to ensure only one thing mutates the cache + # at a time. Keyed by room_id. + self._joined_host_linearizer = Linearizer("_JoinedHostsCache") + def notify_event_un_partial_stated(self, event_id: str) -> None: self._partial_state_events_tracker.notify_un_partial_stated(event_id) @@ -67,6 +80,8 @@ class StateStorageController: """ self._partial_state_room_tracker.notify_un_partial_stated(room_id) + @trace + @tag_args async def get_state_group_delta( self, state_group: int ) -> Tuple[Optional[int], Optional[StateMap[str]]]: @@ -84,6 +99,8 @@ class StateStorageController: state_group_delta = await self.stores.state.get_state_group_delta(state_group) return state_group_delta.prev_group, state_group_delta.delta_ids + @trace + @tag_args async def get_state_groups_ids( self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True ) -> Dict[int, MutableStateMap[str]]: @@ -114,6 +131,8 @@ class StateStorageController: return group_to_state + @trace + @tag_args async def get_state_ids_for_group( self, state_group: int, state_filter: Optional[StateFilter] = None ) -> StateMap[str]: @@ -130,6 +149,8 @@ class StateStorageController: return group_to_state[state_group] + @trace + @tag_args async def get_state_groups( self, room_id: str, event_ids: Collection[str] ) -> Dict[int, List[EventBase]]: @@ -165,9 +186,11 @@ class StateStorageController: for group, event_id_map in group_to_ids.items() } - def _get_state_groups_from_groups( + @trace + @tag_args + async def _get_state_groups_from_groups( self, groups: List[int], state_filter: StateFilter - ) -> Awaitable[Dict[int, StateMap[str]]]: + ) -> Dict[int, StateMap[str]]: """Returns the state groups for a given set of groups, filtering on types of state events. @@ -180,9 +203,12 @@ class StateStorageController: Dict of state group to state map. """ - return self.stores.state._get_state_groups_from_groups(groups, state_filter) + return await self.stores.state._get_state_groups_from_groups( + groups, state_filter + ) @trace + @tag_args async def get_state_for_events( self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None ) -> Dict[str, StateMap[EventBase]]: @@ -280,6 +306,8 @@ class StateStorageController: return {event: event_to_state[event] for event in event_ids} + @trace + @tag_args async def get_state_for_event( self, event_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: @@ -303,6 +331,7 @@ class StateStorageController: return state_map[event_id] @trace + @tag_args async def get_state_ids_for_event( self, event_id: str, @@ -333,9 +362,11 @@ class StateStorageController: ) return state_map[event_id] - def get_state_for_groups( + @trace + @tag_args + async def get_state_for_groups( self, groups: Iterable[int], state_filter: Optional[StateFilter] = None - ) -> Awaitable[Dict[int, MutableStateMap[str]]]: + ) -> Dict[int, MutableStateMap[str]]: """Gets the state at each of a list of state groups, optionally filtering by type/state_key @@ -347,7 +378,7 @@ class StateStorageController: Returns: Dict of state group to state map. """ - return self.stores.state._get_state_for_groups( + return await self.stores.state._get_state_for_groups( groups, state_filter or StateFilter.all() ) @@ -402,6 +433,8 @@ class StateStorageController: event_id, room_id, prev_group, delta_ids, current_state_ids ) + @trace + @tag_args @cancellable async def get_current_state_ids( self, @@ -442,6 +475,8 @@ class StateStorageController: room_id, on_invalidate=on_invalidate ) + @trace + @tag_args async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]: """Get canonical alias for room, if any @@ -464,8 +499,10 @@ class StateStorageController: if not event: return None - return event.content.get("canonical_alias") + return event.content.get("alias") + @trace + @tag_args async def get_current_state_deltas( self, prev_stream_id: int, max_stream_id: int ) -> Tuple[int, List[Dict[str, Any]]]: @@ -500,6 +537,7 @@ class StateStorageController: ) @trace + @tag_args async def get_current_state( self, room_id: str, state_filter: Optional[StateFilter] = None ) -> StateMap[EventBase]: @@ -516,6 +554,8 @@ class StateStorageController: return state_map + @trace + @tag_args async def get_current_state_event( self, room_id: str, event_type: str, state_key: str ) -> Optional[EventBase]: @@ -527,6 +567,8 @@ class StateStorageController: ) return state_map.get(key) + @trace + @tag_args async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]: """Get current hosts in room based on current state. @@ -538,6 +580,8 @@ class StateStorageController: return await self.stores.main.get_current_hosts_in_room(room_id) + @trace + @tag_args async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]: """Get current hosts in room based on current state. @@ -553,6 +597,8 @@ class StateStorageController: return await self.stores.main.get_current_hosts_in_room_ordered(room_id) + @trace + @tag_args async def get_current_hosts_in_room_or_partial_state_approximation( self, room_id: str ) -> Collection[str]: @@ -582,6 +628,8 @@ class StateStorageController: return hosts + @trace + @tag_args async def get_users_in_room_with_profiles( self, room_id: str ) -> Mapping[str, ProfileInfo]: @@ -593,3 +641,122 @@ class StateStorageController: await self._partial_state_room_tracker.await_full_state(room_id) return await self.stores.main.get_users_in_room_with_profiles(room_id) + + async def get_joined_hosts( + self, room_id: str, state_entry: "_StateCacheEntry" + ) -> FrozenSet[str]: + state_group: Union[object, int] = state_entry.state_group + if not state_group: + # If state_group is None it means it has yet to be assigned a + # state group, i.e. we need to make sure that calls with a state_group + # of None don't hit previous cached calls with a None state_group. + # To do this we set the state_group to a new object as object() != object() + state_group = object() + + assert state_group is not None + with Measure(self._clock, "get_joined_hosts"): + return await self._get_joined_hosts( + room_id, state_group, state_entry=state_entry + ) + + @cached(num_args=2, max_entries=10000, iterable=True) + async def _get_joined_hosts( + self, + room_id: str, + state_group: Union[object, int], + state_entry: "_StateCacheEntry", + ) -> FrozenSet[str]: + # We don't use `state_group`, it's there so that we can cache based on + # it. However, its important that its never None, since two + # current_state's with a state_group of None are likely to be different. + # + # The `state_group` must match the `state_entry.state_group` (if not None). + assert state_group is not None + assert state_entry.state_group is None or state_entry.state_group == state_group + + # We use a secondary cache of previous work to allow us to build up the + # joined hosts for the given state group based on previous state groups. + # + # We cache one object per room containing the results of the last state + # group we got joined hosts for. The idea is that generally + # `get_joined_hosts` is called with the "current" state group for the + # room, and so consecutive calls will be for consecutive state groups + # which point to the previous state group. + cache = await self.stores.main._get_joined_hosts_cache(room_id) + + # If the state group in the cache matches, we already have the data we need. + if state_entry.state_group == cache.state_group: + return frozenset(cache.hosts_to_joined_users) + + # Since we'll mutate the cache we need to lock. + async with self._joined_host_linearizer.queue(room_id): + if state_entry.state_group == cache.state_group: + # Same state group, so nothing to do. We've already checked for + # this above, but the cache may have changed while waiting on + # the lock. + pass + elif state_entry.prev_group == cache.state_group: + # The cached work is for the previous state group, so we work out + # the delta. + assert state_entry.delta_ids is not None + for (typ, state_key), event_id in state_entry.delta_ids.items(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = cache.hosts_to_joined_users.setdefault(host, set()) + + event = await self.stores.main.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + cache.hosts_to_joined_users.pop(host, None) + else: + # The cache doesn't match the state group or prev state group, + # so we calculate the result from first principles. + # + # We need to fetch all hosts joined to the room according to `state` by + # inspecting all join memberships in `state`. However, if the `state` is + # relatively recent then many of its events are likely to be held in + # the current state of the room, which is easily available and likely + # cached. + # + # We therefore compute the set of `state` events not in the + # current state and only fetch those. + current_memberships = ( + await self.stores.main._get_approximate_current_memberships_in_room( + room_id + ) + ) + unknown_state_events = {} + joined_users_in_current_state = [] + + state = await state_entry.get_state( + self, StateFilter.from_types([(EventTypes.Member, None)]) + ) + + for (type, state_key), event_id in state.items(): + if event_id not in current_memberships: + unknown_state_events[type, state_key] = event_id + elif current_memberships[event_id] == Membership.JOIN: + joined_users_in_current_state.append(state_key) + + joined_user_ids = await self.stores.main.get_joined_user_ids_from_state( + room_id, unknown_state_events + ) + + cache.hosts_to_joined_users = {} + for user_id in chain(joined_user_ids, joined_users_in_current_state): + host = intern_string(get_domain_from_id(user_id)) + cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + if state_entry.state_group: + cache.state_group = state_entry.state_group + else: + cache.state_group = object() + + return frozenset(cache.hosts_to_joined_users) diff --git a/synapse/storage/controllers/stats.py b/synapse/storage/controllers/stats.py new file mode 100644 index 00000000..2a03528f --- /dev/null +++ b/synapse/storage/controllers/stats.py @@ -0,0 +1,112 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, Collection, Counter, List, Tuple + +from synapse.api.errors import SynapseError +from synapse.storage.database import LoggingTransaction +from synapse.storage.databases import Databases +from synapse.storage.engines import PostgresEngine + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class StatsController: + """High level interface for getting statistics.""" + + def __init__(self, hs: "HomeServer", stores: Databases): + self.stores = stores + + async def get_room_db_size_estimate(self) -> List[Tuple[str, int]]: + """Get an estimate of the largest rooms and how much database space they + use, in bytes. + + Only works against PostgreSQL. + + Note: this uses the postgres statistics so is a very rough estimate. + """ + + # Note: We look at both tables on the main and state databases. + if not isinstance(self.stores.main.database_engine, PostgresEngine): + raise SynapseError(400, "Endpoint requires using PostgreSQL") + + if not isinstance(self.stores.state.database_engine, PostgresEngine): + raise SynapseError(400, "Endpoint requires using PostgreSQL") + + # For each "large" table, we go through and get the largest rooms + # and an estimate of how much space they take. We can then sum the + # results and return the top 10. + # + # This isn't the most accurate, but given all of these are estimates + # anyway its good enough. + room_estimates: Counter[str] = Counter() + + # Return size of the table on disk, including indexes and TOAST. + table_sql = """ + SELECT pg_total_relation_size(?) + """ + + # Get an estimate for the largest rooms and their frequency. + # + # Note: the cast here is a hack to cast from `anyarray` to an actual + # type. This ensures that psycopg2 passes us a back a a Python list. + column_sql = """ + SELECT + most_common_vals::TEXT::TEXT[], most_common_freqs::TEXT::NUMERIC[] + FROM pg_stats + WHERE tablename = ? and attname = 'room_id' + """ + + def get_room_db_size_estimate_txn( + txn: LoggingTransaction, + tables: Collection[str], + ) -> None: + for table in tables: + txn.execute(table_sql, (table,)) + row = txn.fetchone() + assert row is not None + (table_size,) = row + + txn.execute(column_sql, (table,)) + row = txn.fetchone() + assert row is not None + vals, freqs = row + + for room_id, freq in zip(vals, freqs): + room_estimates[room_id] += int(freq * table_size) + + await self.stores.main.db_pool.runInteraction( + "get_room_db_size_estimate_main", + get_room_db_size_estimate_txn, + ( + "event_json", + "events", + "event_search", + "event_edges", + "event_push_actions", + "stream_ordering_to_exterm", + ), + ) + + await self.stores.state.db_pool.runInteraction( + "get_room_db_size_estimate_state", + get_room_db_size_estimate_txn, + ("state_groups_state",), + ) + + return room_estimates.most_common(10) |