summary refs log tree commit diff
path: root/synapse/storage/controllers
diff options
context:
space:
mode:
author Antonio Russo <aerusso@aerusso.net> 2023-08-23 23:23:20 -0600
committer Antonio Russo <aerusso@aerusso.net> 2023-08-23 23:23:20 -0600
commit 7d14b344339cd9e55ae85d2802fbae1e781d0042 (patch)
tree 56b482d87cb5f15312e036f91d2da714b4103375 /synapse/storage/controllers
parent 6027905201d37bb95ac4855e5d19abcce0cec062 (diff)
New upstream version 1.90.0
Diffstat (limited to 'synapse/storage/controllers')
-rw-r--r-- synapse/storage/controllers/__init__.py 2
-rw-r--r-- synapse/storage/controllers/persist_events.py 32
-rw-r--r-- synapse/storage/controllers/purge_events.py 22
-rw-r--r-- synapse/storage/controllers/state.py 187
-rw-r--r-- synapse/storage/controllers/stats.py 112
5 files changed, 324 insertions, 31 deletions
diff --git a/synapse/storage/controllers/__init__.py b/synapse/storage/controllers/__init__.py
index 45101cda..0ef86026 100644
--- a/synapse/storage/controllers/__init__.py
+++ b/synapse/storage/controllers/__init__.py
@@ -19,6 +19,7 @@ from synapse.storage.controllers.persist_events import (
)
from synapse.storage.controllers.purge_events import PurgeEventsStorageController
from synapse.storage.controllers.state import StateStorageController
+from synapse.storage.controllers.stats import StatsController
from synapse.storage.databases import Databases
from synapse.storage.databases.main import DataStore
@@ -40,6 +41,7 @@ class StorageControllers:
self.purge_events = PurgeEventsStorageController(hs, stores)
self.state = StateStorageController(hs, stores)
+ self.stats = StatsController(hs, stores)
self.persistence = None
if stores.persist_events:
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index f1d2c71c..35cd1089 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,6 +45,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
from synapse.logging.opentracing import (
SynapseTags,
@@ -338,6 +339,7 @@ class EventsPersistenceStorageController:
)
self._state_resolution_handler = hs.get_state_resolution_handler()
self._state_controller = state_controller
+ self.hs = hs
async def _process_event_persist_queue_task(
self,
@@ -350,15 +352,22 @@ class EventsPersistenceStorageController:
A dictionary of event ID to event ID we didn't persist as we already
had another event persisted with the same TXN ID.
"""
- if isinstance(task, _PersistEventsTask):
- return await self._persist_event_batch(room_id, task)
- elif isinstance(task, _UpdateCurrentStateTask):
- await self._update_current_state(room_id, task)
- return {}
- else:
- raise AssertionError(
- f"Found an unexpected task type in event persistence queue: {task}"
- )
+
+ # Ensure that the room can't be deleted while we're persisting events to
+ # it. We might already have taken out the lock, but since this is just a
+ # "read" lock it's inherently reentrant.
+ async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ if isinstance(task, _PersistEventsTask):
+ return await self._persist_event_batch(room_id, task)
+ elif isinstance(task, _UpdateCurrentStateTask):
+ await self._update_current_state(room_id, task)
+ return {}
+ else:
+ raise AssertionError(
+ f"Found an unexpected task type in event persistence queue: {task}"
+ )
@trace
async def persist_events(
@@ -839,9 +848,8 @@ class EventsPersistenceStorageController:
"group" % (ev.event_id,)
)
continue
-
- if ctx.prev_group:
- state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
+ if ctx.state_group_deltas:
+ state_group_deltas.update(ctx.state_group_deltas)
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
diff --git a/synapse/storage/controllers/purge_events.py b/synapse/storage/controllers/purge_events.py
index 9ca50d6a..c599397b 100644
--- a/synapse/storage/controllers/purge_events.py
+++ b/synapse/storage/controllers/purge_events.py
@@ -16,6 +16,7 @@ import itertools
import logging
from typing import TYPE_CHECKING, Set
+from synapse.logging.context import nested_logging_context
from synapse.storage.databases import Databases
if TYPE_CHECKING:
@@ -33,8 +34,9 @@ class PurgeEventsStorageController:
async def purge_room(self, room_id: str) -> None:
"""Deletes all record of a room"""
- state_groups_to_delete = await self.stores.main.purge_room(room_id)
- await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
+ with nested_logging_context(room_id):
+ state_groups_to_delete = await self.stores.main.purge_room(room_id)
+ await self.stores.state.purge_room_state(room_id, state_groups_to_delete)
async def purge_history(
self, room_id: str, token: str, delete_local_events: bool
@@ -51,15 +53,17 @@ class PurgeEventsStorageController:
(instead of just marking them as outliers and deleting their
state groups).
"""
- state_groups = await self.stores.main.purge_history(
- room_id, token, delete_local_events
- )
-
- logger.info("[purge] finding state groups that can be deleted")
+ with nested_logging_context(room_id):
+ state_groups = await self.stores.main.purge_history(
+ room_id, token, delete_local_events
+ )
- sg_to_delete = await self._find_unreferenced_groups(state_groups)
+ logger.info("[purge] finding state groups that can be deleted")
+ sg_to_delete = await self._find_unreferenced_groups(state_groups)
- await self.stores.state.purge_unreferenced_state_groups(room_id, sg_to_delete)
+ await self.stores.state.purge_unreferenced_state_groups(
+ room_id, sg_to_delete
+ )
async def _find_unreferenced_groups(self, state_groups: Set[int]) -> Set[int]:
"""Used when purging history to figure out which state groups can be
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index 9d7a8a79..278c7832 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -12,22 +12,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
+from itertools import chain
from typing import (
TYPE_CHECKING,
AbstractSet,
Any,
- Awaitable,
Callable,
Collection,
Dict,
+ FrozenSet,
Iterable,
List,
Mapping,
Optional,
Tuple,
+ Union,
)
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.events import EventBase
from synapse.logging.opentracing import tag_args, trace
from synapse.storage.roommember import ProfileInfo
@@ -35,14 +37,20 @@ from synapse.storage.util.partial_state_events_tracker import (
PartialCurrentStateTracker,
PartialStateEventsTracker,
)
-from synapse.types import MutableStateMap, StateMap
+from synapse.types import MutableStateMap, StateMap, get_domain_from_id
from synapse.types.state import StateFilter
+from synapse.util.async_helpers import Linearizer
+from synapse.util.caches import intern_string
+from synapse.util.caches.descriptors import cached
from synapse.util.cancellation import cancellable
+from synapse.util.metrics import Measure
if TYPE_CHECKING:
from synapse.server import HomeServer
+ from synapse.state import _StateCacheEntry
from synapse.storage.databases import Databases
+
logger = logging.getLogger(__name__)
@@ -53,10 +61,15 @@ class StateStorageController:
def __init__(self, hs: "HomeServer", stores: "Databases"):
self._is_mine_id = hs.is_mine_id
+ self._clock = hs.get_clock()
self.stores = stores
self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
+ # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
+ # at a time. Keyed by room_id.
+ self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
+
def notify_event_un_partial_stated(self, event_id: str) -> None:
self._partial_state_events_tracker.notify_un_partial_stated(event_id)
@@ -67,6 +80,8 @@ class StateStorageController:
"""
self._partial_state_room_tracker.notify_un_partial_stated(room_id)
+ @trace
+ @tag_args
async def get_state_group_delta(
self, state_group: int
) -> Tuple[Optional[int], Optional[StateMap[str]]]:
@@ -84,6 +99,8 @@ class StateStorageController:
state_group_delta = await self.stores.state.get_state_group_delta(state_group)
return state_group_delta.prev_group, state_group_delta.delta_ids
+ @trace
+ @tag_args
async def get_state_groups_ids(
self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
) -> Dict[int, MutableStateMap[str]]:
@@ -114,6 +131,8 @@ class StateStorageController:
return group_to_state
+ @trace
+ @tag_args
async def get_state_ids_for_group(
self, state_group: int, state_filter: Optional[StateFilter] = None
) -> StateMap[str]:
@@ -130,6 +149,8 @@ class StateStorageController:
return group_to_state[state_group]
+ @trace
+ @tag_args
async def get_state_groups(
self, room_id: str, event_ids: Collection[str]
) -> Dict[int, List[EventBase]]:
@@ -165,9 +186,11 @@ class StateStorageController:
for group, event_id_map in group_to_ids.items()
}
- def _get_state_groups_from_groups(
+ @trace
+ @tag_args
+ async def _get_state_groups_from_groups(
self, groups: List[int], state_filter: StateFilter
- ) -> Awaitable[Dict[int, StateMap[str]]]:
+ ) -> Dict[int, StateMap[str]]:
"""Returns the state groups for a given set of groups, filtering on
types of state events.
@@ -180,9 +203,12 @@ class StateStorageController:
Dict of state group to state map.
"""
- return self.stores.state._get_state_groups_from_groups(groups, state_filter)
+ return await self.stores.state._get_state_groups_from_groups(
+ groups, state_filter
+ )
@trace
+ @tag_args
async def get_state_for_events(
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[EventBase]]:
@@ -280,6 +306,8 @@ class StateStorageController:
return {event: event_to_state[event] for event in event_ids}
+ @trace
+ @tag_args
async def get_state_for_event(
self, event_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
@@ -303,6 +331,7 @@ class StateStorageController:
return state_map[event_id]
@trace
+ @tag_args
async def get_state_ids_for_event(
self,
event_id: str,
@@ -333,9 +362,11 @@ class StateStorageController:
)
return state_map[event_id]
- def get_state_for_groups(
+ @trace
+ @tag_args
+ async def get_state_for_groups(
self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
- ) -> Awaitable[Dict[int, MutableStateMap[str]]]:
+ ) -> Dict[int, MutableStateMap[str]]:
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
@@ -347,7 +378,7 @@ class StateStorageController:
Returns:
Dict of state group to state map.
"""
- return self.stores.state._get_state_for_groups(
+ return await self.stores.state._get_state_for_groups(
groups, state_filter or StateFilter.all()
)
@@ -402,6 +433,8 @@ class StateStorageController:
event_id, room_id, prev_group, delta_ids, current_state_ids
)
+ @trace
+ @tag_args
@cancellable
async def get_current_state_ids(
self,
@@ -442,6 +475,8 @@ class StateStorageController:
room_id, on_invalidate=on_invalidate
)
+ @trace
+ @tag_args
async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:
"""Get canonical alias for room, if any
@@ -464,8 +499,10 @@ class StateStorageController:
if not event:
return None
- return event.content.get("canonical_alias")
+ return event.content.get("alias")
+ @trace
+ @tag_args
async def get_current_state_deltas(
self, prev_stream_id: int, max_stream_id: int
) -> Tuple[int, List[Dict[str, Any]]]:
@@ -500,6 +537,7 @@ class StateStorageController:
)
@trace
+ @tag_args
async def get_current_state(
self, room_id: str, state_filter: Optional[StateFilter] = None
) -> StateMap[EventBase]:
@@ -516,6 +554,8 @@ class StateStorageController:
return state_map
+ @trace
+ @tag_args
async def get_current_state_event(
self, room_id: str, event_type: str, state_key: str
) -> Optional[EventBase]:
@@ -527,6 +567,8 @@ class StateStorageController:
)
return state_map.get(key)
+ @trace
+ @tag_args
async def get_current_hosts_in_room(self, room_id: str) -> AbstractSet[str]:
"""Get current hosts in room based on current state.
@@ -538,6 +580,8 @@ class StateStorageController:
return await self.stores.main.get_current_hosts_in_room(room_id)
+ @trace
+ @tag_args
async def get_current_hosts_in_room_ordered(self, room_id: str) -> List[str]:
"""Get current hosts in room based on current state.
@@ -553,6 +597,8 @@ class StateStorageController:
return await self.stores.main.get_current_hosts_in_room_ordered(room_id)
+ @trace
+ @tag_args
async def get_current_hosts_in_room_or_partial_state_approximation(
self, room_id: str
) -> Collection[str]:
@@ -582,6 +628,8 @@ class StateStorageController:
return hosts
+ @trace
+ @tag_args
async def get_users_in_room_with_profiles(
self, room_id: str
) -> Mapping[str, ProfileInfo]:
@@ -593,3 +641,122 @@ class StateStorageController:
await self._partial_state_room_tracker.await_full_state(room_id)
return await self.stores.main.get_users_in_room_with_profiles(room_id)
+
+ async def get_joined_hosts(
+ self, room_id: str, state_entry: "_StateCacheEntry"
+ ) -> FrozenSet[str]:
+ state_group: Union[object, int] = state_entry.state_group
+ if not state_group:
+ # If state_group is None it means it has yet to be assigned a
+ # state group, i.e. we need to make sure that calls with a state_group
+ # of None don't hit previous cached calls with a None state_group.
+ # To do this we set the state_group to a new object as object() != object()
+ state_group = object()
+
+ assert state_group is not None
+ with Measure(self._clock, "get_joined_hosts"):
+ return await self._get_joined_hosts(
+ room_id, state_group, state_entry=state_entry
+ )
+
+ @cached(num_args=2, max_entries=10000, iterable=True)
+ async def _get_joined_hosts(
+ self,
+ room_id: str,
+ state_group: Union[object, int],
+ state_entry: "_StateCacheEntry",
+ ) -> FrozenSet[str]:
+ # We don't use `state_group`, it's there so that we can cache based on
+ # it. However, it's important that it's never None, since two
+ # current_state's with a state_group of None are likely to be different.
+ #
+ # The `state_group` must match the `state_entry.state_group` (if not None).
+ assert state_group is not None
+ assert state_entry.state_group is None or state_entry.state_group == state_group
+
+ # We use a secondary cache of previous work to allow us to build up the
+ # joined hosts for the given state group based on previous state groups.
+ #
+ # We cache one object per room containing the results of the last state
+ # group we got joined hosts for. The idea is that generally
+ # `get_joined_hosts` is called with the "current" state group for the
+ # room, and so consecutive calls will be for consecutive state groups
+ # which point to the previous state group.
+ cache = await self.stores.main._get_joined_hosts_cache(room_id)
+
+ # If the state group in the cache matches, we already have the data we need.
+ if state_entry.state_group == cache.state_group:
+ return frozenset(cache.hosts_to_joined_users)
+
+ # Since we'll mutate the cache we need to lock.
+ async with self._joined_host_linearizer.queue(room_id):
+ if state_entry.state_group == cache.state_group:
+ # Same state group, so nothing to do. We've already checked for
+ # this above, but the cache may have changed while waiting on
+ # the lock.
+ pass
+ elif state_entry.prev_group == cache.state_group:
+ # The cached work is for the previous state group, so we work out
+ # the delta.
+ assert state_entry.delta_ids is not None
+ for (typ, state_key), event_id in state_entry.delta_ids.items():
+ if typ != EventTypes.Member:
+ continue
+
+ host = intern_string(get_domain_from_id(state_key))
+ user_id = state_key
+ known_joins = cache.hosts_to_joined_users.setdefault(host, set())
+
+ event = await self.stores.main.get_event(event_id)
+ if event.membership == Membership.JOIN:
+ known_joins.add(user_id)
+ else:
+ known_joins.discard(user_id)
+
+ if not known_joins:
+ cache.hosts_to_joined_users.pop(host, None)
+ else:
+ # The cache doesn't match the state group or prev state group,
+ # so we calculate the result from first principles.
+ #
+ # We need to fetch all hosts joined to the room according to `state` by
+ # inspecting all join memberships in `state`. However, if the `state` is
+ # relatively recent then many of its events are likely to be held in
+ # the current state of the room, which is easily available and likely
+ # cached.
+ #
+ # We therefore compute the set of `state` events not in the
+ # current state and only fetch those.
+ current_memberships = (
+ await self.stores.main._get_approximate_current_memberships_in_room(
+ room_id
+ )
+ )
+ unknown_state_events = {}
+ joined_users_in_current_state = []
+
+ state = await state_entry.get_state(
+ self, StateFilter.from_types([(EventTypes.Member, None)])
+ )
+
+ for (type, state_key), event_id in state.items():
+ if event_id not in current_memberships:
+ unknown_state_events[type, state_key] = event_id
+ elif current_memberships[event_id] == Membership.JOIN:
+ joined_users_in_current_state.append(state_key)
+
+ joined_user_ids = await self.stores.main.get_joined_user_ids_from_state(
+ room_id, unknown_state_events
+ )
+
+ cache.hosts_to_joined_users = {}
+ for user_id in chain(joined_user_ids, joined_users_in_current_state):
+ host = intern_string(get_domain_from_id(user_id))
+ cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+
+ if state_entry.state_group:
+ cache.state_group = state_entry.state_group
+ else:
+ cache.state_group = object()
+
+ return frozenset(cache.hosts_to_joined_users)
diff --git a/synapse/storage/controllers/stats.py b/synapse/storage/controllers/stats.py
new file mode 100644
index 00000000..2a03528f
--- /dev/null
+++ b/synapse/storage/controllers/stats.py
@@ -0,0 +1,112 @@
+# Copyright 2023 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, Collection, Counter, List, Tuple
+
+from synapse.api.errors import SynapseError
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.databases import Databases
+from synapse.storage.engines import PostgresEngine
+
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+class StatsController:
+ """High level interface for getting statistics."""
+
+ def __init__(self, hs: "HomeServer", stores: Databases):
+ self.stores = stores
+
+ async def get_room_db_size_estimate(self) -> List[Tuple[str, int]]:
+ """Get an estimate of the largest rooms and how much database space they
+ use, in bytes.
+
+ Only works against PostgreSQL.
+
+ Note: this uses the postgres statistics so is a very rough estimate.
+ """
+
+ # Note: We look at both tables on the main and state databases.
+ if not isinstance(self.stores.main.database_engine, PostgresEngine):
+ raise SynapseError(400, "Endpoint requires using PostgreSQL")
+
+ if not isinstance(self.stores.state.database_engine, PostgresEngine):
+ raise SynapseError(400, "Endpoint requires using PostgreSQL")
+
+ # For each "large" table, we go through and get the largest rooms
+ # and an estimate of how much space they take. We can then sum the
+ # results and return the top 10.
+ #
+ # This isn't the most accurate, but given all of these are estimates
+ # anyway it's good enough.
+ room_estimates: Counter[str] = Counter()
+
+ # Return size of the table on disk, including indexes and TOAST.
+ table_sql = """
+ SELECT pg_total_relation_size(?)
+ """
+
+ # Get an estimate for the largest rooms and their frequency.
+ #
+ # Note: the cast here is a hack to cast from `anyarray` to an actual
+ # type. This ensures that psycopg2 passes us back a Python list.
+ column_sql = """
+ SELECT
+ most_common_vals::TEXT::TEXT[], most_common_freqs::TEXT::NUMERIC[]
+ FROM pg_stats
+ WHERE tablename = ? and attname = 'room_id'
+ """
+
+ def get_room_db_size_estimate_txn(
+ txn: LoggingTransaction,
+ tables: Collection[str],
+ ) -> None:
+ for table in tables:
+ txn.execute(table_sql, (table,))
+ row = txn.fetchone()
+ assert row is not None
+ (table_size,) = row
+
+ txn.execute(column_sql, (table,))
+ row = txn.fetchone()
+ assert row is not None
+ vals, freqs = row
+
+ for room_id, freq in zip(vals, freqs):
+ room_estimates[room_id] += int(freq * table_size)
+
+ await self.stores.main.db_pool.runInteraction(
+ "get_room_db_size_estimate_main",
+ get_room_db_size_estimate_txn,
+ (
+ "event_json",
+ "events",
+ "event_search",
+ "event_edges",
+ "event_push_actions",
+ "stream_ordering_to_exterm",
+ ),
+ )
+
+ await self.stores.state.db_pool.runInteraction(
+ "get_room_db_size_estimate_state",
+ get_room_db_size_estimate_txn,
+ ("state_groups_state",),
+ )
+
+ return room_estimates.most_common(10)