author     Andrej Shadura <andrewsh@debian.org>  2022-07-20 14:17:32 +0200
committer  Andrej Shadura <andrewsh@debian.org>  2022-07-20 14:17:32 +0200
commit     4df717ab4138a59602fbce16d855794d3f611e60 (patch)
tree       1a3db57b438b236b1d4dcfdbf4b49d320d8e748e /synapse/storage/databases
parent     cdbe14e39a29d617fd8358639042fe57e08ac978 (diff)
New upstream version 1.63.0
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--  synapse/storage/databases/main/__init__.py              9
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py           11
-rw-r--r--  synapse/storage/databases/main/devices.py              129
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py   490
-rw-r--r--  synapse/storage/databases/main/events.py               114
-rw-r--r--  synapse/storage/databases/main/events_bg_updates.py    121
-rw-r--r--  synapse/storage/databases/main/group_server.py          34
-rw-r--r--  synapse/storage/databases/main/media_repository.py      10
-rw-r--r--  synapse/storage/databases/main/purge_events.py           2
-rw-r--r--  synapse/storage/databases/main/push_rule.py              2
-rw-r--r--  synapse/storage/databases/main/receipts.py              92
-rw-r--r--  synapse/storage/databases/main/registration.py          11
-rw-r--r--  synapse/storage/databases/main/room.py                 146
-rw-r--r--  synapse/storage/databases/main/roommember.py            86
-rw-r--r--  synapse/storage/databases/main/search.py                10
-rw-r--r--  synapse/storage/databases/main/state.py                  8
-rw-r--r--  synapse/storage/databases/main/stats.py                 16
-rw-r--r--  synapse/storage/databases/main/stream.py                20
18 files changed, 941 insertions(+), 370 deletions(-)
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 11d9d16c..a3d31d37 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -45,7 +45,6 @@ from .event_push_actions import EventPushActionsStore
from .events_bg_updates import EventsBackgroundUpdatesStore
from .events_forward_extremities import EventForwardExtremitiesStore
from .filtering import FilteringStore
-from .group_server import GroupServerStore
from .keys import KeyStore
from .lock import LockStore
from .media_repository import MediaRepositoryStore
@@ -88,7 +87,6 @@ class DataStore(
RoomStore,
RoomBatchStore,
RegistrationStore,
- StreamWorkerStore,
ProfileStore,
PresenceStore,
TransactionWorkerStore,
@@ -105,19 +103,20 @@ class DataStore(
PusherStore,
PushRuleStore,
ApplicationServiceTransactionStore,
+ EventPushActionsStore,
+ ServerMetricsStore,
ReceiptsStore,
EndToEndKeyStore,
EndToEndRoomKeyStore,
SearchStore,
TagsStore,
AccountDataStore,
- EventPushActionsStore,
+ StreamWorkerStore,
OpenIdStore,
ClientIpWorkerStore,
DeviceStore,
DeviceInboxStore,
UserDirectoryStore,
- GroupServerStore,
UserErasureStore,
MonthlyActiveUsersWorkerStore,
StatsStore,
@@ -126,7 +125,6 @@ class DataStore(
UIAuthStore,
EventForwardExtremitiesStore,
CacheInvalidationWorkerStore,
- ServerMetricsStore,
LockStore,
SessionStore,
):
@@ -197,6 +195,7 @@ class DataStore(
self._min_stream_order_on_start = self.get_room_min_stream_ordering()
def get_device_stream_token(self) -> int:
+ # TODO: shouldn't this be moved to `DeviceWorkerStore`?
return self._device_list_id_gen.get_current_token()
async def get_users(self) -> List[JsonDict]:
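
The base-class reshuffle above is not cosmetic: `EventPushActionsStore` now inherits from `ReceiptsWorkerStore` and `StreamWorkerStore` (see the `event_push_actions.py` hunks further down), so it must appear before those classes in `DataStore`'s base list for Python's C3 linearization to produce a valid MRO. A minimal standalone sketch of that constraint, using toy classes rather than the real stores:

```python
# Toy classes only: a sketch of why the mixin order in DataStore matters,
# not Synapse code.

class SQLBaseStore:
    pass

class StreamWorkerStore(SQLBaseStore):
    pass

class ReceiptsWorkerStore(SQLBaseStore):
    pass

class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore):
    pass

# Listing the derived store before StreamWorkerStore linearizes cleanly;
# swapping the two bases raises:
#   TypeError: Cannot create a consistent method resolution order (MRO)
class DataStore(EventPushActionsWorkerStore, StreamWorkerStore):
    pass

print([cls.__name__ for cls in DataStore.__mro__])
# ['DataStore', 'EventPushActionsWorkerStore', 'ReceiptsWorkerStore',
#  'StreamWorkerStore', 'SQLBaseStore', 'object']
```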
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 599b4183..422e0e65 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -834,8 +834,6 @@ class DeviceInboxWorkerStore(SQLBaseStore):
class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
- REMOVE_DELETED_DEVICES = "remove_deleted_devices_from_device_inbox"
- REMOVE_HIDDEN_DEVICES = "remove_hidden_devices_from_device_inbox"
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
def __init__(
@@ -857,15 +855,6 @@ class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
self.DEVICE_INBOX_STREAM_ID, self._background_drop_index_device_inbox
)
- # Used to be a background update that deletes all device_inboxes for deleted
- # devices.
- self.db_pool.updates.register_noop_background_update(
- self.REMOVE_DELETED_DEVICES
- )
- # Used to be a background update that deletes all device_inboxes for hidden
- # devices.
- self.db_pool.updates.register_noop_background_update(self.REMOVE_HIDDEN_DEVICES)
-
self.db_pool.updates.register_background_update_handler(
self.REMOVE_DEAD_DEVICES_FROM_INBOX,
self._remove_dead_devices_from_device_inbox,
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index d900064c..adde5d09 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -28,6 +28,8 @@ from typing import (
cast,
)
+from typing_extensions import Literal
+
from synapse.api.constants import EduTypes
from synapse.api.errors import Codes, StoreError
from synapse.logging.opentracing import (
@@ -44,6 +46,8 @@ from synapse.storage.database import (
LoggingTransaction,
make_tuple_comparison_clause,
)
+from synapse.storage.databases.main.end_to_end_keys import EndToEndKeyWorkerStore
+from synapse.storage.types import Cursor
from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
from synapse.util import json_decoder, json_encoder
from synapse.util.caches.descriptors import cached, cachedList
@@ -65,7 +69,7 @@ DROP_DEVICE_LIST_STREAMS_NON_UNIQUE_INDEXES = (
BG_UPDATE_REMOVE_DUP_OUTBOUND_POKES = "remove_dup_outbound_pokes"
-class DeviceWorkerStore(SQLBaseStore):
+class DeviceWorkerStore(EndToEndKeyWorkerStore):
def __init__(
self,
database: DatabasePool,
@@ -74,7 +78,9 @@ class DeviceWorkerStore(SQLBaseStore):
):
super().__init__(database, db_conn, hs)
- device_list_max = self._device_list_id_gen.get_current_token()
+ # Type-ignore: _device_list_id_gen is mixed in from either DataStore (as a
+ # StreamIdGenerator) or SlavedDataStore (as a SlavedIdTracker).
+ device_list_max = self._device_list_id_gen.get_current_token() # type: ignore[attr-defined]
device_list_prefill, min_device_list_id = self.db_pool.get_cache_dict(
db_conn,
"device_lists_stream",
@@ -339,8 +345,9 @@ class DeviceWorkerStore(SQLBaseStore):
# following this stream later.
last_processed_stream_id = from_stream_id
- query_map = {}
- cross_signing_keys_by_user = {}
+ # A map of (user ID, device ID) to (stream ID, context).
+ query_map: Dict[Tuple[str, str], Tuple[int, Optional[str]]] = {}
+ cross_signing_keys_by_user: Dict[str, Dict[str, object]] = {}
for user_id, device_id, update_stream_id, update_context in updates:
# Calculate the remaining length budget.
# Note that, for now, each entry in `cross_signing_keys_by_user`
@@ -596,7 +603,7 @@ class DeviceWorkerStore(SQLBaseStore):
txn=txn,
table="device_lists_outbound_last_success",
key_names=("destination", "user_id"),
- key_values=((destination, user_id) for user_id, _ in rows),
+ key_values=[(destination, user_id) for user_id, _ in rows],
value_names=("stream_id",),
value_values=((stream_id,) for _, stream_id in rows),
)
@@ -621,7 +628,9 @@ class DeviceWorkerStore(SQLBaseStore):
The new stream ID.
"""
- async with self._device_list_id_gen.get_next() as stream_id:
+ # TODO: this looks like it's _writing_. Should this be on DeviceStore rather
+ # than DeviceWorkerStore?
+ async with self._device_list_id_gen.get_next() as stream_id: # type: ignore[attr-defined]
await self.db_pool.runInteraction(
"add_user_sig_change_to_streams",
self._add_user_signature_change_txn,
@@ -686,7 +695,7 @@ class DeviceWorkerStore(SQLBaseStore):
} - users_needing_resync
user_ids_not_in_cache = user_ids - user_ids_in_cache
- results = {}
+ results: Dict[str, Dict[str, JsonDict]] = {}
for user_id, device_id in query_list:
if user_id not in user_ids_in_cache:
continue
@@ -727,7 +736,7 @@ class DeviceWorkerStore(SQLBaseStore):
def get_cached_device_list_changes(
self,
from_key: int,
- ) -> Optional[Set[str]]:
+ ) -> Optional[List[str]]:
"""Get set of users whose devices have changed since `from_key`, or None
if that information is not in our cache.
"""
@@ -737,7 +746,7 @@ class DeviceWorkerStore(SQLBaseStore):
async def get_users_whose_devices_changed(
self,
from_key: int,
- user_ids: Optional[Iterable[str]] = None,
+ user_ids: Optional[Collection[str]] = None,
to_key: Optional[int] = None,
) -> Set[str]:
"""Get set of users whose devices have changed since `from_key` that
@@ -757,6 +766,7 @@ class DeviceWorkerStore(SQLBaseStore):
"""
# Get set of users who *may* have changed. Users not in the returned
# list have definitely not changed.
+ user_ids_to_check: Optional[Collection[str]]
if user_ids is None:
# Get set of all users that have had device list changes since 'from_key'
user_ids_to_check = self._device_list_stream_cache.get_all_entities_changed(
@@ -772,7 +782,7 @@ class DeviceWorkerStore(SQLBaseStore):
return set()
def _get_users_whose_devices_changed_txn(txn: LoggingTransaction) -> Set[str]:
- changes = set()
+ changes: Set[str] = set()
stream_id_where_clause = "stream_id > ?"
sql_args = [from_key]
@@ -788,6 +798,9 @@ class DeviceWorkerStore(SQLBaseStore):
"""
# Query device changes with a batch of users at a time
+ # Assertion for mypy's benefit; see also
+ # https://mypy.readthedocs.io/en/stable/common_issues.html#narrowing-and-inner-functions
+ assert user_ids_to_check is not None
for chunk in batch_iter(user_ids_to_check, 100):
clause, args = make_in_list_sql_clause(
txn.database_engine, "user_id", chunk
@@ -854,7 +867,9 @@ class DeviceWorkerStore(SQLBaseStore):
if last_id == current_id:
return [], current_id, False
- def _get_all_device_list_changes_for_remotes(txn):
+ def _get_all_device_list_changes_for_remotes(
+ txn: Cursor,
+ ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
# This query Does The Right Thing where it'll correctly apply the
# bounds to the inner queries.
sql = """
@@ -913,7 +928,7 @@ class DeviceWorkerStore(SQLBaseStore):
desc="get_device_list_last_stream_id_for_remotes",
)
- results = {user_id: None for user_id in user_ids}
+ results: Dict[str, Optional[str]] = {user_id: None for user_id in user_ids}
results.update({row["user_id"]: row["stream_id"] for row in rows})
return results
@@ -1193,6 +1208,65 @@ class DeviceWorkerStore(SQLBaseStore):
return devices
+ @cached()
+ async def _get_min_device_lists_changes_in_room(self) -> int:
+ """Returns the minimum stream ID that we have entries for
+ `device_lists_changes_in_room`
+ """
+
+ return await self.db_pool.simple_select_one_onecol(
+ table="device_lists_changes_in_room",
+ keyvalues={},
+ retcol="COALESCE(MIN(stream_id), 0)",
+ desc="get_min_device_lists_changes_in_room",
+ )
+
+ async def get_device_list_changes_in_rooms(
+ self, room_ids: Collection[str], from_id: int
+ ) -> Optional[Set[str]]:
+ """Return the set of users whose devices have changed in the given rooms
+ since the given stream ID.
+
+ Returns None if the given stream ID is too old.
+ """
+
+ if not room_ids:
+ return set()
+
+ min_stream_id = await self._get_min_device_lists_changes_in_room()
+
+ if min_stream_id > from_id:
+ return None
+
+ sql = """
+ SELECT DISTINCT user_id FROM device_lists_changes_in_room
+ WHERE {clause} AND stream_id >= ?
+ """
+
+ def _get_device_list_changes_in_rooms_txn(
+ txn: LoggingTransaction,
+ clause: str,
+ args: List[Any],
+ ) -> Set[str]:
+ txn.execute(sql.format(clause=clause), args)
+ return {user_id for user_id, in txn}
+
+ changes = set()
+ for chunk in batch_iter(room_ids, 1000):
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", chunk
+ )
+ args.append(from_id)
+
+ changes |= await self.db_pool.runInteraction(
+ "get_device_list_changes_in_rooms",
+ _get_device_list_changes_in_rooms_txn,
+ clause,
+ args,
+ )
+
+ return changes
+
class DeviceBackgroundUpdateStore(SQLBaseStore):
def __init__(
@@ -1240,15 +1314,6 @@ class DeviceBackgroundUpdateStore(SQLBaseStore):
self._remove_duplicate_outbound_pokes,
)
- # a pair of background updates that were added during the 1.14 release cycle,
- # but replaced with 58/06dlols_unique_idx.py
- self.db_pool.updates.register_noop_background_update(
- "device_lists_outbound_last_success_unique_idx",
- )
- self.db_pool.updates.register_noop_background_update(
- "drop_device_lists_outbound_last_success_non_unique_idx",
- )
-
async def _drop_device_list_streams_non_unique_indexes(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -1346,9 +1411,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
# Map of (user_id, device_id) -> bool. If there is an entry that implies
# the device exists.
- self.device_id_exists_cache = LruCache(
- cache_name="device_id_exists", max_size=10000
- )
+ self.device_id_exists_cache: LruCache[
+ Tuple[str, str], Literal[True]
+ ] = LruCache(cache_name="device_id_exists", max_size=10000)
async def store_device(
self,
@@ -1433,16 +1498,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
raise StoreError(500, "Problem storing device.")
- async def delete_device(self, user_id: str, device_id: str) -> None:
- """Delete a device and its device_inbox.
-
- Args:
- user_id: The ID of the user which owns the device
- device_id: The ID of the device to delete
- """
-
- await self.delete_devices(user_id, [device_id])
-
async def delete_devices(self, user_id: str, device_ids: List[str]) -> None:
"""Deletes several devices.
@@ -1670,7 +1725,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
context,
)
- async with self._device_list_id_gen.get_next_mult(
+ async with self._device_list_id_gen.get_next_mult( # type: ignore[attr-defined]
len(device_ids)
) as stream_ids:
await self.db_pool.runInteraction(
@@ -1723,7 +1778,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
device_ids: Iterable[str],
hosts: Collection[str],
stream_ids: List[int],
- context: Dict[str, str],
+ context: Optional[Dict[str, str]],
) -> None:
for host in hosts:
txn.call_after(
@@ -1894,7 +1949,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
[],
)
- async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids:
+ async with self._device_list_id_gen.get_next_mult(len(hosts)) as stream_ids: # type: ignore[attr-defined]
return await self.db_pool.runInteraction(
"add_device_list_outbound_pokes",
add_device_list_outbound_pokes_txn,
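
The new `get_device_list_changes_in_rooms` above never interpolates an unbounded list of room IDs into a single statement: it chunks the IDs with `batch_iter` and builds a parameterized IN-list per chunk. A self-contained sketch of the same pattern, using plain `sqlite3` in place of Synapse's `DatabasePool` (table layout assumed from the diff):

```python
import sqlite3
from itertools import islice
from typing import Iterable, Iterator, Set, Tuple

def batch_iter(iterable: Iterable[str], size: int) -> Iterator[Tuple[str, ...]]:
    """Yield tuples of at most `size` items (rough stand-in for Synapse's batch_iter helper)."""
    it = iter(iterable)
    while chunk := tuple(islice(it, size)):
        yield chunk

def users_changed_in_rooms(
    conn: sqlite3.Connection, room_ids: Iterable[str], from_id: int
) -> Set[str]:
    changes: Set[str] = set()
    for chunk in batch_iter(room_ids, 1000):
        placeholders = ", ".join("?" for _ in chunk)
        sql = f"""
            SELECT DISTINCT user_id FROM device_lists_changes_in_room
            WHERE room_id IN ({placeholders}) AND stream_id >= ?
        """
        changes |= {user_id for (user_id,) in conn.execute(sql, (*chunk, from_id))}
    return changes
```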
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b0199793..dd262703 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union, cast
import attr
+from synapse.api.constants import ReceiptTypes
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json
from synapse.storage.database import (
@@ -24,6 +25,8 @@ from synapse.storage.database import (
LoggingDatabaseConnection,
LoggingTransaction,
)
+from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
+from synapse.storage.databases.main.stream import StreamWorkerStore
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -79,15 +82,15 @@ class UserPushAction(EmailPushAction):
profile_tag: str
-@attr.s(slots=True, frozen=True, auto_attribs=True)
+@attr.s(slots=True, auto_attribs=True)
class NotifCounts:
"""
The per-user, per-room count of notifications. Used by sync and push.
"""
- notify_count: int
- unread_count: int
- highlight_count: int
+ notify_count: int = 0
+ unread_count: int = 0
+ highlight_count: int = 0
def _serialize_action(actions: List[Union[dict, str]], is_highlight: bool) -> str:
@@ -119,7 +122,7 @@ def _deserialize_action(actions: str, is_highlight: bool) -> List[Union[dict, st
return DEFAULT_NOTIF_ACTION
-class EventPushActionsWorkerStore(SQLBaseStore):
+class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBaseStore):
def __init__(
self,
database: DatabasePool,
@@ -140,20 +143,27 @@ class EventPushActionsWorkerStore(SQLBaseStore):
self._find_stream_orderings_for_times, 10 * 60 * 1000
)
- self._rotate_delay = 3
self._rotate_count = 10000
self._doing_notif_rotation = False
if hs.config.worker.run_background_tasks:
self._rotate_notif_loop = self._clock.looping_call(
- self._rotate_notifs, 30 * 60 * 1000
+ self._rotate_notifs, 30 * 1000
)
- @cached(num_args=3, tree=True, max_entries=5000)
+ self.db_pool.updates.register_background_index_update(
+ "event_push_summary_unique_index",
+ index_name="event_push_summary_unique_index",
+ table="event_push_summary",
+ columns=["user_id", "room_id"],
+ unique=True,
+ replaces_index="event_push_summary_user_rm",
+ )
+
+ @cached(tree=True, max_entries=5000)
async def get_unread_event_push_actions_by_room_for_user(
self,
room_id: str,
user_id: str,
- last_read_event_id: Optional[str],
) -> NotifCounts:
"""Get the notification count, the highlight count and the unread message count
for a given user in a given room after the given read receipt.
@@ -165,8 +175,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
Args:
room_id: The room to retrieve the counts in.
user_id: The user to retrieve the counts for.
- last_read_event_id: The event associated with the latest read receipt for
- this user in this room. None if no receipt for this user in this room.
Returns
A dict containing the counts mentioned earlier in this docstring,
@@ -178,7 +186,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
self._get_unread_counts_by_receipt_txn,
room_id,
user_id,
- last_read_event_id,
)
def _get_unread_counts_by_receipt_txn(
@@ -186,16 +193,17 @@ class EventPushActionsWorkerStore(SQLBaseStore):
txn: LoggingTransaction,
room_id: str,
user_id: str,
- last_read_event_id: Optional[str],
) -> NotifCounts:
- stream_ordering = None
+ result = self.get_last_receipt_for_user_txn(
+ txn,
+ user_id,
+ room_id,
+ receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+ )
- if last_read_event_id is not None:
- stream_ordering = self.get_stream_id_for_event_txn( # type: ignore[attr-defined]
- txn,
- last_read_event_id,
- allow_none=True,
- )
+ stream_ordering = None
+ if result:
+ _, stream_ordering = result
if stream_ordering is None:
# Either last_read_event_id is None, or it's an event we don't have (e.g.
@@ -209,7 +217,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
retcol="event_id",
)
- stream_ordering = self.get_stream_id_for_event_txn(txn, event_id) # type: ignore[attr-defined]
+ stream_ordering = self.get_stream_id_for_event_txn(txn, event_id)
return self._get_unread_counts_by_pos_txn(
txn, room_id, user_id, stream_ordering
@@ -218,49 +226,121 @@ class EventPushActionsWorkerStore(SQLBaseStore):
def _get_unread_counts_by_pos_txn(
self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
) -> NotifCounts:
- sql = (
- "SELECT"
- " COUNT(CASE WHEN notif = 1 THEN 1 END),"
- " COUNT(CASE WHEN highlight = 1 THEN 1 END),"
- " COUNT(CASE WHEN unread = 1 THEN 1 END)"
- " FROM event_push_actions ea"
- " WHERE user_id = ?"
- " AND room_id = ?"
- " AND stream_ordering > ?"
- )
-
- txn.execute(sql, (user_id, room_id, stream_ordering))
- row = txn.fetchone()
+ """Get the number of unread messages for a user/room that have happened
+ since the given stream ordering.
+ """
- (notif_count, highlight_count, unread_count) = (0, 0, 0)
-
- if row:
- (notif_count, highlight_count, unread_count) = row
+ counts = NotifCounts()
+ # First we pull the counts from the summary table.
+ #
+ # We check that `last_receipt_stream_ordering` matches the stream
+ # ordering given. If it doesn't match then a new read receipt has arrived and
+ # we haven't yet updated the counts in `event_push_summary` to reflect
+ # that; in that case we simply ignore `event_push_summary` counts
+ # and do a manual count of all of the rows in the `event_push_actions` table
+ # for this user/room.
+ #
+ # If `last_receipt_stream_ordering` is null then that means it's up to
+ # date (as the row was written by an older version of Synapse that
+ # updated `event_push_summary` synchronously when persisting a new read
+ # receipt).
txn.execute(
"""
- SELECT notif_count, unread_count FROM event_push_summary
- WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
+ SELECT stream_ordering, notif_count, COALESCE(unread_count, 0)
+ FROM event_push_summary
+ WHERE room_id = ? AND user_id = ?
+ AND (
+ (last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
+ OR last_receipt_stream_ordering = ?
+ )
""",
- (room_id, user_id, stream_ordering),
+ (room_id, user_id, stream_ordering, stream_ordering),
)
row = txn.fetchone()
+ summary_stream_ordering = 0
if row:
- notif_count += row[0]
-
- if row[1] is not None:
- # The unread_count column of event_push_summary is NULLable, so we need
- # to make sure we don't try increasing the unread counts if it's NULL
- # for this row.
- unread_count += row[1]
-
- return NotifCounts(
- notify_count=notif_count,
- unread_count=unread_count,
- highlight_count=highlight_count,
+ summary_stream_ordering = row[0]
+ counts.notify_count += row[1]
+ counts.unread_count += row[2]
+
+ # Next we need to count highlights, which aren't summarized
+ sql = """
+ SELECT COUNT(*) FROM event_push_actions
+ WHERE user_id = ?
+ AND room_id = ?
+ AND stream_ordering > ?
+ AND highlight = 1
+ """
+ txn.execute(sql, (user_id, room_id, stream_ordering))
+ row = txn.fetchone()
+ if row:
+ counts.highlight_count += row[0]
+
+ # Finally we need to count push actions that aren't included in the
+ # summary returned above, e.g. recent events that haven't been
+ # summarized yet, or the summary is empty due to a recent read receipt.
+ stream_ordering = max(stream_ordering, summary_stream_ordering)
+ notify_count, unread_count = self._get_notif_unread_count_for_user_room(
+ txn, room_id, user_id, stream_ordering
)
+ counts.notify_count += notify_count
+ counts.unread_count += unread_count
+
+ return counts
+
+ def _get_notif_unread_count_for_user_room(
+ self,
+ txn: LoggingTransaction,
+ room_id: str,
+ user_id: str,
+ stream_ordering: int,
+ max_stream_ordering: Optional[int] = None,
+ ) -> Tuple[int, int]:
+ """Returns the notify and unread counts from `event_push_actions` for
+ the given user/room in the given range.
+
+ Does not consult `event_push_summary` table, which may include push
+ actions that have been deleted from `event_push_actions` table.
+ """
+
+ # If there have been no events in the room since the stream ordering,
+ # there can't be any push actions either.
+ if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
+ return 0, 0
+
+ clause = ""
+ args = [user_id, room_id, stream_ordering]
+ if max_stream_ordering is not None:
+ clause = "AND ea.stream_ordering <= ?"
+ args.append(max_stream_ordering)
+
+ # If the max stream ordering is less than the min stream ordering,
+ # then obviously there are zero push actions in that range.
+ if max_stream_ordering <= stream_ordering:
+ return 0, 0
+
+ sql = f"""
+ SELECT
+ COUNT(CASE WHEN notif = 1 THEN 1 END),
+ COUNT(CASE WHEN unread = 1 THEN 1 END)
+ FROM event_push_actions ea
+ WHERE user_id = ?
+ AND room_id = ?
+ AND ea.stream_ordering > ?
+ {clause}
+ """
+
+ txn.execute(sql, args)
+ row = txn.fetchone()
+
+ if row:
+ return cast(Tuple[int, int], row)
+
+ return 0, 0
+
async def get_push_action_users_in_range(
self, min_stream_ordering: int, max_stream_ordering: int
) -> List[str]:
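
The rewritten counting logic above composes its result from two sources: the pre-rotated totals in `event_push_summary` (valid up to the summary's `stream_ordering`), plus a live count of `event_push_actions` rows newer than both the read receipt and that cut-off, with highlights always counted directly because they are never summarized. A simplified in-memory model of that arithmetic (not the SQL the store actually runs):

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class NotifCounts:
    notify_count: int = 0
    unread_count: int = 0
    highlight_count: int = 0

@dataclass
class PushAction:
    stream_ordering: int
    notif: bool
    unread: bool
    highlight: bool

def unread_counts(
    receipt_stream_ordering: int,
    summary: Optional[Tuple[int, int, int]],  # (stream_ordering, notif_count, unread_count)
    actions: List[PushAction],
) -> NotifCounts:
    counts = NotifCounts()
    summary_stream_ordering = 0
    if summary is not None:
        summary_stream_ordering, notif, unread = summary
        counts.notify_count += notif
        counts.unread_count += unread

    # Highlights are never rolled into event_push_summary, so count them directly.
    counts.highlight_count = sum(
        1 for a in actions if a.highlight and a.stream_ordering > receipt_stream_ordering
    )

    # Anything newer than both the receipt and the summary cut-off has not been
    # summarized yet, so it is counted on top of the summary totals.
    cutoff = max(receipt_stream_ordering, summary_stream_ordering)
    counts.notify_count += sum(1 for a in actions if a.notif and a.stream_ordering > cutoff)
    counts.unread_count += sum(1 for a in actions if a.unread and a.stream_ordering > cutoff)
    return counts
```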
@@ -745,6 +825,19 @@ class EventPushActionsWorkerStore(SQLBaseStore):
self._doing_notif_rotation = True
try:
+ # First we recalculate push summaries and delete stale push actions
+ # for rooms/users with new receipts.
+ while True:
+ logger.debug("Handling new receipts")
+
+ caught_up = await self.db_pool.runInteraction(
+ "_handle_new_receipts_for_notifs_txn",
+ self._handle_new_receipts_for_notifs_txn,
+ )
+ if caught_up:
+ break
+
+ # Then we update the event push summaries for any new events
while True:
logger.info("Rotating notifications")
@@ -753,10 +846,115 @@ class EventPushActionsWorkerStore(SQLBaseStore):
)
if caught_up:
break
- await self.hs.get_clock().sleep(self._rotate_delay)
+
+ # Finally we clear out old event push actions.
+ await self._remove_old_push_actions_that_have_rotated()
finally:
self._doing_notif_rotation = False
+ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
+ """Check for new read receipts and delete from event push actions.
+
+ Any push actions which predate the user's most recent read receipt are
+ now redundant, so we can remove them from `event_push_actions` and
+ update `event_push_summary`.
+ """
+
+ limit = 100
+
+ min_receipts_stream_id = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="event_push_summary_last_receipt_stream_id",
+ keyvalues={},
+ retcol="stream_id",
+ )
+
+ max_receipts_stream_id = self._receipts_id_gen.get_current_token()
+
+ sql = """
+ SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
+ FROM receipts_linearized AS r
+ INNER JOIN events AS e USING (event_id)
+ WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
+ ORDER BY r.stream_id ASC
+ LIMIT ?
+ """
+
+ # We only want local users, so we add a dodgy filter to the above query
+ # and recheck it below.
+ user_filter = "%:" + self.hs.hostname
+
+ txn.execute(
+ sql,
+ (
+ min_receipts_stream_id,
+ max_receipts_stream_id,
+ user_filter,
+ limit,
+ ),
+ )
+ rows = txn.fetchall()
+
+ old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="event_push_summary_stream_ordering",
+ keyvalues={},
+ retcol="stream_ordering",
+ )
+
+ # For each new read receipt we delete push actions from before it and
+ # recalculate the summary.
+ for _, room_id, user_id, stream_ordering in rows:
+ # Only handle our own read receipts.
+ if not self.hs.is_mine_id(user_id):
+ continue
+
+ txn.execute(
+ """
+ DELETE FROM event_push_actions
+ WHERE room_id = ?
+ AND user_id = ?
+ AND stream_ordering <= ?
+ AND highlight = 0
+ """,
+ (room_id, user_id, stream_ordering),
+ )
+
+ notif_count, unread_count = self._get_notif_unread_count_for_user_room(
+ txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
+ )
+
+ self.db_pool.simple_upsert_txn(
+ txn,
+ table="event_push_summary",
+ keyvalues={"room_id": room_id, "user_id": user_id},
+ values={
+ "notif_count": notif_count,
+ "unread_count": unread_count,
+ "stream_ordering": old_rotate_stream_ordering,
+ "last_receipt_stream_ordering": stream_ordering,
+ },
+ )
+
+ # We always update `event_push_summary_last_receipt_stream_id` to
+ # ensure that we don't rescan the same receipts for remote users.
+
+ upper_limit = max_receipts_stream_id
+ if len(rows) >= limit:
+ # If we pulled out a limited number of rows we only update the
+ # position to the last receipt we processed, so we continue
+ # processing the rest next iteration.
+ upper_limit = rows[-1][0]
+
+ self.db_pool.simple_update_txn(
+ txn,
+ table="event_push_summary_last_receipt_stream_id",
+ keyvalues={},
+ updatevalues={"stream_id": upper_limit},
+ )
+
+ return len(rows) < limit
+
def _rotate_notifs_txn(self, txn: LoggingTransaction) -> bool:
"""Archives older notifications into event_push_summary. Returns whether
the archiving process has caught up or not.
@@ -782,20 +980,21 @@ class EventPushActionsWorkerStore(SQLBaseStore):
stream_row = txn.fetchone()
if stream_row:
(offset_stream_ordering,) = stream_row
- assert self.stream_ordering_day_ago is not None
+
+ # We need to bound by the current token to ensure that we handle
+ # out-of-order writes correctly.
rotate_to_stream_ordering = min(
- self.stream_ordering_day_ago, offset_stream_ordering
+ offset_stream_ordering, self._stream_id_gen.get_current_token()
)
- caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
+ caught_up = False
else:
- rotate_to_stream_ordering = self.stream_ordering_day_ago
+ rotate_to_stream_ordering = self._stream_id_gen.get_current_token()
caught_up = True
logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)
self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
- # We have caught up iff we were limited by `stream_ordering_day_ago`
return caught_up
def _rotate_notifs_before_txn(
@@ -812,14 +1011,17 @@ class EventPushActionsWorkerStore(SQLBaseStore):
sql = """
SELECT user_id, room_id,
coalesce(old.%s, 0) + upd.cnt,
- upd.stream_ordering,
- old.user_id
+ upd.stream_ordering
FROM (
SELECT user_id, room_id, count(*) as cnt,
- max(stream_ordering) as stream_ordering
- FROM event_push_actions
- WHERE ? <= stream_ordering AND stream_ordering < ?
- AND highlight = 0
+ max(ea.stream_ordering) as stream_ordering
+ FROM event_push_actions AS ea
+ LEFT JOIN event_push_summary AS old USING (user_id, room_id)
+ WHERE ? < ea.stream_ordering AND ea.stream_ordering <= ?
+ AND (
+ old.last_receipt_stream_ordering IS NULL
+ OR old.last_receipt_stream_ordering < ea.stream_ordering
+ )
AND %s = 1
GROUP BY user_id, room_id
) AS upd
@@ -842,7 +1044,6 @@ class EventPushActionsWorkerStore(SQLBaseStore):
summaries[(row[0], row[1])] = _EventPushSummary(
unread_count=row[2],
stream_ordering=row[3],
- old_user_id=row[4],
notif_count=0,
)
@@ -863,115 +1064,95 @@ class EventPushActionsWorkerStore(SQLBaseStore):
summaries[(row[0], row[1])] = _EventPushSummary(
unread_count=0,
stream_ordering=row[3],
- old_user_id=row[4],
notif_count=row[2],
)
logger.info("Rotating notifications, handling %d rows", len(summaries))
- # If the `old.user_id` above is NULL then we know there isn't already an
- # entry in the table, so we simply insert it. Otherwise we update the
- # existing table.
- self.db_pool.simple_insert_many_txn(
+ self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
- keys=(
- "user_id",
- "room_id",
- "notif_count",
- "unread_count",
- "stream_ordering",
- ),
- values=[
+ key_names=("user_id", "room_id"),
+ key_values=[(user_id, room_id) for user_id, room_id in summaries],
+ value_names=("notif_count", "unread_count", "stream_ordering"),
+ value_values=[
(
- user_id,
- room_id,
summary.notif_count,
summary.unread_count,
summary.stream_ordering,
)
- for ((user_id, room_id), summary) in summaries.items()
- if summary.old_user_id is None
+ for summary in summaries.values()
],
)
- txn.execute_batch(
- """
- UPDATE event_push_summary
- SET notif_count = ?, unread_count = ?, stream_ordering = ?
- WHERE user_id = ? AND room_id = ?
- """,
- (
- (
- summary.notif_count,
- summary.unread_count,
- summary.stream_ordering,
- user_id,
- room_id,
- )
- for ((user_id, room_id), summary) in summaries.items()
- if summary.old_user_id is not None
- ),
- )
-
- txn.execute(
- "DELETE FROM event_push_actions"
- " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
- (old_rotate_stream_ordering, rotate_to_stream_ordering),
- )
-
- logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
-
txn.execute(
"UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
(rotate_to_stream_ordering,),
)
- def _remove_old_push_actions_before_txn(
- self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
+ async def _remove_old_push_actions_that_have_rotated(
+ self,
) -> None:
- """
- Purges old push actions for a user and room before a given
- stream_ordering.
+ """Clear out old push actions that have been summarized."""
- We however keep a months worth of highlighted notifications, so that
- users can still get a list of recent highlights.
-
- Args:
- txn: The transaction
- room_id: Room ID to delete from
- user_id: user ID to delete for
- stream_ordering: The lowest stream ordering which will
- not be deleted.
- """
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate,
- (room_id, user_id),
+ # We want to clear out anything that is older than a day that *has* already
+ # been rotated.
+ rotated_upto_stream_ordering = await self.db_pool.simple_select_one_onecol(
+ table="event_push_summary_stream_ordering",
+ keyvalues={},
+ retcol="stream_ordering",
)
- # We need to join on the events table to get the received_ts for
- # event_push_actions and sqlite won't let us use a join in a delete so
- # we can't just delete where received_ts < x. Furthermore we can
- # only identify event_push_actions by a tuple of room_id, event_id
- # we we can't use a subquery.
- # Instead, we look up the stream ordering for the last event in that
- # room received before the threshold time and delete event_push_actions
- # in the room with a stream_odering before that.
- txn.execute(
- "DELETE FROM event_push_actions "
- " WHERE user_id = ? AND room_id = ? AND "
- " stream_ordering <= ?"
- " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
- (user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
+ max_stream_ordering_to_delete = min(
+ rotated_upto_stream_ordering, self.stream_ordering_day_ago
)
- txn.execute(
- """
- DELETE FROM event_push_summary
- WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
- """,
- (room_id, user_id, stream_ordering),
- )
+ def remove_old_push_actions_that_have_rotated_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+ # We don't want to clear out too much at a time, so we bound our
+ # deletes.
+ batch_size = self._rotate_count
+
+ txn.execute(
+ """
+ SELECT stream_ordering FROM event_push_actions
+ WHERE stream_ordering <= ? AND highlight = 0
+ ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
+ """,
+ (
+ max_stream_ordering_to_delete,
+ batch_size,
+ ),
+ )
+ stream_row = txn.fetchone()
+
+ if stream_row:
+ (stream_ordering,) = stream_row
+ else:
+ stream_ordering = max_stream_ordering_to_delete
+
+ # We need to use an inclusive bound here to handle the case where a
+ # single stream ordering has more than `batch_size` rows.
+ txn.execute(
+ """
+ DELETE FROM event_push_actions
+ WHERE stream_ordering <= ? AND highlight = 0
+ """,
+ (stream_ordering,),
+ )
+
+ logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
+
+ return txn.rowcount < batch_size
+
+ while True:
+ done = await self.db_pool.runInteraction(
+ "_remove_old_push_actions_that_have_rotated",
+ remove_old_push_actions_that_have_rotated_txn,
+ )
+ if done:
+ break
class EventPushActionsStore(EventPushActionsWorkerStore):
@@ -1000,6 +1181,16 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
where_clause="highlight=1",
)
+ # Add index to make deleting old push actions faster.
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_stream_highlight_index",
+ index_name="event_push_actions_stream_highlight_index",
+ table="event_push_actions",
+ columns=["highlight", "stream_ordering"],
+ where_clause="highlight=0",
+ psql_only=True,
+ )
+
async def get_push_actions_for_user(
self,
user_id: str,
@@ -1075,5 +1266,4 @@ class _EventPushSummary:
unread_count: int
stream_ordering: int
- old_user_id: str
notif_count: int
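
For each local read receipt, `_handle_new_receipts_for_notifs_txn` above deletes the now-redundant non-highlight push actions, recounts what is left up to the last rotation point, and upserts the result into `event_push_summary` alongside `last_receipt_stream_ordering`. A self-contained sketch of that per-receipt step on a simplified schema (plain `sqlite3`; it assumes a `UNIQUE(room_id, user_id)` constraint, mirroring the `event_push_summary_unique_index` added in this diff):

```python
import sqlite3

def handle_receipt(
    txn: sqlite3.Cursor,
    room_id: str,
    user_id: str,
    receipt_stream_ordering: int,
    rotated_upto_stream_ordering: int,
) -> None:
    # Push actions at or before the receipt are redundant, except highlights,
    # which are kept so recent highlights can still be listed.
    txn.execute(
        """
        DELETE FROM event_push_actions
        WHERE room_id = ? AND user_id = ?
          AND stream_ordering <= ? AND highlight = 0
        """,
        (room_id, user_id, receipt_stream_ordering),
    )

    # Recount notifications/unreads between the receipt and the rotation point.
    notif_count, unread_count = txn.execute(
        """
        SELECT
            COUNT(CASE WHEN notif = 1 THEN 1 END),
            COUNT(CASE WHEN unread = 1 THEN 1 END)
        FROM event_push_actions
        WHERE room_id = ? AND user_id = ?
          AND stream_ordering > ? AND stream_ordering <= ?
        """,
        (room_id, user_id, receipt_stream_ordering, rotated_upto_stream_ordering),
    ).fetchone()

    # Record the fresh counts and the receipt position in the summary table.
    txn.execute(
        """
        INSERT INTO event_push_summary
            (room_id, user_id, notif_count, unread_count,
             stream_ordering, last_receipt_stream_ordering)
        VALUES (?, ?, ?, ?, ?, ?)
        ON CONFLICT (room_id, user_id) DO UPDATE SET
            notif_count = excluded.notif_count,
            unread_count = excluded.unread_count,
            stream_ordering = excluded.stream_ordering,
            last_receipt_stream_ordering = excluded.last_receipt_stream_ordering
        """,
        (room_id, user_id, notif_count, unread_count,
         rotated_upto_stream_ordering, receipt_stream_ordering),
    )
```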
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 17e35cf6..eb4efbb9 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -16,6 +16,7 @@
import itertools
import logging
from collections import OrderedDict
+from http import HTTPStatus
from typing import (
TYPE_CHECKING,
Any,
@@ -35,6 +36,7 @@ from prometheus_client import Counter
import synapse.metrics
from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
+from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
@@ -46,7 +48,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventCacheEntry
from synapse.storage.databases.main.search import SearchEntry
-from synapse.storage.engines.postgres import PostgresEngine
+from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
from synapse.storage.util.sequence import SequenceGenerator
from synapse.types import JsonDict, StateMap, get_domain_from_id
@@ -69,6 +71,24 @@ event_counter = Counter(
)
+class PartialStateConflictError(SynapseError):
+ """An internal error raised when attempting to persist an event with partial state
+ after the room containing the event has been un-partial stated.
+
+ This error should be handled by recomputing the event context and trying again.
+
+ This error has an HTTP status code so that it can be transported over replication.
+ It should not be exposed to clients.
+ """
+
+ def __init__(self) -> None:
+ super().__init__(
+ HTTPStatus.CONFLICT,
+ msg="Cannot persist partial state event in un-partial stated room",
+ errcode=Codes.UNKNOWN,
+ )
+
+
@attr.s(slots=True, auto_attribs=True)
class DeltaState:
"""Deltas to use to update the `current_state_events` table.
@@ -154,6 +174,10 @@ class PersistEventsStore:
Returns:
Resolves when the events have been persisted
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
# We want to calculate the stream orderings as late as possible, as
@@ -354,6 +378,9 @@ class PersistEventsStore:
For each room, a list of the event ids which are the forward
extremities.
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
state_delta_for_room = state_delta_for_room or {}
new_forward_extremities = new_forward_extremities or {}
@@ -980,16 +1007,16 @@ class PersistEventsStore:
self,
room_id: str,
state_delta: DeltaState,
- stream_id: int,
) -> None:
"""Update the current state stored in the datatabase for the given room"""
- await self.db_pool.runInteraction(
- "update_current_state",
- self._update_current_state_txn,
- state_delta_by_room={room_id: state_delta},
- stream_id=stream_id,
- )
+ async with self._stream_id_gen.get_next() as stream_ordering:
+ await self.db_pool.runInteraction(
+ "update_current_state",
+ self._update_current_state_txn,
+ state_delta_by_room={room_id: state_delta},
+ stream_id=stream_ordering,
+ )
def _update_current_state_txn(
self,
@@ -1304,6 +1331,10 @@ class PersistEventsStore:
Returns:
new list, without events which are already in the events table.
+
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
txn.execute(
"SELECT event_id, outlier FROM events WHERE event_id in (%s)"
@@ -1766,6 +1797,18 @@ class PersistEventsStore:
self.store.get_invited_rooms_for_local_user.invalidate,
(event.state_key,),
)
+ txn.call_after(
+ self.store.get_local_users_in_room.invalidate,
+ (event.room_id,),
+ )
+ txn.call_after(
+ self.store.get_number_joined_users_in_room.invalidate,
+ (event.room_id,),
+ )
+ txn.call_after(
+ self.store.get_user_in_room_with_profile.invalidate,
+ (event.room_id, event.state_key),
+ )
# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
@@ -2215,6 +2258,11 @@ class PersistEventsStore:
txn: LoggingTransaction,
events_and_contexts: Collection[Tuple[EventBase, EventContext]],
) -> None:
+ """
+ Raises:
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
+ """
state_groups = {}
for event, context in events_and_contexts:
if event.internal_metadata.is_outlier():
@@ -2239,19 +2287,37 @@ class PersistEventsStore:
# if we have partial state for these events, record the fact. (This happens
# here rather than in _store_event_txn because it also needs to happen when
# we de-outlier an event.)
- self.db_pool.simple_insert_many_txn(
- txn,
- table="partial_state_events",
- keys=("room_id", "event_id"),
- values=[
- (
- event.room_id,
- event.event_id,
- )
- for event, ctx in events_and_contexts
- if ctx.partial_state
- ],
- )
+ try:
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="partial_state_events",
+ keys=("room_id", "event_id"),
+ values=[
+ (
+ event.room_id,
+ event.event_id,
+ )
+ for event, ctx in events_and_contexts
+ if ctx.partial_state
+ ],
+ )
+ except self.db_pool.engine.module.IntegrityError:
+ logger.info(
+ "Cannot persist events %s in rooms %s: room has been un-partial stated",
+ [
+ event.event_id
+ for event, ctx in events_and_contexts
+ if ctx.partial_state
+ ],
+ list(
+ {
+ event.room_id
+ for event, ctx in events_and_contexts
+ if ctx.partial_state
+ }
+ ),
+ )
+ raise PartialStateConflictError()
self.db_pool.simple_upsert_many_txn(
txn,
@@ -2296,11 +2362,9 @@ class PersistEventsStore:
self.db_pool.simple_insert_many_txn(
txn,
table="event_edges",
- keys=("event_id", "prev_event_id", "room_id", "is_state"),
+ keys=("event_id", "prev_event_id"),
values=[
- (ev.event_id, e_id, ev.room_id, False)
- for ev in events
- for e_id in ev.prev_event_ids()
+ (ev.event_id, e_id) for ev in events for e_id in ev.prev_event_ids()
],
)
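
The try/except added around the `partial_state_events` insert converts a low-level `IntegrityError` into the new `PartialStateConflictError`, which carries HTTP 409 so it can travel over replication and tells the caller to recompute the event context and retry. A standalone sketch of that translation pattern (plain `sqlite3`, simplified classes rather than Synapse's actual `SynapseError` hierarchy):

```python
import sqlite3
from http import HTTPStatus
from typing import List, Tuple

class PartialStateConflictError(Exception):
    """Persisting a partial-state event raced with the room being un-partial stated."""

    def __init__(self) -> None:
        super().__init__("Cannot persist partial state event in un-partial stated room")
        # An HTTP status lets the error be transported over replication.
        self.code = HTTPStatus.CONFLICT

def insert_partial_state_rows(
    conn: sqlite3.Connection, rows: List[Tuple[str, str]]
) -> None:
    try:
        conn.executemany(
            "INSERT INTO partial_state_events (room_id, event_id) VALUES (?, ?)",
            rows,
        )
    except sqlite3.IntegrityError:
        # The insert violated a constraint because the room was un-partial
        # stated concurrently; raise a retryable domain error instead of
        # leaking the raw database exception.
        raise PartialStateConflictError() from None
```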
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index d5f00596..eeca85fc 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -1,4 +1,4 @@
-# Copyright 2019-2021 The Matrix.org Foundation C.I.C.
+# Copyright 2019-2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -64,6 +64,9 @@ class _BackgroundUpdates:
INDEX_STREAM_ORDERING2_TS = "index_stream_ordering2_ts"
REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column"
+ EVENT_EDGES_DROP_INVALID_ROWS = "event_edges_drop_invalid_rows"
+ EVENT_EDGES_REPLACE_INDEX = "event_edges_replace_index"
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class _CalculateChainCover:
@@ -177,11 +180,6 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
self._purged_chain_cover_index,
)
- # The event_thread_relation background update was replaced with the
- # event_arbitrary_relations one, which handles any relation to avoid
- # needed to potentially crawl the entire events table in the future.
- self.db_pool.updates.register_noop_background_update("event_thread_relation")
-
self.db_pool.updates.register_background_update_handler(
"event_arbitrary_relations",
self._event_arbitrary_relations,
@@ -240,6 +238,21 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
################################################################################
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+ self._background_drop_invalid_event_edges_rows,
+ )
+
+ self.db_pool.updates.register_background_index_update(
+ _BackgroundUpdates.EVENT_EDGES_REPLACE_INDEX,
+ index_name="event_edges_event_id_prev_event_id_idx",
+ table="event_edges",
+ columns=["event_id", "prev_event_id"],
+ unique=True,
+ # the old index which just covered event_id is now redundant.
+ replaces_index="ev_edges_id",
+ )
+
async def _background_reindex_fields_sender(
self, progress: JsonDict, batch_size: int
) -> int:
@@ -1290,3 +1303,99 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
)
return 0
+
+ async def _background_drop_invalid_event_edges_rows(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Drop invalid rows from event_edges
+
+ This only runs for postgres. For SQLite, it all happens synchronously.
+
+ Firstly, drop any rows with is_state=True. These may have been added a long time
+ ago, but they are no longer used.
+
+ We also drop rows that do not correspond to entries in `events`, and add a
+ foreign key.
+ """
+
+ last_event_id = progress.get("last_event_id", "")
+
+ def drop_invalid_event_edges_txn(txn: LoggingTransaction) -> bool:
+ """Returns True if we're done."""
+
+ # first we need to find an endpoint.
+ txn.execute(
+ """
+ SELECT event_id FROM event_edges
+ WHERE event_id > ?
+ ORDER BY event_id
+ LIMIT 1 OFFSET ?
+ """,
+ (last_event_id, batch_size),
+ )
+
+ endpoint = None
+ row = txn.fetchone()
+
+ if row:
+ endpoint = row[0]
+
+ where_clause = "ee.event_id > ?"
+ args = [last_event_id]
+ if endpoint:
+ where_clause += " AND ee.event_id <= ?"
+ args.append(endpoint)
+
+ # now delete any that:
+ # - have is_state=TRUE, or
+ # - do not correspond to a row in `events`
+ txn.execute(
+ f"""
+ DELETE FROM event_edges
+ WHERE event_id IN (
+ SELECT ee.event_id
+ FROM event_edges ee
+ LEFT JOIN events ev USING (event_id)
+ WHERE ({where_clause}) AND
+ (is_state OR ev.event_id IS NULL)
+ )""",
+ args,
+ )
+
+ logger.info(
+ "cleaned up event_edges up to %s: removed %i/%i rows",
+ endpoint,
+ txn.rowcount,
+ batch_size,
+ )
+
+ if endpoint is not None:
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS,
+ {"last_event_id": endpoint},
+ )
+ return False
+
+ # if that was the final batch, we validate the foreign key.
+ #
+ # The constraint should have been in place and enforced for new rows since
+ # before we started deleting invalid rows, so there's no chance for any
+ # invalid rows to have snuck in the meantime. In other words, this really
+ # ought to succeed.
+ logger.info("cleaned up event_edges; enabling foreign key")
+ txn.execute(
+ "ALTER TABLE event_edges VALIDATE CONSTRAINT event_edges_event_id_fkey"
+ )
+ return True
+
+ done = await self.db_pool.runInteraction(
+ desc="drop_invalid_event_edges", func=drop_invalid_event_edges_txn
+ )
+
+ if done:
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.EVENT_EDGES_DROP_INVALID_ROWS
+ )
+
+ return batch_size
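
`_background_drop_invalid_event_edges_rows` above works in bounded batches: it looks `batch_size` rows ahead of the last processed event ID to find an endpoint, deletes the invalid rows in that window, records the endpoint as progress, and only validates the foreign key once no endpoint is found. A compact sketch of one such batch (plain `sqlite3`, table layout assumed from the diff, without the progress bookkeeping):

```python
import sqlite3
from typing import Optional, Tuple

def drop_invalid_event_edges_batch(
    conn: sqlite3.Connection, last_event_id: str, batch_size: int
) -> Tuple[bool, Optional[str]]:
    """Delete one window of invalid rows; returns (done, new_progress_marker)."""
    # Find an endpoint batch_size rows ahead of where we left off.
    row = conn.execute(
        """
        SELECT event_id FROM event_edges
        WHERE event_id > ?
        ORDER BY event_id
        LIMIT 1 OFFSET ?
        """,
        (last_event_id, batch_size),
    ).fetchone()
    endpoint = row[0] if row else None

    where = "ee.event_id > ?"
    args = [last_event_id]
    if endpoint is not None:
        where += " AND ee.event_id <= ?"
        args.append(endpoint)

    # Delete rows that are marked is_state or have no matching events row.
    conn.execute(
        f"""
        DELETE FROM event_edges
        WHERE event_id IN (
            SELECT ee.event_id
            FROM event_edges ee
            LEFT JOIN events ev USING (event_id)
            WHERE ({where}) AND (ee.is_state OR ev.event_id IS NULL)
        )
        """,
        args,
    )

    # No endpoint means fewer than batch_size rows remained: we are done.
    return endpoint is None, endpoint
```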
diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py
deleted file mode 100644
index c15a7136..00000000
--- a/synapse/storage/databases/main/group_server.py
+++ /dev/null
@@ -1,34 +0,0 @@
-# Copyright 2017 Vector Creations Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from typing import TYPE_CHECKING
-
-from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-
-class GroupServerStore(SQLBaseStore):
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- # Register a legacy groups background update as a no-op.
- database.updates.register_noop_background_update("local_group_updates_index")
- super().__init__(database, db_conn, hs)
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index d028be16..9b172a64 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -37,9 +37,6 @@ from synapse.types import JsonDict, UserID
if TYPE_CHECKING:
from synapse.server import HomeServer
-BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD = (
- "media_repository_drop_index_wo_method"
-)
BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2 = (
"media_repository_drop_index_wo_method_2"
)
@@ -111,13 +108,6 @@ class MediaRepositoryBackgroundUpdateStore(SQLBaseStore):
unique=True,
)
- # the original impl of _drop_media_index_without_method was broken (see
- # https://github.com/matrix-org/synapse/issues/8649), so we replace the original
- # impl with a no-op and run the fixed migration as
- # media_repository_drop_index_wo_method_2.
- self.db_pool.updates.register_noop_background_update(
- BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD
- )
self.db_pool.updates.register_background_update_handler(
BG_UPDATE_REMOVE_MEDIA_REPO_INDEX_WITHOUT_METHOD_2,
self._drop_media_index_without_method,
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ba385f9f..87b0d090 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -214,10 +214,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# Delete all remote non-state events
for table in (
+ "event_edges",
"events",
"event_json",
"event_auth",
- "event_edges",
"event_forward_extremities",
"event_relations",
"event_search",
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index d5aefe02..86649c1e 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -110,9 +110,9 @@ def _load_rules(
# the abstract methods being implemented.
class PushRulesWorkerStore(
ApplicationServiceWorkerStore,
- ReceiptsWorkerStore,
PusherWorkerStore,
RoomMemberWorkerStore,
+ ReceiptsWorkerStore,
EventsWorkerStore,
SQLBaseStore,
metaclass=abc.ABCMeta,
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 21e954cc..0090c9f2 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -26,7 +26,7 @@ from typing import (
cast,
)
-from synapse.api.constants import EduTypes, ReceiptTypes
+from synapse.api.constants import EduTypes
from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
from synapse.replication.tcp.streams import ReceiptsStream
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -36,6 +36,7 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines._base import IsolationLevel
from synapse.storage.util.id_generators import (
AbstractStreamIdTracker,
MultiWriterIdGenerator,
@@ -117,7 +118,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
return self._receipts_id_gen.get_current_token()
async def get_last_receipt_event_id_for_user(
- self, user_id: str, room_id: str, receipt_types: Iterable[str]
+ self, user_id: str, room_id: str, receipt_types: Collection[str]
) -> Optional[str]:
"""
Fetch the event ID for the latest receipt in a room with one of the given receipt types.
@@ -125,58 +126,63 @@ class ReceiptsWorkerStore(SQLBaseStore):
Args:
user_id: The user to fetch receipts for.
room_id: The room ID to fetch the receipt for.
- receipt_type: The receipt types to fetch. Earlier receipt types
- are given priority if multiple receipts point to the same event.
+ receipt_types: The receipt types to fetch.
Returns:
The latest receipt, if one exists.
"""
- latest_event_id: Optional[str] = None
- latest_stream_ordering = 0
- for receipt_type in receipt_types:
- result = await self._get_last_receipt_event_id_for_user(
- user_id, room_id, receipt_type
- )
- if result is None:
- continue
- event_id, stream_ordering = result
-
- if latest_event_id is None or latest_stream_ordering < stream_ordering:
- latest_event_id = event_id
- latest_stream_ordering = stream_ordering
+ result = await self.db_pool.runInteraction(
+ "get_last_receipt_event_id_for_user",
+ self.get_last_receipt_for_user_txn,
+ user_id,
+ room_id,
+ receipt_types,
+ )
+ if not result:
+ return None
- return latest_event_id
+ event_id, _ = result
+ return event_id
- @cached()
- async def _get_last_receipt_event_id_for_user(
- self, user_id: str, room_id: str, receipt_type: str
+ def get_last_receipt_for_user_txn(
+ self,
+ txn: LoggingTransaction,
+ user_id: str,
+ room_id: str,
+ receipt_types: Collection[str],
) -> Optional[Tuple[str, int]]:
"""
- Fetch the event ID and stream ordering for the latest receipt.
+ Fetch the event ID and stream_ordering for the latest receipt in a room
+ with one of the given receipt types.
Args:
user_id: The user to fetch receipts for.
room_id: The room ID to fetch the receipt for.
- receipt_type: The receipt type to fetch.
+ receipt_types: The receipt types to fetch.
Returns:
- The event ID and stream ordering of the latest receipt, if one exists;
- otherwise `None`.
+ The latest receipt, if one exists.
"""
- sql = """
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "receipt_type", receipt_types
+ )
+
+ sql = f"""
SELECT event_id, stream_ordering
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
- WHERE user_id = ?
+ WHERE {clause}
+ AND user_id = ?
AND room_id = ?
- AND receipt_type = ?
+ ORDER BY stream_ordering DESC
+ LIMIT 1
"""
- def f(txn: LoggingTransaction) -> Optional[Tuple[str, int]]:
- txn.execute(sql, (user_id, room_id, receipt_type))
- return cast(Optional[Tuple[str, int]], txn.fetchone())
+ args.extend((user_id, room_id))
+ txn.execute(sql, args)
- return await self.db_pool.runInteraction("get_own_receipt_for_user", f)
+ return cast(Optional[Tuple[str, int]], txn.fetchone())
async def get_receipts_for_user(
self, user_id: str, receipt_types: Iterable[str]
@@ -576,8 +582,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
) -> None:
self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))
self._get_linearized_receipts_for_room.invalidate((room_id,))
- self._get_last_receipt_event_id_for_user.invalidate(
- (user_id, room_id, receipt_type)
+
+ # We use this method to invalidate so that we don't end up with circular
+ # dependencies between the receipts and push action stores.
+ self._attempt_to_invalidate_cache(
+ "get_unread_event_push_actions_by_room_for_user", (room_id,)
)
def process_replication_rows(
@@ -673,17 +682,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
lock=False,
)
- # When updating a local users read receipt, remove any push actions
- # which resulted from the receipt's event and all earlier events.
- if (
- self.hs.is_mine_id(user_id)
- and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
- and stream_ordering is not None
- ):
- self._remove_old_push_actions_before_txn( # type: ignore[attr-defined]
- txn, room_id=room_id, user_id=user_id, stream_ordering=stream_ordering
- )
-
return rx_ts
def _graph_to_linear(
@@ -764,6 +762,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
linearized_event_id,
data,
stream_id=stream_id,
+ # Read committed is actually beneficial here because we check for a receipt with
+ # greater stream order, and checking the very latest data at select time is better
+ # than the data at transaction start time.
+ isolation_level=IsolationLevel.READ_COMMITTED,
)
# If the receipt was older than the currently persisted one, nothing to do.
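
`get_last_receipt_for_user_txn` above replaces the old per-receipt-type loop with a single query: all wanted receipt types go into one IN-list and `ORDER BY stream_ordering DESC LIMIT 1` picks the most recent match. A standalone sketch of that lookup using plain `sqlite3` rather than Synapse's `DatabasePool`:

```python
import sqlite3
from typing import Collection, Optional, Tuple

def last_receipt_for_user(
    conn: sqlite3.Connection,
    user_id: str,
    room_id: str,
    receipt_types: Collection[str],
) -> Optional[Tuple[str, int]]:
    """Return (event_id, stream_ordering) of the newest matching receipt, if any."""
    placeholders = ", ".join("?" for _ in receipt_types)
    sql = f"""
        SELECT event_id, stream_ordering
        FROM receipts_linearized
        INNER JOIN events USING (room_id, event_id)
        WHERE receipt_type IN ({placeholders})
          AND user_id = ?
          AND room_id = ?
        ORDER BY stream_ordering DESC
        LIMIT 1
    """
    return conn.execute(sql, (*receipt_types, user_id, room_id)).fetchone()
```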
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 4991360b..cb63cd9b 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -1805,21 +1805,10 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
columns=["creation_ts"],
)
- # we no longer use refresh tokens, but it's possible that some people
- # might have a background update queued to build this index. Just
- # clear the background update.
- self.db_pool.updates.register_noop_background_update(
- "refresh_tokens_device_index"
- )
-
self.db_pool.updates.register_background_update_handler(
"users_set_deactivated_flag", self._background_update_set_deactivated_flag
)
- self.db_pool.updates.register_noop_background_update(
- "user_threepids_grandfather"
- )
-
self.db_pool.updates.register_background_index_update(
"user_external_ids_user_id_idx",
index_name="user_external_ids_user_id_idx",
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 68d4fc2e..13d6a1d5 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -32,12 +32,17 @@ from typing import (
import attr
-from synapse.api.constants import EventContentFields, EventTypes, JoinRules
+from synapse.api.constants import (
+ EventContentFields,
+ EventTypes,
+ JoinRules,
+ PublicRoomsFilterFields,
+)
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion, RoomVersions
from synapse.config.homeserver import HomeServerConfig
from synapse.events import EventBase
-from synapse.storage._base import SQLBaseStore, db_to_json
+from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
@@ -199,10 +204,29 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
desc="get_public_room_ids",
)
+ def _construct_room_type_where_clause(
+ self, room_types: Union[List[Union[str, None]], None]
+ ) -> Tuple[Union[str, None], List[str]]:
+ if not room_types or not self.config.experimental.msc3827_enabled:
+ return None, []
+ else:
+ # We use None when we want to get rooms without a type
+ is_null_clause = ""
+ if None in room_types:
+ is_null_clause = "OR room_type IS NULL"
+ room_types = [value for value in room_types if value is not None]
+
+ list_clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_type", room_types
+ )
+
+ return f"({list_clause} {is_null_clause})", args
+
async def count_public_rooms(
self,
network_tuple: Optional[ThirdPartyInstanceID],
ignore_non_federatable: bool,
+ search_filter: Optional[dict],
) -> int:
"""Counts the number of public rooms as tracked in the room_stats_current
and room_stats_state table.
@@ -210,11 +234,20 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
Args:
network_tuple
ignore_non_federatable: If true filters out non-federatable rooms
+ search_filter: Optional filter from the public rooms list request,
+ e.g. room types or a generic search term.
"""
def _count_public_rooms_txn(txn: LoggingTransaction) -> int:
query_args = []
+ room_type_clause, args = self._construct_room_type_where_clause(
+ search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None)
+ if search_filter
+ else None
+ )
+ room_type_clause = f" AND {room_type_clause}" if room_type_clause else ""
+ query_args += args
+
if network_tuple:
if network_tuple.appservice_id:
published_sql = """
@@ -249,6 +282,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
OR join_rules = '{JoinRules.KNOCK_RESTRICTED}'
OR history_visibility = 'world_readable'
)
+ {room_type_clause}
AND joined_members > 0
"""
@@ -347,8 +381,12 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
if ignore_non_federatable:
where_clauses.append("is_federatable")
- if search_filter and search_filter.get("generic_search_term", None):
- search_term = "%" + search_filter["generic_search_term"] + "%"
+ if search_filter and search_filter.get(
+ PublicRoomsFilterFields.GENERIC_SEARCH_TERM, None
+ ):
+ search_term = (
+ "%" + search_filter[PublicRoomsFilterFields.GENERIC_SEARCH_TERM] + "%"
+ )
where_clauses.append(
"""
@@ -365,6 +403,15 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
search_term.lower(),
]
+ room_type_clause, args = self._construct_room_type_where_clause(
+ search_filter.get(PublicRoomsFilterFields.ROOM_TYPES, None)
+ if search_filter
+ else None
+ )
+ if room_type_clause:
+ where_clauses.append(room_type_clause)
+ query_args += args
+
where_clause = ""
if where_clauses:
where_clause = " AND " + " AND ".join(where_clauses)
@@ -373,7 +420,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
sql = f"""
SELECT
room_id, name, topic, canonical_alias, joined_members,
- avatar, history_visibility, guest_access, join_rules
+ avatar, history_visibility, guest_access, join_rules, room_type
FROM (
{published_sql}
) published
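
Each optional filter above contributes both a WHERE fragment and its bound
parameters, and the fragments are ANDed together at the end. A simplified,
self-contained sketch of that composition (table and column names are only
indicative):

from typing import Any, List, Optional, Tuple


def build_public_rooms_query(
    search_term: Optional[str],
    room_type_clause: Optional[str],
    room_type_args: List[str],
) -> Tuple[str, List[Any]]:
    where_clauses: List[str] = []
    args: List[Any] = []

    if search_term:
        # Case-insensitive substring match against name, topic and alias.
        pattern = "%" + search_term.lower() + "%"
        where_clauses.append(
            "(LOWER(name) LIKE ? OR LOWER(topic) LIKE ? OR LOWER(canonical_alias) LIKE ?)"
        )
        args += [pattern, pattern, pattern]

    if room_type_clause:
        where_clauses.append(room_type_clause)
        args += room_type_args

    where_sql = (" AND " + " AND ".join(where_clauses)) if where_clauses else ""
    sql = (
        "SELECT room_id, name, topic, canonical_alias, joined_members, room_type"
        " FROM room_stats_state WHERE joined_members > 0" + where_sql
    )
    return sql, args


sql, args = build_public_rooms_query("matrix", "(room_type IS NULL)", [])
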
@@ -1109,17 +1156,25 @@ class RoomWorkerStore(CacheInvalidationWorkerStore):
return room_servers
async def clear_partial_state_room(self, room_id: str) -> bool:
- # this can race with incoming events, so we watch out for FK errors.
- # TODO(faster_joins): this still doesn't completely fix the race, since the persist process
- # is not atomic. I fear we need an application-level lock.
+ """Clears the partial state flag for a room.
+
+ Args:
+ room_id: The room whose partial state flag is to be cleared.
+
+ Returns:
+ `True` if the partial state flag has been cleared successfully.
+
+ `False` if the partial state flag could not be cleared because the room
+ still contains events with partial state.
+ """
try:
await self.db_pool.runInteraction(
"clear_partial_state_room", self._clear_partial_state_room_txn, room_id
)
return True
- except self.db_pool.engine.module.DatabaseError as e:
- # TODO(faster_joins): how do we distinguish between FK errors and other errors?
- logger.warning(
+ except self.db_pool.engine.module.IntegrityError as e:
+ # Assume that any `IntegrityError`s are due to partial state events.
+ logger.info(
"Exception while clearing lazy partial-state-room %s, retrying: %s",
room_id,
e,
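
Clearing the flag can race with an event still being persisted with partial
state; that now surfaces as an IntegrityError, which the caller treats as "try
again later" rather than a hard failure. A loose sketch of the pattern using
sqlite3 (the table layout mirrors Synapse's partial-state tables but is only
illustrative here):

import sqlite3


def clear_partial_state_room(conn: sqlite3.Connection, room_id: str) -> bool:
    # Foreign-key enforcement must be on for sqlite to raise IntegrityError.
    conn.execute("PRAGMA foreign_keys = ON")
    try:
        with conn:  # one transaction
            conn.execute(
                "DELETE FROM partial_state_rooms_servers WHERE room_id = ?",
                (room_id,),
            )
            conn.execute(
                "DELETE FROM partial_state_rooms WHERE room_id = ?", (room_id,)
            )
        return True
    except sqlite3.IntegrityError:
        # Assume a partial-state event still references the room; leave the
        # flag in place and let the caller retry once state has been resolved.
        return False
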
@@ -1164,6 +1219,7 @@ class _BackgroundUpdates:
POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2"
REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth"
POPULATE_ROOMS_CREATOR_COLUMN = "populate_rooms_creator_column"
+ ADD_ROOM_TYPE_COLUMN = "add_room_type_column"
_REPLACE_ROOM_DEPTH_SQL_COMMANDS = (
@@ -1198,6 +1254,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
self._background_add_rooms_room_version_column,
)
+ self.db_pool.updates.register_background_update_handler(
+ _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN,
+ self._background_add_room_type_column,
+ )
+
# BG updates to change the type of room_depth.min_depth
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.POPULATE_ROOM_DEPTH_MIN_DEPTH2,
@@ -1567,6 +1628,69 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
return batch_size
+ async def _background_add_room_type_column(
+ self, progress: JsonDict, batch_size: int
+ ) -> int:
+ """Background update to go and add room_type information to `room_stats_state`
+ table from `event_json` table.
+ """
+
+ last_room_id = progress.get("room_id", "")
+
+ def _background_add_room_type_column_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+ sql = """
+ SELECT state.room_id, json FROM event_json
+ INNER JOIN current_state_events AS state USING (event_id)
+ WHERE state.room_id > ? AND type = 'm.room.create'
+ ORDER BY state.room_id
+ LIMIT ?
+ """
+
+ txn.execute(sql, (last_room_id, batch_size))
+ room_id_to_create_event_results = txn.fetchall()
+
+ new_last_room_id = None
+ for room_id, event_json in room_id_to_create_event_results:
+ event_dict = db_to_json(event_json)
+
+ room_type = event_dict.get("content", {}).get(
+ EventContentFields.ROOM_TYPE, None
+ )
+ if isinstance(room_type, str):
+ self.db_pool.simple_update_txn(
+ txn,
+ table="room_stats_state",
+ keyvalues={"room_id": room_id},
+ updatevalues={"room_type": room_type},
+ )
+
+ new_last_room_id = room_id
+
+ if new_last_room_id is None:
+ return True
+
+ self.db_pool.updates._background_update_progress_txn(
+ txn,
+ _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN,
+ {"room_id": new_last_room_id},
+ )
+
+ return False
+
+ end = await self.db_pool.runInteraction(
+ "_background_add_room_type_column",
+ _background_add_room_type_column_txn,
+ )
+
+ if end:
+ await self.db_pool.updates._end_background_update(
+ _BackgroundUpdates.ADD_ROOM_TYPE_COLUMN
+ )
+
+ return batch_size
+
class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
def __init__(
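
The background update above follows the usual batched pattern: select up to
batch_size rows keyed after the last processed room ID, update them, persist the
new progress key, and finish once a batch comes back empty. A generic sketch of
that loop (function and table names are illustrative, not Synapse's API):

import sqlite3
from typing import Optional


def run_one_batch(
    conn: sqlite3.Connection, last_room_id: str, batch_size: int
) -> Optional[str]:
    """Process one batch; return the new progress key, or None when done."""
    rows = conn.execute(
        "SELECT room_id, room_type FROM room_create_events"
        " WHERE room_id > ? ORDER BY room_id LIMIT ?",
        (last_room_id, batch_size),
    ).fetchall()
    if not rows:
        return None

    with conn:
        for room_id, room_type in rows:
            if isinstance(room_type, str):
                conn.execute(
                    "UPDATE room_stats_state SET room_type = ? WHERE room_id = ?",
                    (room_type, room_id),
                )
    # The caller stores this so the update can resume here after a restart.
    return rows[-1][0]


def run_background_update(conn: sqlite3.Connection, batch_size: int = 100) -> None:
    progress = ""
    while True:
        new_progress = run_one_batch(conn, progress, batch_size)
        if new_progress is None:
            break
        progress = new_progress
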
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 31bc8c56..0b5e4e42 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -212,6 +212,60 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN))
return [r[0] for r in txn]
+ @cached()
+ def get_user_in_room_with_profile(
+ self, room_id: str, user_id: str
+ ) -> Dict[str, ProfileInfo]:
+ raise NotImplementedError()
+
+ @cachedList(
+ cached_method_name="get_user_in_room_with_profile", list_name="user_ids"
+ )
+ async def get_subset_users_in_room_with_profiles(
+ self, room_id: str, user_ids: Collection[str]
+ ) -> Dict[str, ProfileInfo]:
+ """Get a mapping from user ID to profile information for a list of users
+ in a given room.
+
+ The profile information comes directly from this room's `m.room.member`
+ events, and so may be specific to this room rather than part of a user's
+ global profile. To avoid privacy leaks, the profile data should only be
+ revealed to users who are already in this room.
+
+ Args:
+ room_id: The ID of the room to retrieve the users of.
+ user_ids: The IDs of the users in the room to run the query for.
+
+ Returns:
+ A mapping from user ID to ProfileInfo.
+ """
+
+ def _get_subset_users_in_room_with_profiles(
+ txn: LoggingTransaction,
+ ) -> Dict[str, ProfileInfo]:
+ clause, ids = make_in_list_sql_clause(
+ self.database_engine, "m.user_id", user_ids
+ )
+
+ sql = """
+ SELECT state_key, display_name, avatar_url FROM room_memberships as m
+ INNER JOIN current_state_events as c
+ ON m.event_id = c.event_id
+ AND m.room_id = c.room_id
+ AND m.user_id = c.state_key
+ WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ? AND %s
+ """ % (
+ clause,
+ )
+ txn.execute(sql, (room_id, Membership.JOIN, *ids))
+
+ return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn}
+
+ return await self.db_pool.runInteraction(
+ "get_subset_users_in_room_with_profiles",
+ _get_subset_users_in_room_with_profiles,
+ )
+
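
The cachedList decorator above batches lookups for the per-user cached method:
cache hits are served individually and only the misses go to the database in a
single query. A conceptual, non-Synapse sketch of that behaviour:

from typing import Awaitable, Callable, Dict, Iterable, List, Tuple


class ProfileInfo:
    def __init__(self, display_name: str, avatar_url: str) -> None:
        self.display_name = display_name
        self.avatar_url = avatar_url


class ProfileCacheSketch:
    def __init__(
        self,
        fetch_many: Callable[[str, List[str]], Awaitable[Dict[str, ProfileInfo]]],
    ) -> None:
        self._cache: Dict[Tuple[str, str], ProfileInfo] = {}
        self._fetch_many = fetch_many  # one SQL query for all missing users

    async def get_profiles(
        self, room_id: str, user_ids: Iterable[str]
    ) -> Dict[str, ProfileInfo]:
        results: Dict[str, ProfileInfo] = {}
        misses: List[str] = []
        for user_id in user_ids:
            hit = self._cache.get((room_id, user_id))
            if hit is not None:
                results[user_id] = hit
            else:
                misses.append(user_id)

        if misses:
            fetched = await self._fetch_many(room_id, misses)
            for user_id, profile in fetched.items():
                self._cache[(room_id, user_id)] = profile
            results.update(fetched)

        return results
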
@cached(max_entries=100000, iterable=True)
async def get_users_in_room_with_profiles(
self, room_id: str
@@ -338,6 +392,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
)
@cached()
+ async def get_number_joined_users_in_room(self, room_id: str) -> int:
+ return await self.db_pool.simple_select_one_onecol(
+ table="current_state_events",
+ keyvalues={"room_id": room_id, "membership": Membership.JOIN},
+ retcol="COUNT(*)",
+ desc="get_number_joined_users_in_room",
+ )
+
+ @cached()
async def get_invited_rooms_for_local_user(
self, user_id: str
) -> List[RoomsForUser]:
@@ -416,6 +479,17 @@ class RoomMemberWorkerStore(EventsWorkerStore):
user_id: str,
membership_list: List[str],
) -> List[RoomsForUser]:
+ """Get all the rooms for this *local* user where the membership for this user
+ matches one in the membership list.
+
+ Args:
+ user_id: The user ID.
+ membership_list: A list of synapse.api.constants.Membership
+ values which the user's membership must match.
+
+ Returns:
+ A list of RoomsForUser, one for each room where the user's membership
+ matches one of the given types.
+ """
# Paranoia check.
if not self.hs.is_mine_id(user_id):
raise Exception(
@@ -444,6 +518,18 @@ class RoomMemberWorkerStore(EventsWorkerStore):
return results
+ @cached(iterable=True)
+ async def get_local_users_in_room(self, room_id: str) -> List[str]:
+ """
+ Retrieves a list of the current room members who are local to this server.
+ """
+ return await self.db_pool.simple_select_onecol(
+ table="local_current_membership",
+ keyvalues={"room_id": room_id, "membership": Membership.JOIN},
+ retcol="user_id",
+ desc="get_local_users_in_room",
+ )
+
async def get_local_current_membership_for_user_in_room(
self, user_id: str, room_id: str
) -> Tuple[Optional[str], Optional[str]]:
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index 78e0773b..f6e24b68 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -113,7 +113,6 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
EVENT_SEARCH_UPDATE_NAME = "event_search"
EVENT_SEARCH_ORDER_UPDATE_NAME = "event_search_order"
- EVENT_SEARCH_USE_GIST_POSTGRES_NAME = "event_search_postgres_gist"
EVENT_SEARCH_USE_GIN_POSTGRES_NAME = "event_search_postgres_gin"
EVENT_SEARCH_DELETE_NON_STRINGS = "event_search_sqlite_delete_non_strings"
@@ -132,15 +131,6 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
self.EVENT_SEARCH_ORDER_UPDATE_NAME, self._background_reindex_search_order
)
- # we used to have a background update to turn the GIN index into a
- # GIST one; we no longer do that (obviously) because we actually want
- # a GIN index. However, it's possible that some people might still have
- # the background update queued, so we register a handler to clear the
- # background update.
- self.db_pool.updates.register_noop_background_update(
- self.EVENT_SEARCH_USE_GIST_POSTGRES_NAME
- )
-
self.db_pool.updates.register_background_update_handler(
self.EVENT_SEARCH_USE_GIN_POSTGRES_NAME, self._background_reindex_gin_search
)
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index bdd00273..9674c4a7 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -127,13 +127,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
NotFoundError: if the room is unknown
"""
- # First we try looking up room version from the database, but for old
- # rooms we might not have added the room version to it yet so we fall
- # back to previous behaviour and look in current state events.
- #
# We really should have an entry in the rooms table for every room we
- # care about, but let's be a bit paranoid (at least while the background
- # update is happening) to avoid breaking existing rooms.
+ # care about, but let's be a bit paranoid.
room_version = self.db_pool.simple_select_one_onecol_txn(
txn,
table="rooms",
@@ -440,6 +435,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
# TODO(faster_joins): need to do something about workers here
+ # https://github.com/matrix-org/synapse/issues/12994
txn.call_after(self.is_partial_state_event.invalidate, (event.event_id,))
txn.call_after(
self._get_state_group_for_event.prefill,
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b95dbef6..b4c652ac 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -16,7 +16,7 @@
import logging
from enum import Enum
from itertools import chain
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
from typing_extensions import Counter
@@ -120,11 +120,6 @@ class StatsStore(StateDeltasStore):
self.db_pool.updates.register_background_update_handler(
"populate_stats_process_users", self._populate_stats_process_users
)
- # we no longer need to perform clean-up, but we will give ourselves
- # the potential to reintroduce it in the future – so documentation
- # will still encourage the use of this no-op handler.
- self.db_pool.updates.register_noop_background_update("populate_stats_cleanup")
- self.db_pool.updates.register_noop_background_update("populate_stats_prepare")
async def _populate_stats_process_users(
self, progress: JsonDict, batch_size: int
@@ -243,6 +238,7 @@ class StatsStore(StateDeltasStore):
* avatar
* canonical_alias
* guest_access
+ * room_type
A is_federatable key can also be included with a boolean value.
@@ -268,6 +264,7 @@ class StatsStore(StateDeltasStore):
"avatar",
"canonical_alias",
"guest_access",
+ "room_type",
):
field = fields.get(col, sentinel)
if field is not sentinel and (not isinstance(field, str) or "\0" in field):
@@ -300,6 +297,7 @@ class StatsStore(StateDeltasStore):
keyvalues={id_col: id},
retcol="completed_delta_stream_id",
allow_none=True,
+ desc="get_earliest_token_for_stats",
)
async def bulk_update_stats_delta(
@@ -576,7 +574,7 @@ class StatsStore(StateDeltasStore):
state_event_map = await self.get_events(event_ids, get_prev_content=False) # type: ignore[attr-defined]
- room_state = {
+ room_state: Dict[str, Union[None, bool, str]] = {
"join_rules": None,
"history_visibility": None,
"encryption": None,
@@ -585,6 +583,7 @@ class StatsStore(StateDeltasStore):
"avatar": None,
"canonical_alias": None,
"is_federatable": True,
+ "room_type": None,
}
for event in state_event_map.values():
@@ -608,6 +607,9 @@ class StatsStore(StateDeltasStore):
room_state["is_federatable"] = (
event.content.get(EventContentFields.FEDERATE, True) is True
)
+ room_type = event.content.get(EventContentFields.ROOM_TYPE)
+ if isinstance(room_type, str):
+ room_state["room_type"] = room_type
await self.update_room_state(room_id, room_state)
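
The stats pipeline now records room_type alongside the other create-event
fields, defaulting it to None and only accepting string values. A small sketch
of that extraction (the event content keys follow the Matrix spec; the dict
below stands in for the persisted stats row):

from typing import Any, Dict, Union


def room_state_from_create_event(
    content: Dict[str, Any],
) -> Dict[str, Union[None, bool, str]]:
    room_state: Dict[str, Union[None, bool, str]] = {
        "is_federatable": True,
        "room_type": None,
    }
    # m.federate defaults to True when absent.
    room_state["is_federatable"] = content.get("m.federate", True) is True

    # Only record a room type if it is actually a string (ignore junk values).
    room_type = content.get("type")
    if isinstance(room_type, str):
        room_state["room_type"] = room_type
    return room_state


# e.g. a space's m.room.create content:
state = room_state_from_create_event({"type": "m.space", "creator": "@a:example.org"})
assert state["room_type"] == "m.space"
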
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 8e88784d..3a1df777 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -46,10 +46,12 @@ from typing import (
Set,
Tuple,
cast,
+ overload,
)
import attr
from frozendict import frozendict
+from typing_extensions import Literal
from twisted.internet import defer
@@ -795,6 +797,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
return RoomStreamToken(topo, stream_ordering)
+ @overload
+ def get_stream_id_for_event_txn(
+ self,
+ txn: LoggingTransaction,
+ event_id: str,
+ allow_none: Literal[False] = False,
+ ) -> int:
+ ...
+
+ @overload
+ def get_stream_id_for_event_txn(
+ self,
+ txn: LoggingTransaction,
+ event_id: str,
+ allow_none: bool = False,
+ ) -> Optional[int]:
+ ...
+
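
The two overload stubs above let mypy give callers that rely on the default
allow_none=False a plain int, while callers passing a bool get Optional[int]. A
minimal standalone example of the same pattern (the function below is a
stand-in, not Synapse's actual implementation):

from typing import Optional, overload

from typing_extensions import Literal


@overload
def lookup_stream_id(event_id: str, allow_none: Literal[False] = False) -> int:
    ...


@overload
def lookup_stream_id(event_id: str, allow_none: bool = False) -> Optional[int]:
    ...


def lookup_stream_id(event_id: str, allow_none: bool = False) -> Optional[int]:
    stream_ids = {"$event1": 42}  # stand-in for the database lookup
    result = stream_ids.get(event_id)
    if result is None and not allow_none:
        raise KeyError(event_id)
    return result


stream_id = lookup_stream_id("$event1")              # inferred as int
maybe_stream_id = lookup_stream_id("$event1", True)  # inferred as Optional[int]
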
def get_stream_id_for_event_txn(
self,
txn: LoggingTransaction,