author     Antonio Russo <aerusso@aerusso.net>    2023-09-27 18:16:24 -0600
committer  Antonio Russo <aerusso@aerusso.net>    2023-09-27 18:16:24 -0600
commit     b230e2110fd27e03a5452ce95dbbe8ea8dd87037 (patch)
tree       6fb976a0de5fa566a805537ade0ddae5502ae138 /synapse/storage/databases/main
parent     44b86bfd0ddbeaf838f80677985bce34ec04d72f (diff)
New upstream version 1.93.0
Diffstat (limited to 'synapse/storage/databases/main')
-rw-r--r--  synapse/storage/databases/main/__init__.py               |   7
-rw-r--r--  synapse/storage/databases/main/account_data.py           |  14
-rw-r--r--  synapse/storage/databases/main/client_ips.py             |  11
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py            |  28
-rw-r--r--  synapse/storage/databases/main/devices.py                |  34
-rw-r--r--  synapse/storage/databases/main/event_federation.py       |   6
-rw-r--r--  synapse/storage/databases/main/event_push_actions.py     |  77
-rw-r--r--  synapse/storage/databases/main/events.py                 |  12
-rw-r--r--  synapse/storage/databases/main/experimental_features.py  |   7
-rw-r--r--  synapse/storage/databases/main/keys.py                   | 236
-rw-r--r--  synapse/storage/databases/main/purge_events.py           |   4
-rw-r--r--  synapse/storage/databases/main/receipts.py               |  29
-rw-r--r--  synapse/storage/databases/main/registration.py           |  96
-rw-r--r--  synapse/storage/databases/main/stats.py                  |   1
-rw-r--r--  synapse/storage/databases/main/tags.py                   |   6
-rw-r--r--  synapse/storage/databases/main/task_scheduler.py         |   6
16 files changed, 253 insertions, 321 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 0836e247..10140357 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -175,6 +175,7 @@ class DataStore(
direction: Direction = Direction.FORWARDS,
approved: bool = True,
not_user_types: Optional[List[str]] = None,
+ locked: bool = False,
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users from
users list. This will return a json list of users and the
@@ -194,6 +195,7 @@ class DataStore(
direction: sort ascending or descending
approved: whether to include approved users
not_user_types: list of user types to exclude
+ locked: whether to include locked users
Returns:
A tuple of a list of mappings from user to information and a count of total users.
"""
@@ -226,6 +228,9 @@ class DataStore(
if not deactivated:
filters.append("deactivated = 0")
+ if not locked:
+ filters.append("locked IS FALSE")
+
if admins is not None:
if admins:
filters.append("admin = 1")
@@ -290,7 +295,7 @@ class DataStore(
sql = f"""
SELECT name, user_type, is_guest, admin, deactivated, shadow_banned,
displayname, avatar_url, creation_ts * 1000 as creation_ts, approved,
- eu.user_id is not null as erased, last_seen_ts
+ eu.user_id is not null as erased, last_seen_ts, locked
{sql_base}
ORDER BY {order_by_column} {order}, u.name ASC
LIMIT ? OFFSET ?
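
For illustration, a minimal sketch (not part of the patch) of how the new `locked` flag feeds into the WHERE clause assembled above; by default, locked accounts are now excluded from the paginated admin user list:

    # Sketch only: mirrors the filter-building logic shown in the hunk above.
    deactivated = False
    locked = False  # the new parameter, defaulting to False
    filters = []

    if not deactivated:
        filters.append("deactivated = 0")
    if not locked:
        filters.append("locked IS FALSE")

    where_clause = ("WHERE " + " AND ".join(filters)) if filters else ""
    print(where_clause)  # WHERE deactivated = 0 AND locked IS FALSE
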
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 8f7bdbc6..80f146dd 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -43,7 +43,7 @@ from synapse.storage.util.id_generators import (
MultiWriterIdGenerator,
StreamIdGenerator,
)
-from synapse.types import JsonDict
+from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -119,7 +119,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached()
async def get_global_account_data_for_user(
self, user_id: str
- ) -> Mapping[str, JsonDict]:
+ ) -> Mapping[str, JsonMapping]:
"""
Get all the global client account_data for a user.
@@ -164,7 +164,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached()
async def get_room_account_data_for_user(
self, user_id: str
- ) -> Mapping[str, Mapping[str, JsonDict]]:
+ ) -> Mapping[str, Mapping[str, JsonMapping]]:
"""
Get all of the per-room client account_data for a user.
@@ -213,7 +213,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached(num_args=2, max_entries=5000, tree=True)
async def get_global_account_data_by_type_for_user(
self, user_id: str, data_type: str
- ) -> Optional[JsonDict]:
+ ) -> Optional[JsonMapping]:
"""
Returns:
The account data.
@@ -265,7 +265,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached(num_args=2, tree=True)
async def get_account_data_for_room(
self, user_id: str, room_id: str
- ) -> Mapping[str, JsonDict]:
+ ) -> Mapping[str, JsonMapping]:
"""Get all the client account_data for a user for a room.
Args:
@@ -296,7 +296,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
@cached(num_args=3, max_entries=5000, tree=True)
async def get_account_data_for_room_and_type(
self, user_id: str, room_id: str, account_data_type: str
- ) -> Optional[JsonDict]:
+ ) -> Optional[JsonMapping]:
"""Get the client account_data of given type for a user for a room.
Args:
@@ -394,7 +394,7 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
async def get_updated_global_account_data_for_user(
self, user_id: str, stream_id: int
- ) -> Dict[str, JsonDict]:
+ ) -> Mapping[str, JsonMapping]:
"""Get all the global account_data that's changed for a user.
Args:
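
A note on the recurring type change in this file (and in tags.py below): `JsonMapping` is the read-only counterpart of `JsonDict`, reflecting that values returned from `@cached` methods are shared between callers and must not be mutated. A simplified sketch of the distinction (stand-in aliases to keep the example self-contained):

    from typing import Any, Dict, Mapping

    JsonDict = Dict[str, Any]        # mutable: fine for freshly built, caller-owned data
    JsonMapping = Mapping[str, Any]  # read-only view: appropriate for cached, shared data

    def cached_account_data() -> JsonMapping:
        # Annotating the return type as JsonMapping makes mypy reject
        # `result["m.direct"] = ...` at call sites, protecting the cache entry.
        return {"m.push_rules": {"global": {}}}
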
diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py
index d8d333e1..7da47c3d 100644
--- a/synapse/storage/databases/main/client_ips.py
+++ b/synapse/storage/databases/main/client_ips.py
@@ -764,3 +764,14 @@ class ClientIpWorkerStore(ClientIpBackgroundUpdateStore, MonthlyActiveUsersWorke
}
return list(results.values())
+
+ async def get_last_seen_for_user_id(self, user_id: str) -> Optional[int]:
+ """Get the last seen timestamp for a user, if we have it."""
+
+ return await self.db_pool.simple_select_one_onecol(
+ table="user_ips",
+ keyvalues={"user_id": user_id},
+ retcol="MAX(last_seen)",
+ allow_none=True,
+ desc="get_last_seen_for_user_id",
+ )
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index b471fcb0..744e98c6 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore):
table="devices",
column="user_id",
iterable=user_ids_to_query,
- keyvalues={"user_id": user_id, "hidden": False},
+ keyvalues={"hidden": False},
retcols=("device_id",),
)
@@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore):
@trace
async def delete_messages_for_device(
- self, user_id: str, device_id: Optional[str], up_to_stream_id: int
+ self,
+ user_id: str,
+ device_id: Optional[str],
+ up_to_stream_id: int,
+ limit: int,
) -> int:
"""
Args:
user_id: The recipient user_id.
device_id: The recipient device_id.
up_to_stream_id: Where to delete messages up to.
+ limit: maximum number of messages to delete
Returns:
The number of messages deleted.
@@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": "No changes in cache since last check"})
return 0
+ ROW_ID_NAME = self.database_engine.row_id_name
+
def delete_messages_for_device_txn(txn: LoggingTransaction) -> int:
- sql = (
- "DELETE FROM device_inbox"
- " WHERE user_id = ? AND device_id = ?"
- " AND stream_id <= ?"
- )
+ sql = f"""
+ DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN (
+ SELECT {ROW_ID_NAME} FROM device_inbox
+ WHERE user_id = ? AND device_id = ? AND stream_id <= ?
+ LIMIT {limit}
+ )
+ """
txn.execute(sql, (user_id, device_id, up_to_stream_id))
return txn.rowcount
@@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore):
log_kv({"message": f"deleted {count} messages for device", "count": count})
+ # In this case we don't know if we hit the limit or the delete is complete
+ # so let's not update the cache.
+ if count == limit:
+ return count
+
# Update the cache, ensuring that we only ever increase the value
updated_last_deleted_stream_id = self._last_device_delete_cache.get(
(user_id, device_id), 0
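
Because the DELETE is now bounded, `count == limit` is ambiguous: the stream range may or may not be fully drained, so the last-deleted cache is only updated when fewer than `limit` rows were removed. A hypothetical calling pattern (not code from the patch; the batch size is illustrative) would loop until a batch comes back short:

    BATCH_LIMIT = 1000  # illustrative value only

    async def drain_device_inbox(store, user_id: str, device_id: str, up_to_stream_id: int) -> int:
        """Delete to-device messages in bounded batches until none remain."""
        total = 0
        while True:
            deleted = await store.delete_messages_for_device(
                user_id, device_id, up_to_stream_id, limit=BATCH_LIMIT
            )
            total += deleted
            if deleted < BATCH_LIMIT:
                # The final, short batch is also the one that updates
                # _last_device_delete_cache, per the hunk above.
                return total
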
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index e4162f84..70faf4b1 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -759,18 +759,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
mapping of user_id -> device_id -> device_info.
"""
unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids}
- user_map = await self.get_device_list_last_stream_id_for_remotes(
- list(unique_user_ids)
- )
- # We go and check if any of the users need to have their device lists
- # resynced. If they do then we remove them from the cached list.
- users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
+ user_ids_in_cache = await self.get_users_whose_devices_are_cached(
unique_user_ids
)
- user_ids_in_cache = {
- user_id for user_id, stream_id in user_map.items() if stream_id
- } - users_needing_resync
user_ids_not_in_cache = unique_user_ids - user_ids_in_cache
# First fetch all the users which all devices are to be returned.
@@ -792,6 +784,22 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore):
return user_ids_not_in_cache, results
+ async def get_users_whose_devices_are_cached(
+ self, user_ids: StrCollection
+ ) -> Set[str]:
+ """Checks which of the given users we have cached the devices for."""
+ user_map = await self.get_device_list_last_stream_id_for_remotes(user_ids)
+
+ # We go and check if any of the users need to have their device lists
+ # resynced. If they do then we remove them from the cached list.
+ users_needing_resync = await self.get_user_ids_requiring_device_list_resync(
+ user_ids
+ )
+ user_ids_in_cache = {
+ user_id for user_id, stream_id in user_map.items() if stream_id
+ } - users_needing_resync
+ return user_ids_in_cache
+
@cached(num_args=2, tree=True)
async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict:
content = await self.db_pool.simple_select_one_onecol(
@@ -1766,14 +1774,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
self.db_pool.simple_delete_many_txn(
txn,
- table="device_inbox",
- column="device_id",
- values=device_ids,
- keyvalues={"user_id": user_id},
- )
-
- self.db_pool.simple_delete_many_txn(
- txn,
table="device_auth_providers",
column="device_id",
values=device_ids,
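
The extracted `get_users_whose_devices_are_cached` helper keeps the old inline logic intact: two lookups followed by set arithmetic. With illustrative values standing in for the two database queries:

    # user -> last device-list stream_id we have cached (None meaning "never cached").
    user_map = {"@a:hs": 5, "@b:hs": None, "@c:hs": 7}
    users_needing_resync = {"@c:hs"}

    user_ids_in_cache = {
        user_id for user_id, stream_id in user_map.items() if stream_id
    } - users_needing_resync

    print(user_ids_in_cache)  # {'@a:hs'}
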
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index fab7008a..afffa549 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -19,6 +19,7 @@ from typing import (
TYPE_CHECKING,
Collection,
Dict,
+ FrozenSet,
Iterable,
List,
Optional,
@@ -1179,13 +1180,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
)
@cached(max_entries=5000, iterable=True)
- async def get_latest_event_ids_in_room(self, room_id: str) -> Sequence[str]:
- return await self.db_pool.simple_select_onecol(
+ async def get_latest_event_ids_in_room(self, room_id: str) -> FrozenSet[str]:
+ event_ids = await self.db_pool.simple_select_onecol(
table="event_forward_extremities",
keyvalues={"room_id": room_id},
retcol="event_id",
desc="get_latest_event_ids_in_room",
)
+ return frozenset(event_ids)
async def get_min_depth(self, room_id: str) -> Optional[int]:
"""For the given room, get the minimum depth we have seen for it."""
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 07bda7d6..ba99e63d 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -1599,10 +1599,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn,
table="event_push_summary",
key_names=("user_id", "room_id", "thread_id"),
- key_values=[
- (user_id, room_id, thread_id)
- for user_id, room_id, thread_id in summaries
- ],
+ key_values=list(summaries),
value_names=("notif_count", "unread_count", "stream_ordering"),
value_values=[
(
@@ -1740,42 +1737,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
# We sleep to ensure that we don't overwhelm the DB.
await self._clock.sleep(1.0)
-
-class EventPushActionsStore(EventPushActionsWorkerStore):
- EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
-
- def __init__(
- self,
- database: DatabasePool,
- db_conn: LoggingDatabaseConnection,
- hs: "HomeServer",
- ):
- super().__init__(database, db_conn, hs)
-
- self.db_pool.updates.register_background_index_update(
- self.EPA_HIGHLIGHT_INDEX,
- index_name="event_push_actions_u_highlight",
- table="event_push_actions",
- columns=["user_id", "stream_ordering"],
- )
-
- self.db_pool.updates.register_background_index_update(
- "event_push_actions_highlights_index",
- index_name="event_push_actions_highlights_index",
- table="event_push_actions",
- columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
- where_clause="highlight=1",
- )
-
- # Add index to make deleting old push actions faster.
- self.db_pool.updates.register_background_index_update(
- "event_push_actions_stream_highlight_index",
- index_name="event_push_actions_stream_highlight_index",
- table="event_push_actions",
- columns=["highlight", "stream_ordering"],
- where_clause="highlight=0",
- )
-
async def get_push_actions_for_user(
self,
user_id: str,
@@ -1834,6 +1795,42 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
]
+class EventPushActionsStore(EventPushActionsWorkerStore):
+ EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
+
+ def __init__(
+ self,
+ database: DatabasePool,
+ db_conn: LoggingDatabaseConnection,
+ hs: "HomeServer",
+ ):
+ super().__init__(database, db_conn, hs)
+
+ self.db_pool.updates.register_background_index_update(
+ self.EPA_HIGHLIGHT_INDEX,
+ index_name="event_push_actions_u_highlight",
+ table="event_push_actions",
+ columns=["user_id", "stream_ordering"],
+ )
+
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_highlights_index",
+ index_name="event_push_actions_highlights_index",
+ table="event_push_actions",
+ columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
+ where_clause="highlight=1",
+ )
+
+ # Add index to make deleting old push actions faster.
+ self.db_pool.updates.register_background_index_update(
+ "event_push_actions_stream_highlight_index",
+ index_name="event_push_actions_stream_highlight_index",
+ table="event_push_actions",
+ columns=["highlight", "stream_ordering"],
+ where_clause="highlight=0",
+ )
+
+
def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool:
for action in actions:
if not isinstance(action, dict):
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 0c1ed752..790d058c 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -222,7 +222,7 @@ class PersistEventsStore:
for room_id, latest_event_ids in new_forward_extremities.items():
self.store.get_latest_event_ids_in_room.prefill(
- (room_id,), list(latest_event_ids)
+ (room_id,), frozenset(latest_event_ids)
)
async def _get_events_which_are_prevs(self, event_ids: Iterable[str]) -> List[str]:
@@ -827,15 +827,7 @@ class PersistEventsStore:
"target_chain_id",
"target_sequence_number",
),
- values=[
- (source_id, source_seq, target_id, target_seq)
- for (
- source_id,
- source_seq,
- target_id,
- target_seq,
- ) in chain_links.get_additions()
- ],
+ values=list(chain_links.get_additions()),
)
@staticmethod
diff --git a/synapse/storage/databases/main/experimental_features.py b/synapse/storage/databases/main/experimental_features.py
index cf3226ae..654f9240 100644
--- a/synapse/storage/databases/main/experimental_features.py
+++ b/synapse/storage/databases/main/experimental_features.py
@@ -12,11 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING, Dict, FrozenSet
from synapse.storage.database import DatabasePool, LoggingDatabaseConnection
from synapse.storage.databases.main import CacheInvalidationWorkerStore
-from synapse.types import StrCollection
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
@@ -34,7 +33,7 @@ class ExperimentalFeaturesStore(CacheInvalidationWorkerStore):
super().__init__(database, db_conn, hs)
@cached()
- async def list_enabled_features(self, user_id: str) -> StrCollection:
+ async def list_enabled_features(self, user_id: str) -> FrozenSet[str]:
"""
Checks to see what features are enabled for a given user
Args:
@@ -49,7 +48,7 @@ class ExperimentalFeaturesStore(CacheInvalidationWorkerStore):
["feature"],
)
- return [feature["feature"] for feature in enabled]
+ return frozenset(feature["feature"] for feature in enabled)
async def set_features_for_user(
self,
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index a3b47448..41563371 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -16,14 +16,17 @@
import itertools
import json
import logging
-from typing import Dict, Iterable, Mapping, Optional, Tuple
+from typing import Dict, Iterable, Optional, Tuple
+from canonicaljson import encode_canonical_json
from signedjson.key import decode_verify_key_bytes
from unpaddedbase64 import decode_base64
+from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
from synapse.storage.keys import FetchKeyResult, FetchKeyResultForRemote
from synapse.storage.types import Cursor
+from synapse.types import JsonDict
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.iterutils import batch_iter
@@ -36,162 +39,84 @@ db_binary_type = memoryview
class KeyStore(CacheInvalidationWorkerStore):
"""Persistence for signature verification keys"""
- @cached()
- def _get_server_signature_key(
- self, server_name_and_key_id: Tuple[str, str]
- ) -> FetchKeyResult:
- raise NotImplementedError()
-
- @cachedList(
- cached_method_name="_get_server_signature_key",
- list_name="server_name_and_key_ids",
- )
- async def get_server_signature_keys(
- self, server_name_and_key_ids: Iterable[Tuple[str, str]]
- ) -> Dict[Tuple[str, str], FetchKeyResult]:
- """
- Args:
- server_name_and_key_ids:
- iterable of (server_name, key-id) tuples to fetch keys for
-
- Returns:
- A map from (server_name, key_id) -> FetchKeyResult, or None if the
- key is unknown
- """
- keys = {}
-
- def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str], ...]) -> None:
- """Processes a batch of keys to fetch, and adds the result to `keys`."""
-
- # batch_iter always returns tuples so it's safe to do len(batch)
- sql = """
- SELECT server_name, key_id, verify_key, ts_valid_until_ms
- FROM server_signature_keys WHERE 1=0
- """ + " OR (server_name=? AND key_id=?)" * len(
- batch
- )
-
- txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
-
- for row in txn:
- server_name, key_id, key_bytes, ts_valid_until_ms = row
-
- if ts_valid_until_ms is None:
- # Old keys may be stored with a ts_valid_until_ms of null,
- # in which case we treat this as if it was set to `0`, i.e.
- # it won't match key requests that define a minimum
- # `ts_valid_until_ms`.
- ts_valid_until_ms = 0
-
- keys[(server_name, key_id)] = FetchKeyResult(
- verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)),
- valid_until_ts=ts_valid_until_ms,
- )
-
- def _txn(txn: Cursor) -> Dict[Tuple[str, str], FetchKeyResult]:
- for batch in batch_iter(server_name_and_key_ids, 50):
- _get_keys(txn, batch)
- return keys
-
- return await self.db_pool.runInteraction("get_server_signature_keys", _txn)
-
- async def store_server_signature_keys(
+ async def store_server_keys_response(
self,
+ server_name: str,
from_server: str,
ts_added_ms: int,
- verify_keys: Mapping[Tuple[str, str], FetchKeyResult],
+ verify_keys: Dict[str, FetchKeyResult],
+ response_json: JsonDict,
) -> None:
- """Stores NACL verification keys for remote servers.
+ """Stores the keys for the given server that we got from `from_server`.
+
Args:
- from_server: Where the verification keys were looked up
- ts_added_ms: The time to record that the key was added
- verify_keys:
- keys to be stored. Each entry is a triplet of
- (server_name, key_id, key).
+ server_name: The owner of the keys
+ from_server: Which server we got the keys from
+ ts_added_ms: When we're adding the keys
+ verify_keys: The decoded keys
+ response_json: The full *signed* response JSON that contains the keys.
"""
- key_values = []
- value_values = []
- invalidations = []
- for (server_name, key_id), fetch_result in verify_keys.items():
- key_values.append((server_name, key_id))
- value_values.append(
- (
- from_server,
- ts_added_ms,
- fetch_result.valid_until_ts,
- db_binary_type(fetch_result.verify_key.encode()),
- )
- )
- # invalidate takes a tuple corresponding to the params of
- # _get_server_signature_key. _get_server_signature_key only takes one
- # param, which is itself the 2-tuple (server_name, key_id).
- invalidations.append((server_name, key_id))
- await self.db_pool.simple_upsert_many(
- table="server_signature_keys",
- key_names=("server_name", "key_id"),
- key_values=key_values,
- value_names=(
- "from_server",
- "ts_added_ms",
- "ts_valid_until_ms",
- "verify_key",
- ),
- value_values=value_values,
- desc="store_server_signature_keys",
- )
+ key_json_bytes = encode_canonical_json(response_json)
+
+ def store_server_keys_response_txn(txn: LoggingTransaction) -> None:
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="server_signature_keys",
+ key_names=("server_name", "key_id"),
+ key_values=[(server_name, key_id) for key_id in verify_keys],
+ value_names=(
+ "from_server",
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "verify_key",
+ ),
+ value_values=[
+ (
+ from_server,
+ ts_added_ms,
+ fetch_result.valid_until_ts,
+ db_binary_type(fetch_result.verify_key.encode()),
+ )
+ for fetch_result in verify_keys.values()
+ ],
+ )
- invalidate = self._get_server_signature_key.invalidate
- for i in invalidations:
- invalidate((i,))
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="server_keys_json",
+ key_names=("server_name", "key_id", "from_server"),
+ key_values=[
+ (server_name, key_id, from_server) for key_id in verify_keys
+ ],
+ value_names=(
+ "ts_added_ms",
+ "ts_valid_until_ms",
+ "key_json",
+ ),
+ value_values=[
+ (
+ ts_added_ms,
+ fetch_result.valid_until_ts,
+ db_binary_type(key_json_bytes),
+ )
+ for fetch_result in verify_keys.values()
+ ],
+ )
- async def store_server_keys_json(
- self,
- server_name: str,
- key_id: str,
- from_server: str,
- ts_now_ms: int,
- ts_expires_ms: int,
- key_json_bytes: bytes,
- ) -> None:
- """Stores the JSON bytes for a set of keys from a server
- The JSON should be signed by the originating server, the intermediate
- server, and by this server. Updates the value for the
- (server_name, key_id, from_server) triplet if one already existed.
- Args:
- server_name: The name of the server.
- key_id: The identifier of the key this JSON is for.
- from_server: The server this JSON was fetched from.
- ts_now_ms: The time now in milliseconds.
- ts_valid_until_ms: The time when this json stops being valid.
- key_json_bytes: The encoded JSON.
- """
- await self.db_pool.simple_upsert(
- table="server_keys_json",
- keyvalues={
- "server_name": server_name,
- "key_id": key_id,
- "from_server": from_server,
- },
- values={
- "server_name": server_name,
- "key_id": key_id,
- "from_server": from_server,
- "ts_added_ms": ts_now_ms,
- "ts_valid_until_ms": ts_expires_ms,
- "key_json": db_binary_type(key_json_bytes),
- },
- desc="store_server_keys_json",
- )
+ # invalidate takes a tuple corresponding to the params of
+ # _get_server_keys_json. _get_server_keys_json only takes one
+ # param, which is itself the 2-tuple (server_name, key_id).
+ for key_id in verify_keys:
+ self._invalidate_cache_and_stream(
+ txn, self._get_server_keys_json, ((server_name, key_id),)
+ )
+ self._invalidate_cache_and_stream(
+ txn, self.get_server_key_json_for_remote, (server_name, key_id)
+ )
- # invalidate takes a tuple corresponding to the params of
- # _get_server_keys_json. _get_server_keys_json only takes one
- # param, which is itself the 2-tuple (server_name, key_id).
- await self.invalidate_cache_and_stream(
- "_get_server_keys_json", ((server_name, key_id),)
- )
- await self.invalidate_cache_and_stream(
- "get_server_key_json_for_remote", (server_name, key_id)
+ await self.db_pool.runInteraction(
+ "store_server_keys_response", store_server_keys_response_txn
)
@cached()
@@ -221,12 +146,17 @@ class KeyStore(CacheInvalidationWorkerStore):
"""Processes a batch of keys to fetch, and adds the result to `keys`."""
# batch_iter always returns tuples so it's safe to do len(batch)
- sql = """
- SELECT server_name, key_id, key_json, ts_valid_until_ms
- FROM server_keys_json WHERE 1=0
- """ + " OR (server_name=? AND key_id=?)" * len(
- batch
- )
+ where_clause = " OR (server_name=? AND key_id=?)" * len(batch)
+
+ # `server_keys_json` can have multiple entries per server (one per
+ # remote server we fetched from, if using perspectives). Order by
+ # `ts_added_ms` so the most recently fetched one always wins.
+ sql = f"""
+ SELECT server_name, key_id, key_json, ts_valid_until_ms
+ FROM server_keys_json WHERE 1=0
+ {where_clause}
+ ORDER BY ts_added_ms
+ """
txn.execute(sql, tuple(itertools.chain.from_iterable(batch)))
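
The two removed writers (`store_server_signature_keys` and `store_server_keys_json`) are folded into a single `store_server_keys_response` transaction that upserts both tables and invalidates both caches together. A hypothetical call site might look like the following (illustrative only; handling of a real signed key response involves more validation):

    from signedjson.key import decode_verify_key_bytes
    from unpaddedbase64 import decode_base64

    from synapse.storage.keys import FetchKeyResult

    async def cache_server_keys(store, response_json: dict, fetched_from: str, now_ms: int) -> None:
        """Hypothetical caller: decode a signed key response and persist it in one transaction."""
        verify_keys = {
            key_id: FetchKeyResult(
                verify_key=decode_verify_key_bytes(key_id, decode_base64(key["key"])),
                valid_until_ts=response_json["valid_until_ts"],
            )
            for key_id, key in response_json["verify_keys"].items()
        }
        await store.store_server_keys_response(
            server_name=response_json["server_name"],
            from_server=fetched_from,
            ts_added_ms=now_ms,
            verify_keys=verify_keys,
            response_json=response_json,
        )
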
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index b52f48cf..dea0e045 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -450,10 +450,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
"e2e_room_keys",
"event_push_summary",
"pusher_throttle",
- "insertion_events",
- "insertion_event_extremities",
- "insertion_event_edges",
- "batch_events",
"room_account_data",
"room_tags",
# "rooms" happens last, to keep the foreign keys in the other tables
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 5ee5c7ad..a074c439 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -795,9 +795,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
now - event_ts,
)
- await self.db_pool.runInteraction(
- "insert_graph_receipt",
- self._insert_graph_receipt_txn,
+ await self._insert_graph_receipt(
room_id,
receipt_type,
user_id,
@@ -810,9 +808,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
return stream_id, max_persisted_id
- def _insert_graph_receipt_txn(
+ async def _insert_graph_receipt(
self,
- txn: LoggingTransaction,
room_id: str,
receipt_type: str,
user_id: str,
@@ -822,13 +819,6 @@ class ReceiptsWorkerStore(SQLBaseStore):
) -> None:
assert self._can_write_to_receipts
- txn.call_after(
- self._get_receipts_for_user_with_orderings.invalidate,
- (user_id, receipt_type),
- )
- # FIXME: This shouldn't invalidate the whole cache
- txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
-
keyvalues = {
"room_id": room_id,
"receipt_type": receipt_type,
@@ -840,8 +830,8 @@ class ReceiptsWorkerStore(SQLBaseStore):
else:
keyvalues["thread_id"] = thread_id
- self.db_pool.simple_upsert_txn(
- txn,
+ await self.db_pool.simple_upsert(
+ desc="insert_graph_receipt",
table="receipts_graph",
keyvalues=keyvalues,
values={
@@ -851,6 +841,11 @@ class ReceiptsWorkerStore(SQLBaseStore):
where_clause=where_clause,
)
+ self._get_receipts_for_user_with_orderings.invalidate((user_id, receipt_type))
+
+ # FIXME: This shouldn't invalidate the whole cache
+ self._get_linearized_receipts_for_room.invalidate((room_id,))
+
class ReceiptsBackgroundUpdateStore(SQLBaseStore):
POPULATE_RECEIPT_EVENT_STREAM_ORDERING = "populate_event_stream_ordering"
@@ -939,11 +934,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore):
receipts."""
def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None:
- if isinstance(self.database_engine, PostgresEngine):
- ROW_ID_NAME = "ctid"
- else:
- ROW_ID_NAME = "rowid"
-
+ ROW_ID_NAME = self.database_engine.row_id_name
# Identify any duplicate receipts arising from
# https://github.com/matrix-org/synapse/issues/14406.
# The following query takes less than a minute on matrix.org.
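
The removed PostgreSQL/SQLite branch shows exactly what the new `database_engine.row_id_name` attribute has to yield; the bounded DELETE in deviceinbox.py above relies on the same attribute. A minimal sketch of such a property (an assumption for illustration; the real implementation lives on the engine classes in synapse.storage.engines):

    class PostgresEngineSketch:
        @property
        def row_id_name(self) -> str:
            # Postgres exposes the physical row identifier as "ctid".
            return "ctid"

    class Sqlite3EngineSketch:
        @property
        def row_id_name(self) -> str:
            # SQLite exposes an implicit integer "rowid" column on ordinary tables.
            return "rowid"
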
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 7e85b73e..cc964604 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -16,7 +16,7 @@
import logging
import random
import re
-from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union, cast
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
import attr
@@ -192,8 +192,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
)
@cached()
- async def get_user_by_id(self, user_id: str) -> Optional[Mapping[str, Any]]:
- """Deprecated: use get_userinfo_by_id instead"""
+ async def get_user_by_id(self, user_id: str) -> Optional[UserInfo]:
+ """Returns info about the user account, if it exists."""
def get_user_by_id_txn(txn: LoggingTransaction) -> Optional[Dict[str, Any]]:
# We could technically use simple_select_one here, but it would not perform
@@ -202,16 +202,12 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
txn.execute(
"""
SELECT
- name, password_hash, is_guest, admin, consent_version, consent_ts,
+ name, is_guest, admin, consent_version, consent_ts,
consent_server_notice_sent, appservice_id, creation_ts, user_type,
deactivated, COALESCE(shadow_banned, FALSE) AS shadow_banned,
COALESCE(approved, TRUE) AS approved,
- COALESCE(locked, FALSE) AS locked, last_seen_ts
+ COALESCE(locked, FALSE) AS locked
FROM users
- LEFT JOIN (
- SELECT user_id, MAX(last_seen) AS last_seen_ts
- FROM user_ips GROUP BY user_id
- ) ls ON users.name = ls.user_id
WHERE name = ?
""",
(user_id,),
@@ -228,51 +224,23 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
desc="get_user_by_id",
func=get_user_by_id_txn,
)
-
- if row is not None:
- # If we're using SQLite our boolean values will be integers. Because we
- # present some of this data as is to e.g. server admins via REST APIs, we
- # want to make sure we're returning the right type of data.
- # Note: when adding a column name to this list, be wary of NULLable columns,
- # since NULL values will be turned into False.
- boolean_columns = [
- "admin",
- "deactivated",
- "shadow_banned",
- "approved",
- "locked",
- ]
- for column in boolean_columns:
- row[column] = bool(row[column])
-
- return row
-
- async def get_userinfo_by_id(self, user_id: str) -> Optional[UserInfo]:
- """Get a UserInfo object for a user by user ID.
-
- Note! Currently uses the cache of `get_user_by_id`. Once that deprecated method is removed,
- this method should be cached.
-
- Args:
- user_id: The user to fetch user info for.
- Returns:
- `UserInfo` object if user found, otherwise `None`.
- """
- user_data = await self.get_user_by_id(user_id)
- if not user_data:
+ if row is None:
return None
+
return UserInfo(
- appservice_id=user_data["appservice_id"],
- consent_server_notice_sent=user_data["consent_server_notice_sent"],
- consent_version=user_data["consent_version"],
- creation_ts=user_data["creation_ts"],
- is_admin=bool(user_data["admin"]),
- is_deactivated=bool(user_data["deactivated"]),
- is_guest=bool(user_data["is_guest"]),
- is_shadow_banned=bool(user_data["shadow_banned"]),
- user_id=UserID.from_string(user_data["name"]),
- user_type=user_data["user_type"],
- last_seen_ts=user_data["last_seen_ts"],
+ appservice_id=row["appservice_id"],
+ consent_server_notice_sent=row["consent_server_notice_sent"],
+ consent_version=row["consent_version"],
+ consent_ts=row["consent_ts"],
+ creation_ts=row["creation_ts"],
+ is_admin=bool(row["admin"]),
+ is_deactivated=bool(row["deactivated"]),
+ is_guest=bool(row["is_guest"]),
+ is_shadow_banned=bool(row["shadow_banned"]),
+ user_id=UserID.from_string(row["name"]),
+ user_type=row["user_type"],
+ approved=bool(row["approved"]),
+ locked=bool(row["locked"]),
)
async def is_trial_user(self, user_id: str) -> bool:
@@ -290,10 +258,10 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
now = self._clock.time_msec()
days = self.config.server.mau_appservice_trial_days.get(
- info["appservice_id"], self.config.server.mau_trial_days
+ info.appservice_id, self.config.server.mau_trial_days
)
trial_duration_ms = days * 24 * 60 * 60 * 1000
- is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
+ is_trial = (now - info.creation_ts * 1000) < trial_duration_ms
return is_trial
@cached()
@@ -2312,6 +2280,26 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
return next_id
+ async def set_device_for_refresh_token(
+ self, user_id: str, old_device_id: str, device_id: str
+ ) -> None:
+ """Moves refresh tokens from old device to current device
+
+ Args:
+ user_id: The user of the devices.
+ old_device_id: The old device.
+ device_id: The new device ID.
+ Returns:
+ None
+ """
+
+ await self.db_pool.simple_update(
+ "refresh_tokens",
+ keyvalues={"user_id": user_id, "device_id": old_device_id},
+ updatevalues={"device_id": device_id},
+ desc="set_device_for_refresh_token",
+ )
+
def _set_device_for_access_token_txn(
self, txn: LoggingTransaction, token: str, device_id: str
) -> str:
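
With `get_user_by_id` now returning a `UserInfo`, call sites move from dict lookups to attribute access, as the `is_trial_user` hunk above shows. A simplified stand-in for the container (an assumption for illustration; the real `UserInfo` in synapse.types is an attrs class and may carry additional fields):

    from typing import Optional

    import attr

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class UserInfoSketch:
        user_id: str
        appservice_id: Optional[str]
        creation_ts: int
        is_admin: bool
        is_guest: bool
        is_deactivated: bool
        is_shadow_banned: bool
        approved: bool
        locked: bool

    info = UserInfoSketch(
        user_id="@alice:example.org",
        appservice_id=None,
        creation_ts=1_600_000_000,
        is_admin=False,
        is_guest=False,
        is_deactivated=False,
        is_shadow_banned=False,
        approved=True,
        locked=False,
    )
    # The trial check now reads info.creation_ts rather than info["creation_ts"].
    print(info.creation_ts, info.appservice_id)
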
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 3a2966b9..9d403919 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -108,6 +108,7 @@ class UserSortOrder(Enum):
SHADOW_BANNED = "shadow_banned"
CREATION_TS = "creation_ts"
LAST_SEEN_TS = "last_seen_ts"
+ LOCKED = "locked"
class StatsStore(StateDeltasStore):
diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py
index c149a9ea..61403a98 100644
--- a/synapse/storage/databases/main/tags.py
+++ b/synapse/storage/databases/main/tags.py
@@ -23,7 +23,7 @@ from synapse.storage._base import db_to_json
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main.account_data import AccountDataWorkerStore
from synapse.storage.util.id_generators import AbstractStreamIdGenerator
-from synapse.types import JsonDict
+from synapse.types import JsonDict, JsonMapping
from synapse.util import json_encoder
from synapse.util.caches.descriptors import cached
@@ -34,7 +34,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
@cached()
async def get_tags_for_user(
self, user_id: str
- ) -> Mapping[str, Mapping[str, JsonDict]]:
+ ) -> Mapping[str, Mapping[str, JsonMapping]]:
"""Get all the tags for a user.
@@ -109,7 +109,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
async def get_updated_tags(
self, user_id: str, stream_id: int
- ) -> Mapping[str, Mapping[str, JsonDict]]:
+ ) -> Mapping[str, Mapping[str, JsonMapping]]:
"""Get all the tags for the rooms where the tags have changed since the
given version
diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py
index 9ab120ee..5c5372a8 100644
--- a/synapse/storage/databases/main/task_scheduler.py
+++ b/synapse/storage/databases/main/task_scheduler.py
@@ -53,6 +53,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
resource_id: Optional[str] = None,
statuses: Optional[List[TaskStatus]] = None,
max_timestamp: Optional[int] = None,
+ limit: Optional[int] = None,
) -> List[ScheduledTask]:
"""Get a list of scheduled tasks from the DB.
@@ -62,6 +63,7 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
statuses: Limit the returned tasks to the specific statuses
max_timestamp: Limit the returned tasks to the ones that have
a timestamp inferior to the specified one
+ limit: Only return `limit` number of rows if set.
Returns: a list of `ScheduledTask`, ordered by increasing timestamps
"""
@@ -94,6 +96,10 @@ class TaskSchedulerWorkerStore(SQLBaseStore):
sql = sql + " ORDER BY timestamp"
+ if limit is not None:
+ sql += " LIMIT ?"
+ args.append(limit)
+
txn.execute(sql, args)
return self.db_pool.cursor_to_dict(txn)
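
The `limit` handling follows the rest of this query builder: the bound is appended as a bind parameter rather than interpolated into the SQL string. A self-contained sketch of the pattern (table and column names are illustrative, not necessarily the real schema):

    from typing import Any, List, Optional, Tuple

    def build_get_tasks_sql(
        statuses: Optional[List[str]] = None,
        max_timestamp: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> Tuple[str, List[Any]]:
        clauses: List[str] = []
        args: List[Any] = []
        if statuses:
            clauses.append("status IN (%s)" % ",".join("?" * len(statuses)))
            args.extend(statuses)
        if max_timestamp is not None:
            clauses.append("timestamp <= ?")
            args.append(max_timestamp)

        sql = "SELECT * FROM scheduled_tasks"
        if clauses:
            sql += " WHERE " + " AND ".join(clauses)
        sql += " ORDER BY timestamp"
        if limit is not None:
            sql += " LIMIT ?"
            args.append(limit)
        return sql, args

    print(build_get_tasks_sql(statuses=["active"], limit=100))
    # ('SELECT * FROM scheduled_tasks WHERE status IN (?) ORDER BY timestamp LIMIT ?', ['active', 100])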