summaryrefslogtreecommitdiff
path: root/synapse/storage
diff options
context:
space:
mode:
authorAntonio Russo <aerusso@aerusso.net>2023-09-27 18:16:24 -0600
committerAntonio Russo <aerusso@aerusso.net>2023-09-27 18:16:24 -0600
commitb230e2110fd27e03a5452ce95dbbe8ea8dd87037 (patch)
tree6fb976a0de5fa566a805537ade0ddae5502ae138 /synapse/storage
parent44b86bfd0ddbeaf838f80677985bce34ec04d72f (diff)
New upstream version 1.93.0
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/background_updates.py2
-rw-r--r--synapse/storage/controllers/persist_events.py17
-rw-r--r--synapse/storage/database.py8
-rw-r--r--synapse/storage/databases/main/__init__.py7
-rw-r--r--synapse/storage/databases/main/account_data.py14
-rw-r--r--synapse/storage/databases/main/client_ips.py11
-rw-r--r--synapse/storage/databases/main/deviceinbox.py28
-rw-r--r--synapse/storage/databases/main/devices.py34
-rw-r--r--synapse/storage/databases/main/event_federation.py6
-rw-r--r--synapse/storage/databases/main/event_push_actions.py77
-rw-r--r--synapse/storage/databases/main/events.py12
-rw-r--r--synapse/storage/databases/main/experimental_features.py7
-rw-r--r--synapse/storage/databases/main/keys.py236
-rw-r--r--synapse/storage/databases/main/purge_events.py4
-rw-r--r--synapse/storage/databases/main/receipts.py29
-rw-r--r--synapse/storage/databases/main/registration.py96
-rw-r--r--synapse/storage/databases/main/stats.py1
-rw-r--r--synapse/storage/databases/main/tags.py6
-rw-r--r--synapse/storage/databases/main/task_scheduler.py6
-rw-r--r--synapse/storage/engines/_base.py6
-rw-r--r--synapse/storage/engines/postgres.py4
-rw-r--r--synapse/storage/engines/sqlite.py4
-rw-r--r--synapse/storage/schema/__init__.py6
-rw-r--r--synapse/storage/schema/main/delta/48/group_unique_indexes.py4
-rw-r--r--synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql16
25 files changed, 303 insertions, 338 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index 7619f405..99ebd96f 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -62,7 +62,6 @@ class Constraint(metaclass=abc.ABCMeta):
@abc.abstractmethod
def make_check_clause(self, table: str) -> str:
"""Returns an SQL expression that checks the row passes the constraint."""
- pass
@abc.abstractmethod
def make_constraint_clause_postgres(self) -> str:
@@ -70,7 +69,6 @@ class Constraint(metaclass=abc.ABCMeta):
Only used on Postgres DBs
"""
- pass
@attr.s(auto_attribs=True)
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index abd1d149..f39ae2d6 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -19,6 +19,7 @@ import logging
from collections import deque
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Any,
Awaitable,
Callable,
@@ -154,12 +155,13 @@ class _UpdateCurrentStateTask:
_EventPersistQueueTask = Union[_PersistEventsTask, _UpdateCurrentStateTask]
+_PersistResult = TypeVar("_PersistResult")
@attr.s(auto_attribs=True, slots=True)
-class _EventPersistQueueItem:
+class _EventPersistQueueItem(Generic[_PersistResult]):
task: _EventPersistQueueTask
- deferred: ObservableDeferred
+ deferred: ObservableDeferred[_PersistResult]
parent_opentracing_span_contexts: List = attr.ib(factory=list)
"""A list of opentracing spans waiting for this batch"""
@@ -168,9 +170,6 @@ class _EventPersistQueueItem:
"""The opentracing span under which the persistence actually happened"""
-_PersistResult = TypeVar("_PersistResult")
-
-
class _EventPeristenceQueue(Generic[_PersistResult]):
"""Queues up tasks so that they can be processed with only one concurrent
transaction per room.
@@ -620,7 +619,7 @@ class EventsPersistenceStorageController:
)
for room_id, ev_ctx_rm in events_by_room.items():
- latest_event_ids = set(
+ latest_event_ids = (
await self.main_store.get_latest_event_ids_in_room(room_id)
)
new_latest_event_ids = await self._calculate_new_extremities(
@@ -742,7 +741,7 @@ class EventsPersistenceStorageController:
self,
room_id: str,
event_contexts: List[Tuple[EventBase, EventContext]],
- latest_event_ids: Collection[str],
+ latest_event_ids: AbstractSet[str],
) -> Set[str]:
"""Calculates the new forward extremities for a room given events to
persist.
@@ -760,8 +759,6 @@ class EventsPersistenceStorageController:
and not event.internal_metadata.is_soft_failed()
]
- latest_event_ids = set(latest_event_ids)
-
# start with the existing forward extremities
result = set(latest_event_ids)
@@ -800,7 +797,7 @@ class EventsPersistenceStorageController:
self,
room_id: str,
events_context: List[Tuple[EventBase, EventContext]],
- old_latest_event_ids: Set[str],
+ old_latest_event_ids: AbstractSet[str],
new_latest_event_ids: Set[str],
) -> Tuple[Optional[StateMap[str]], Optional[StateMap[str]], Set[str]]:
"""Calculate the current state dict after adding some new events to
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 55ac313f..697bc565 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -422,10 +422,11 @@ class LoggingTransaction:
return self._do_execute(
# TODO: is it safe for values to be Iterable[Iterable[Any]] here?
# https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence]
- lambda the_sql: execute_values(
- self.txn, the_sql, values, template=template, fetch=fetch
+ lambda the_sql, the_values: execute_values(
+ self.txn, the_sql, the_values, template=template, fetch=fetch
),
sql,
+ values,
)
def execute(self, sql: str, parameters: SQLQueryParameters = ()) -> None:
@@ -1192,6 +1193,7 @@ class DatabasePool:
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
+ where_clause: Optional[str] = None,
desc: str = "simple_upsert",
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.
@@ -1242,6 +1244,7 @@ class DatabasePool:
keyvalues: The unique key columns and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
+ where_clause: An index predicate to apply to the upsert.
desc: description of the transaction, for logging and metrics
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1262,6 +1265,7 @@ class DatabasePool:
keyvalues,
values,
insertion_values,
+ where_clause,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 0836e247..10140357 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -175,6 +175,7 @@ class DataStore(
direction: Direction = Direction.FORWARDS,
approved: bool = True,
not_user_types: Optional[List[str]] = None,
+ locked: bool = False,
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users and the
@@ -194,6 +195,7 @@ class DataStore(
direction: sort ascending or descending
approved: whether to include approved users
not_user_types: list of user types to exclude
+ locked: whether to include locked users
Returns:
A tuple of a list of mappings from user to information and a count of total users.
"""
@@ -226,6 +228,9 @@ class DataStore(
if not deactivated:
filters.append("deactivated = 0")
+ if not locked:
+ filters.append("locked IS FALSE")
+
if admins is not None:
if admins:
filters.append("admin = 1")
@@ -290,7 +295,7 @@ class DataStore(
sql = f"""
SELECT name, user_type, is_guest, admin, deactivated, shadow_banned,
displayname, avatar_url, creation_ts * 1000 as creation_ts, approved,
- eu.user_id is not null as erased, last_seen_ts
+ eu.user_id is not null as erased, last_seen_ts, locked
{sql_base}
ORDER BY {order_by_column} {order}, u.name ASC
LIMIT ? OFFSET ?
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 8f7bdbc6..80f146dd 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -43,7 +43,7 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
StreamIdGenerator,
)
-from synapse.types import JsonDict
+from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -119,7 +119,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached()
async def get_global_account_data_for_user(
self, user_id: str
- ) -> Mapping[str, JsonDict]:
+ ) -> Mapping[str, JsonMapping]:
"""
Get all the global client account_data for a user.
@@ -164,7 +164,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached()
async def get_room_account_data_for_user(
self, user_id: str
- ) -> Mapping[str, Mapping[str, JsonDict]]:
+ ) -> Mapping[str, Mapping[str, JsonMapping]]:
"""
Get all of the per-room client account_data for a user.
@@ -213,7 +213,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached(num_args=2, max_entries=5000, tree=True)
async def get_global_account_data_by_type_for_user(
self, user_id: str, data_type: str
- ) -> Optional[JsonDict]:
+ ) -> Optional[JsonMapping]:
"""
Returns:
The account data.
@@ -265,7 +265,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached(num_args=2, tree=True)
async def get_account_data_for_room(
self, user_id: str, room_id: str
- ) -> Mapping[str, JsonDict]:
+ ) -> Mapping[str, JsonMapping]:
"""Get all the client account_data for a user for a room.
Args:
@@ -296,7 +296,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached(num_args=3, max_entries=5000, tree=True)
async def get_account_data_for_room_and_type(
self, user_id: str, room_id: str, account_data_type: str
- ) -> Optional[JsonDict]:
+ ) -> Optional[JsonMapping]:
"""Get the client account_data of given type for a user for a room.
Args:
@@ -394,7 +394,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
async def get_updated_global_account_data_for_user(
self, user_id: str, stream_id: int
- ) -> Dict[str, JsonDict]:
+ ) -> Mapping[str, JsonMapping]:
"""Get all the global account_data that's changed for a user.
Args:
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index d8d333e1..7da47c3d 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -764,3 +764,14 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
}
return list(results.values())
+
+ async def get_last_seen_for_user_id(self, user_id: str) -> Optional[int]:
+ """Get the last seen timestamp for a user, if we have it."""
+
+ return await self.db_pool.simple_select_one_onecol(
+ table="user_ips",
+ keyvalues={"user_id": user_id},
+ retcol="MAX(last_seen)",
+ allow_none=True,
+ desc="get_last_seen_for_user_id",
+ )
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index b471fcb0..744e98c6 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
table="devices",
column="user_id",
iterable=user_ids_to_query,
- keyvalues={"user_id": user_id, "hidden": False},
+ keyvalues={"hidden": False},
retcols=("device_id",),
)
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
- self, user_id: str, device_id: Optional[str], up_to_stream_id: int
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ up_to_stream_id: int,
+ limit: int,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
+ limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
+ ROW_ID_NAME = self.database_engine.row_id_name
+
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
- sql = (
- "DELETE FROM device_inbox"
- " WHERE user_id = ? AND device_id = ?"
- " AND stream_id <= ?"
- )
+ sql = f"""
+ DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
+ SELECT {ROW_ID_NAME} FROM device_inbox
+ WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+ LIMIT {limit}
+ )
+ """
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": f"deleted {count} messages for device", "count": count})
+ # In this case we don't know if we hit the limit or the delete is complete
+ # so let's not update the cache.
+ if count == limit:
+ return count
+
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e4162f84..70faf4b1 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -759,18 +759,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
mapping of user_id -> device_id -> device_info.
"""
unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids}
- user_map = await self.get_device_list_last_stream_id_for_remotes(
- list(unique_user_ids)
- )
- # We go and check if any of the users need to have their device lists
- # resynced. If they do then we remove them from the cached list.
- users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
+ user_ids_in_cache = await self.get_users_whose_devices_are_cached(
unique_user_ids
)
- user_ids_in_cache = {
- user_id for user_id, stream_id in user_map.items() if stream_id
- } - users_needing_resync
user_ids_not_in_cache = unique_user_ids - user_ids_in_cache
# First fetch all the users which all devices are to be returned.
@@ -792,6 +784,22 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return user_ids_not_in_cache, results
+ async def get_users_whose_devices_are_cached(
+ self, user_ids: StrCollection
+ ) -> Set[str]:
+ """Checks which of the given users we have cached the devices for."""
+ user_map = await self.get_device_list_last_stream_id_for_remotes(user_ids)
+
+ # We go and check if any of the users need to have their device lists
+ # resynced. If they do then we remove them from the cached list.
+ users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
+ user_ids
+ )
+ user_ids_in_cache = {
+ user_id for user_id, stream_id in user_map.items() if stream_id
+ } - users_needing_resync
+ return user_ids_in_cache
+
@cached(num_args=2, tree=True)
async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict:
content = await self.db_pool.simple_select_one_onecol(
@@ -1766,14 +1774,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
self.db_pool.simple_delete_many_txn(
txn,
- table="device_inbox",
- column="device_id",
- values=device_ids,
- keyvalues={"user_id": user_id},
- )
-
- self.db_pool.simple_delete_many_txn(
- txn,
table="device_auth_providers",
column="device_id",
values=device_ids,
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index fab7008a..afffa549 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -19,6 +19,7 @@ from typing import (
TYPE_CHECKING,
Collection,
Dict,
+ FrozenSet,
Iterable,
List,
Optional,
@@ -1179,13 +1180,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
)
@cached(max_entries=5000, iterable=True)
- async def get_latest_event_ids_in_room(self, room_id: str) -> Sequence[str]:
- return await self.db_pool.simple_select_onecol(
+ async def get_latest_event_ids_in_room(self, room_id: str) -> FrozenSet[str]:
+ event_ids = await self.db_pool.simple_select_onecol(
table="event_forward_extremities",
keyvalues={"room_id": room_id},
retcol="event_id",
desc="get_latest_event_ids_in_room",
)
+ return frozenset(event_ids)
async def get_min_depth(self, room_id: str) -> Optional[int]:
"""For the given room, get the minimum depth we have seen for it."""
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 07bda7d6..ba99e63d 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1599,10 +1599,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn,
table="event_push_summary",
key_names=("user_id", "room_id", "thread_id"),
- key_values=[
- (user_id, room_id, thread_id)
- for user_id, room_id, thread_id in summaries
- ],
+ key_values=list(summaries),
value_names=("notif_count", "unread_count", "stream_ordering"),
value_values=[
(
@@ -1740,42 +1737,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# We sleep to ensure that we don't overwhelm the DB.
await self._clock.sleep(1.0)
-
-class EventPushActionsStore(EventPushActionsWorkerStore):
- EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
-
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- self.db_pool.updates.register_background_index_update(
- self.EPA_HIGHLIGHT_INDEX,
- index_name="event_push_actions_u_highlight",
- table="event_push_actions",
- columns=["user_id", "stream_ordering"],
- )
-
- self.db_pool.updates.register_background_index_update(
- "event_push_actions_highlights_index",
- index_name="event_push_actions_highlights_index",
- table="event_push_actions",
- columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
- where_clause="highlight=1",
- )
-
- # Add index to make deleting old push actions faster.
- self.db_pool.updates.register_background_index_update(
- "event_push_actions_stream_highlight_index",
- index_name="event_push_actions_stream_highlight_index",
- table="event_push_actions",
- columns=["highlight", "stream_ordering"],
- where_clause="highlight=0",
- )
-
async def get_push_actions_for_user(
self,
user_id: str,
@@ -1834,6 +1795,42 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
]
+class EventPushActionsStore(EventPushActionsWorkerStore):
+ EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
+
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ self.db_pool.updates.register_background_index_update(
+ self.EPA_HIGHLIGHT_INDEX,
+ index_name="event_push_actions_u_highlight",
+ table="event_push_actions",
+ columns=["user_id", "stream_ordering"],
+ )
+
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_highlights_index",
+ index_name="event_push_actions_highlights_index",
+ table="event_push_actions",
+ columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
+ where_clause="highlight=1",
+ )
+
+ # Add index to make deleting old push actions faster.
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_stream_highlight_index",
+ index_name="event_push_actions_stream_highlight_index",
+ table="event_push_actions",
+ columns=["highlight", "stream_ordering"],
+ where_clause="highlight=0",
+ )
+
+
def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool:
for action in actions:
if not isinstance(action, dict):
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 0c1ed752..790d058c 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -222,7 +222,7 @@ class PersistEventsStore:
for room_id, latest_event_ids in new_forward_extremities.items():
self.store.get_latest_event_ids_in_room.prefill(
- (room_id,), list(latest_event_ids)
+ (room_id,), frozenset(latest_event_ids)
)
async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:
@@ -827,15 +827,7 @@ class PersistEventsStore:
"target_chain_id",
"target_sequence_number",
),
- values=[
- (source_id, source_seq, target_id, target_seq)
- for (
- source_id,
- source_seq,
- target_id,
- target_seq,
- ) in chain_links.get_additions()
- ],
+ values=list(chain_links.get_additions()),
)
@staticmethod
diff --git a/synapse/storage/databases/main/experimental_features.py b/synapse/storage/databases/main/experimental_features.py
index cf3226ae..654f9240 100644
--- a/synapse/storage/databases/main/experimental_features.py
+++ b/synapse/storage/databases/main/experimental_features.py
@@ -12,11 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING, Dict, FrozenSet
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main import CacheInvalidationWorkerStore
-from synapse.types import StrCollection
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
@@ -34,7 +33,7 @@ class ExperimentalFeaturesStore(CacheInvalidationWorkerStore):
super().__init__(database, db_conn, hs)
@cached()
- async def list_enabled_features(self, user_id: str) -> StrCollection:
+ async def list_enabled_features(self, user_id: str) -> FrozenSet[str]:
"""
Checks to see what features are enabled for a given user
Args:
@@ -49,7 +48,7 @@ class ExperimentalFeaturesStore(CacheInvalidationWorkerStore):
["feature"],
)
- return [feature["feature"] for feature in enabled]
+ return frozenset(feature["feature"] for feature in enabled)
async def set_features_for_user(
self,
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index a3b47448..41563371 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -16,14 +16,17 @@
import itertools
import json
import logging
-from typing import Dict, Iterable, Mapping, Optional, Tuple
+from typing import Dict, Iterable, Optional, Tuple
+from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
from unpaddedbase64 import decode_base64
+from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.keys import FetchKeyResult, FetchKeyResultForRemote
from synapse.storage.types import Cursor
+from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.iterutils import batch_iter
@@ -36,162 +39,84 @@ db_binary_type = memoryview
class KeyStore(CacheInvalidationWorkerStore):
"""Persistence for signature verification keys"""
- @cached()
- def _get_server_signature_key(
- self, server_name_and_key_id: Tuple[str, str]
- ) -> FetchKeyResult:
- raise NotImplementedError()
-
- @cachedList(
- cached_method_name="_get_server_signature_key",
- list_name="server_name_and_key_ids",
- )
- async def get_server_signature_keys(
- self, server_name_and_key_ids: Iterable[Tuple[str, str]]
- ) -> Dict[Tuple[str, str], FetchKeyResult]:
- """
- Args:
- server_name_and_key_ids:
- iterable of (server_name, key-id) tuples to fetch keys for
-
- Returns:
- A map from (server_name, key_id) -> FetchKeyResult, or None if the
- key is unknown
- """
- keys = {}
-
- def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str], ...]) -> None:
- """Processes a batch of keys to fetch, and adds the result to `keys`."""
-
- # batch_iter always returns tuples so it's safe to do len(batch)
- sql = """
- SELECT server_name, key_id, verify_key, ts_valid_until_ms
- FROM server_signature_keys WHERE 1=0
- """ + " OR (server_name=? AND key_id=?)" * len(
- batch
- )
-
- txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
-
- for row in txn:
- server_name, key_id, key_bytes, ts_valid_until_ms = row
-
- if ts_valid_until_ms is None:
- # Old keys may be stored with a ts_valid_until_ms of null,
- # in which case we treat this as if it was set to `0`, i.e.
- # it won't match key requests that define a minimum
- # `ts_valid_until_ms`.
- ts_valid_until_ms = 0
-
- keys[(server_name, key_id)] = FetchKeyResult(
- verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
- valid_until_ts=ts_valid_until_ms,
- )
-
- def _txn(txn: Cursor) -> Dict[Tuple[str, str], FetchKeyResult]:
- for batch in batch_iter(server_name_and_key_ids, 50):
- _get_keys(txn, batch)
- return keys
-
- return await self.db_pool.runInteraction("get_server_signature_keys", _txn)
-
- async def store_server_signature_keys(
+ async def store_server_keys_response(
self,
+ server_name: str,
from_server: str,
ts_added_ms: int,
- verify_keys: Mapping[Tuple[str, str], FetchKeyResult],
+ verify_keys: Dict[str, FetchKeyResult],
+ response_json: JsonDict,
) -> None:
- """Stores NACL verification keys for remote servers.
+ """Stores the keys for the given server that we got from `from_server`.
+
Args:
- from_server: Where the verification keys were looked up
- ts_added_ms: The time to record that the key was added
- verify_keys:
- keys to be stored. Each entry is a triplet of
- (server_name, key_id, key).
+ server_name: The owner of the keys
+ from_server: Which server we got the keys from
+ ts_added_ms: When we're adding the keys
+ verify_keys: The decoded keys
+ response_json: The full *signed* response JSON that contains the keys.
"""
- key_values = []
- value_values = []
- invalidations = []
- for (server_name, key_id), fetch_result in verify_keys.items():
- key_values.append((server_name, key_id))
- value_values.append(
- (
- from_server,
- ts_added_ms,
- fetch_result.valid_until_ts,
- db_binary_type(fetch_result.verify_key.encode()),
- )
- )
- # invalidate takes a tuple corresponding to the params of
- # _get_server_signature_key. _get_server_signature_key only takes one
- # param, which is itself the 2-tuple (server_name, key_id).
- invalidations.append((server_name, key_id))
- await self.db_pool.simple_upsert_many(
- table="server_signature_keys",
- key_names=("server_name", "key_id"),
- key_values=key_values,
- value_names=(
- "from_server",
- "ts_added_ms",
- "ts_valid_until_ms",
- "verify_key",
- ),
- value_values=value_values,
- desc="store_server_signature_keys",
- )
+ key_json_bytes = encode_canonical_json(response_json)
+
+ def store_server_keys_response_txn(txn: LoggingTransaction) -> None:
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="server_signature_keys",
+ key_names=("server_name", "key_id"),
+ key_values=[(server_name, key_id) for key_id in verify_keys],
+ value_names=(
+ "from_server",
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "verify_key",
+ ),
+ value_values=[
+ (
+ from_server,
+ ts_added_ms,
+ fetch_result.valid_until_ts,
+ db_binary_type(fetch_result.verify_key.encode()),
+ )
+ for fetch_result in verify_keys.values()
+ ],
+ )
- invalidate = self._get_server_signature_key.invalidate
- for i in invalidations:
- invalidate((i,))
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="server_keys_json",
+ key_names=("server_name", "key_id", "from_server"),
+ key_values=[
+ (server_name, key_id, from_server) for key_id in verify_keys
+ ],
+ value_names=(
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "key_json",
+ ),
+ value_values=[
+ (
+ ts_added_ms,
+ fetch_result.valid_until_ts,
+ db_binary_type(key_json_bytes),
+ )
+ for fetch_result in verify_keys.values()
+ ],
+ )
- async def store_server_keys_json(
- self,
- server_name: str,
- key_id: str,
- from_server: str,
- ts_now_ms: int,
- ts_expires_ms: int,
- key_json_bytes: bytes,
- ) -> None:
- """Stores the JSON bytes for a set of keys from a server
- The JSON should be signed by the originating server, the intermediate
- server, and by this server. Updates the value for the
- (server_name, key_id, from_server) triplet if one already existed.
- Args:
- server_name: The name of the server.
- key_id: The identifier of the key this JSON is for.
- from_server: The server this JSON was fetched from.
- ts_now_ms: The time now in milliseconds.
- ts_valid_until_ms: The time when this json stops being valid.
- key_json_bytes: The encoded JSON.
- """
- await self.db_pool.simple_upsert(
- table="server_keys_json",
- keyvalues={
- "server_name": server_name,
- "key_id": key_id,
- "from_server": from_server,
- },
- values={
- "server_name": server_name,
- "key_id": key_id,
- "from_server": from_server,
- "ts_added_ms": ts_now_ms,
- "ts_valid_until_ms": ts_expires_ms,
- "key_json": db_binary_type(key_json_bytes),
- },
- desc="store_server_keys_json",
- )
+ # invalidate takes a tuple corresponding to the params of
+ # _get_server_keys_json. _get_server_keys_json only takes one
+ # param, which is itself the 2-tuple (server_name, key_id).
+ for key_id in verify_keys:
+ self._invalidate_cache_and_stream(
+ txn, self._get_server_keys_json, ((server_name, key_id),)
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_server_key_json_for_remote, (server_name, key_id)
+ )
- # invalidate takes a tuple corresponding to the params of
- # _get_server_keys_json. _get_server_keys_json only takes one
- # param, which is itself the 2-tuple (server_name, key_id).
- await self.invalidate_cache_and_stream(
- "_get_server_keys_json", ((server_name, key_id),)
- )
- await self.invalidate_cache_and_stream(
- "get_server_key_json_for_remote", (server_name, key_id)
+ await self.db_pool.runInteraction(
+ "store_server_keys_response", store_server_keys_response_txn
)
@cached()
@@ -221,12 +146,17 @@ class KeyStore(CacheInvalidationWorkerStore):
"""Processes a batch of keys to fetch, and adds the result to `keys`."""
# batch_iter always returns tuples so it's safe to do len(batch)
- sql = """
- SELECT server_name, key_id, key_json, ts_valid_until_ms
- FROM server_keys_json WHERE 1=0
- """ + " OR (server_name=? AND key_id=?)" * len(
- batch
- )
+ where_clause = " OR (server_name=? AND key_id=?)" * len(batch)
+
+ # `server_keys_json` can have multiple entries per server (one per
+ # remote server we fetched from, if using perspectives). Order by
+ # `ts_added_ms` so the most recently fetched one always wins.
+ sql = f"""
+ SELECT server_name, key_id, key_json, ts_valid_until_ms
+ FROM server_keys_json WHERE 1=0
+ {where_clause}
+ ORDER BY ts_added_ms
+ """
txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index b52f48cf..dea0e045 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -450,10 +450,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
- "insertion_events",
- "insertion_event_extremities",
- "insertion_event_edges",
- "batch_events",
"room_account_data",
"room_tags",
# "rooms" happens last, to keep the foreign keys in the other tables
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 5ee5c7ad..a074c439 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -795,9 +795,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
now - event_ts,
)
- await self.db_pool.runInteraction(
- "insert_graph_receipt",
- self._insert_graph_receipt_txn,
+ await self._insert_graph_receipt(
room_id,
receipt_type,
user_id,
@@ -810,9 +808,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
return stream_id, max_persisted_id
- def _insert_graph_receipt_txn(
+ async def _insert_graph_receipt(
self,
- txn: LoggingTransaction,
room_id: str,
receipt_type: str,
user_id: str,
@@ -822,13 +819,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
) -> None:
assert self._can_write_to_receipts
- txn.call_after(
- self._get_receipts_for_user_with_orderings.invalidate,
- (user_id, receipt_type),
- )
- # FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
-
keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
@@ -840,8 +830,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
keyvalues["thread_id"] = thread_id
- self.db_pool.simple_upsert_txn(
- txn,
+ await self.db_pool.simple_upsert(
+ desc="insert_graph_receipt",
table="receipts_graph",
keyvalues=keyvalues,
values={
@@ -851,6 +841,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
where_clause=where_clause,
)
+ self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))
+
+ # FIXME: This shouldn't invalidate the whole cache
+ self._get_linearized_receipts_for_room.invalidate((room_id,))
+
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
@@ -939,11 +934,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
receipts."""
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
- if isinstance(self.database_engine, PostgresEngine):
- ROW_ID_NAME = "ctid"
- else:
- ROW_ID_NAME = "rowid"
-
+ ROW_ID_NAME = self.database_engine.row_id_name
# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# The following query takes less than a minute on matrix.org.
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 7e85b73e..cc964604 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -16,7 +16,7 @@
import logging
import random
import re
-from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
import attr
@@ -192,8 +192,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
)
@cached()
- async def get_user_by_id(self, user_id: str) -> Optional[Mapping[str, Any]]:
- """Deprecated: use get_userinfo_by_id instead"""
+ async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]:
+ """Returns info about the user account, if it exists."""
def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]:
# We could technically use simple_select_one here, but it would not perform
@@ -202,16 +202,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
txn.execute(
"""
SELECT
- name, password_hash, is_guest, admin, consent_version, consent_ts,
+ name, is_guest, admin, consent_version, consent_ts,
consent_server_notice_sent, appservice_id, creation_ts, user_type,
deactivated, COALESCE(shadow_banned, FALSE) AS shadow_banned,
COALESCE(approved, TRUE) AS approved,
- COALESCE(locked, FALSE) AS locked, last_seen_ts
+ COALESCE(locked, FALSE) AS locked
FROM users
- LEFT JOIN (
- SELECT user_id, MAX(last_seen) AS last_seen_ts
- FROM user_ips GROUP BY user_id
- ) ls ON users.name = ls.user_id
WHERE name = ?
""",
(user_id,),
@@ -228,51 +224,23 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
desc="get_user_by_id",
func=get_user_by_id_txn,
)
-
- if row is not None:
- # If we're using SQLite our boolean values will be integers. Because we
- # present some of this data as is to e.g. server admins via REST APIs, we
- # want to make sure we're returning the right type of data.
- # Note: when adding a column name to this list, be wary of NULLable columns,
- # since NULL values will be turned into False.
- boolean_columns = [
- "admin",
- "deactivated",
- "shadow_banned",
- "approved",
- "locked",
- ]
- for column in boolean_columns:
- row[column] = bool(row[column])
-
- return row
-
- async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
- """Get a UserInfo object for a user by user ID.
-
- Note! Currently uses the cache of `get_user_by_id`. Once that deprecated method is removed,
- this method should be cached.
-
- Args:
- user_id: The user to fetch user info for.
- Returns:
- `UserInfo` object if user found, otherwise `None`.
- """
- user_data = await self.get_user_by_id(user_id)
- if not user_data:
+ if row is None:
return None
+
return UserInfo(
- appservice_id=user_data["appservice_id"],
- consent_server_notice_sent=user_data["consent_server_notice_sent"],
- consent_version=user_data["consent_version"],
- creation_ts=user_data["creation_ts"],
- is_admin=bool(user_data["admin"]),
- is_deactivated=bool(user_data["deactivated"]),
- is_guest=bool(user_data["is_guest"]),
- is_shadow_banned=bool(user_data["shadow_banned"]),
- user_id=UserID.from_string(user_data["name"]),
- user_type=user_data["user_type"],
- last_seen_ts=user_data["last_seen_ts"],
+ appservice_id=row["appservice_id"],
+ consent_server_notice_sent=row["consent_server_notice_sent"],
+ consent_version=row["consent_version"],
+ consent_ts=row["consent_ts"],
+ creation_ts=row["creation_ts"],
+ is_admin=bool(row["admin"]),
+ is_deactivated=bool(row["deactivated"]),
+ is_guest=bool(row["is_guest"]),
+ is_shadow_banned=bool(row["shadow_banned"]),
+ user_id=UserID.from_string(row["name"]),
+ user_type=row["user_type"],
+ approved=bool(row["approved"]),
+ locked=bool(row["locked"]),
)
async def is_trial_user(self, user_id: str) -> bool:
@@ -290,10 +258,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
now = self._clock.time_msec()
days = self.config.server.mau_appservice_trial_days.get(
- info["appservice_id"], self.config.server.mau_trial_days
+ info.appservice_id, self.config.server.mau_trial_days
)
trial_duration_ms = days * 24 * 60 * 60 * 1000
- is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
+ is_trial = (now - info.creation_ts * 1000) < trial_duration_ms
return is_trial
@cached()
@@ -2312,6 +2280,26 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
return next_id
+ async def set_device_for_refresh_token(
+ self, user_id: str, old_device_id: str, device_id: str
+ ) -> None:
+ """Moves refresh tokens from old device to current device
+
+ Args:
+ user_id: The user of the devices.
+ old_device_id: The old device.
+ device_id: The new device ID.
+ Returns:
+ None
+ """
+
+ await self.db_pool.simple_update(
+ "refresh_tokens",
+ keyvalues={"user_id": user_id, "device_id": old_device_id},
+ updatevalues={"device_id": device_id},
+ desc="set_device_for_refresh_token",
+ )
+
def _set_device_for_access_token_txn(
self, txn: LoggingTransaction, token: str, device_id: str
) -> str:
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 3a2966b9..9d403919 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -108,6 +108,7 @@ class UserSortOrder(Enum):
SHADOW_BANNED = "shadow_banned"
CREATION_TS = "creation_ts"
LAST_SEEN_TS = "last_seen_ts"
+ LOCKED = "locked"
class StatsStore(StateDeltasStore):
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index c149a9ea..61403a98 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -23,7 +23,7 @@ from synapse.storage._base import db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
-from synapse.types import JsonDict
+from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -34,7 +34,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
@cached()
async def get_tags_for_user(
self, user_id: str
- ) -> Mapping[str, Mapping[str, JsonDict]]:
+ ) -> Mapping[str, Mapping[str, JsonMapping]]:
"""Get all the tags for a user.
@@ -109,7 +109,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
async def get_updated_tags(
self, user_id: str, stream_id: int
- ) -> Mapping[str, Mapping[str, JsonDict]]:
+ ) -> Mapping[str, Mapping[str, JsonMapping]]:
"""Get all the tags for the rooms where the tags have changed since the
given version
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 9ab120ee..5c5372a8 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -53,6 +53,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
resource_id: Optional[str] = None,
statuses: Optional[List[TaskStatus]] = None,
max_timestamp: Optional[int] = None,
+ limit: Optional[int] = None,
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.
@@ -62,6 +63,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
statuses: Limit the returned tasks to the specific statuses
max_timestamp: Limit the returned tasks to the ones that have
a timestamp inferior to the specified one
+ limit: Only return `limit` number of rows if set.
Returns: a list of `ScheduledTask`, ordered by increasing timestamps
"""
@@ -94,6 +96,10 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
sql = sql + " ORDER BY timestamp"
+ if limit is not None:
+ sql += " LIMIT ?"
+ args.append(limit)
+
txn.execute(sql, args)
return self.db_pool.cursor_to_dict(txn)
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 0b5b3bf0..b1a2418c 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM
"""Gets a string giving the server version. For example: '3.22.0'"""
...
+ @property
+ @abc.abstractmethod
+ def row_id_name(self) -> str:
+ """Gets the literal name representing a row id for this engine."""
+ ...
+
@abc.abstractmethod
def in_transaction(self, conn: ConnectionType) -> bool:
"""Whether the connection is currently in a transaction."""
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 05a72dc5..63093632 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -211,6 +211,10 @@ class PostgresEngine(
else:
return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100)
+ @property
+ def row_id_name(self) -> str:
+ return "ctid"
+
def in_transaction(self, conn: psycopg2.extensions.connection) -> bool:
return conn.status != psycopg2.extensions.STATUS_READY
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index ca8c5929..802069e1 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]):
"""Gets a string giving the server version. For example: '3.22.0'."""
return "%i.%i.%i" % sqlite3.sqlite_version_info
+ @property
+ def row_id_name(self) -> str:
+ return "rowid"
+
def in_transaction(self, conn: sqlite3.Connection) -> bool:
return conn.in_transaction
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 422f11f5..5b50bd66 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-SCHEMA_VERSION = 81 # remember to update the list below when updating
+SCHEMA_VERSION = 82 # remember to update the list below when updating
"""Represents the expectations made by the codebase about the database schema
This should be incremented whenever the codebase changes its requirements on the
@@ -117,6 +117,10 @@ Changes in SCHEMA_VERSION = 80
Changes in SCHEMA_VERSION = 81
- The event_txn_id is no longer written to for new events.
+
+Changes in SCHEMA_VERSION = 82
+ - The insertion_events, insertion_event_extremities, insertion_event_edges, and
+ batch_events tables are no longer purged in preparation for their removal.
"""
diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
index ad2da4c8..622686d2 100644
--- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
@@ -14,7 +14,7 @@
from synapse.storage.database import LoggingTransaction
-from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine
from synapse.storage.prepare_database import get_statements
FIX_INDEXES = """
@@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
- rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
+ rowid = database_engine.row_id_name
# remove duplicates from group_users & group_invites tables
cur.execute(
diff --git a/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql b/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql
new file mode 100644
index 00000000..6b902751
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/02_scheduled_tasks_index.sql
@@ -0,0 +1,16 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX IF NOT EXISTS scheduled_tasks_timestamp ON scheduled_tasks(timestamp);