author     Andrej Shadura <andrewsh@debian.org>    2021-08-15 10:52:27 +0100
committer  Andrej Shadura <andrewsh@debian.org>    2021-08-15 10:52:27 +0100
commit     a48716699a33ad533b4b6d088449e4bbc4528e38 (patch)
tree       0807e24466a1b4044870b2f85bd703a1673b79a1 /synapse/storage/databases
parent     679ff900f5e9b83af346904d7c8604cc5917608d (diff)
New upstream version 1.40.0
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--  synapse/storage/databases/main/__init__.py              |  21
-rw-r--r--  synapse/storage/databases/main/devices.py               |   9
-rw-r--r--  synapse/storage/databases/main/event_federation.py      | 199
-rw-r--r--  synapse/storage/databases/main/events.py                |  91
-rw-r--r--  synapse/storage/databases/main/monthly_active_users.py  |   8
-rw-r--r--  synapse/storage/databases/main/room.py                  |   2
-rw-r--r--  synapse/storage/databases/main/state.py                 |  50
-rw-r--r--  synapse/storage/databases/main/stats.py                 |   6
-rw-r--r--  synapse/storage/databases/main/transactions.py          |   8
-rw-r--r--  synapse/storage/databases/main/user_directory.py        |  66
-rw-r--r--  synapse/storage/databases/state/store.py                |  17
11 files changed, 363 insertions, 114 deletions
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index a3fddea0..8d9f0711 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -249,7 +249,7 @@ class DataStore(
name: Optional[str] = None,
guests: bool = True,
deactivated: bool = False,
- order_by: UserSortOrder = UserSortOrder.USER_ID.value,
+ order_by: str = UserSortOrder.USER_ID.value,
direction: str = "f",
) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users from
@@ -297,27 +297,22 @@ class DataStore(
where_clause = "WHERE " + " AND ".join(filters) if len(filters) > 0 else ""
- sql_base = """
+ sql_base = f"""
FROM users as u
LEFT JOIN profiles AS p ON u.name = '@' || p.user_id || ':' || ?
- {}
- """.format(
- where_clause
- )
+ {where_clause}
+ """
sql = "SELECT COUNT(*) as total_users " + sql_base
txn.execute(sql, args)
count = txn.fetchone()[0]
- sql = """
- SELECT name, user_type, is_guest, admin, deactivated, shadow_banned, displayname, avatar_url
+ sql = f"""
+ SELECT name, user_type, is_guest, admin, deactivated, shadow_banned,
+ displayname, avatar_url, creation_ts * 1000 as creation_ts
{sql_base}
ORDER BY {order_by_column} {order}, u.name ASC
LIMIT ? OFFSET ?
- """.format(
- sql_base=sql_base,
- order_by_column=order_by_column,
- order=order,
- )
+ """
args += [limit, start]
txn.execute(sql, args)
users = self.db_pool.cursor_to_dict(txn)
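The hunk above replaces str.format() templating with f-strings when composing the paginated user-list query. A minimal sketch of that pattern over a generic DB-API cursor follows; the table and column names are hypothetical stand-ins, not Synapse's schema:

def paginate_users(txn, where_clause: str, args: list, order_by_column: str,
                   order: str, limit: int, start: int):
    # Shared FROM/WHERE fragment used by both the count and the page query,
    # so the reported total always matches the filtered page.
    sql_base = f"""
        FROM users AS u
        LEFT JOIN profiles AS p ON u.name = '@' || p.user_id || ':' || ?
        {where_clause}
    """

    # Total number of matching rows, for pagination metadata.
    txn.execute("SELECT COUNT(*) AS total_users " + sql_base, args)
    count = txn.fetchone()[0]

    # One page of results. order_by_column and order are assumed to have been
    # validated against a whitelist before being interpolated here.
    sql = f"""
        SELECT name, displayname, avatar_url, creation_ts * 1000 AS creation_ts
        {sql_base}
        ORDER BY {order_by_column} {order}, u.name ASC
        LIMIT ? OFFSET ?
    """
    txn.execute(sql, args + [limit, start])
    return txn.fetchall(), count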
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 18f07d96..3816a0ca 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1078,16 +1078,18 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
return False
try:
- inserted = await self.db_pool.simple_insert(
+ inserted = await self.db_pool.simple_upsert(
"devices",
- values={
+ keyvalues={
"user_id": user_id,
"device_id": device_id,
+ },
+ values={},
+ insertion_values={
"display_name": initial_device_display_name,
"hidden": False,
},
desc="store_device",
- or_ignore=True,
)
if not inserted:
# if the device already exists, check if it's a real device, or
@@ -1099,6 +1101,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
)
if hidden:
raise StoreError(400, "The device ID is in use", Codes.FORBIDDEN)
+
self.device_id_exists_cache.set(key, True)
return inserted
except StoreError:
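Both this hunk and the received_transactions change later in the diff swap simple_insert(..., or_ignore=True) for simple_upsert with an empty values dict, i.e. insert-if-absent. A rough sketch of those semantics over plain SQLite (assuming a build new enough, 3.24+, for ON CONFLICT clauses); the table is a toy stand-in, not Synapse's devices schema:

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE devices (user_id TEXT, device_id TEXT, "
    "display_name TEXT, hidden BOOLEAN, PRIMARY KEY (user_id, device_id))"
)

def store_device(user_id: str, device_id: str, display_name: str) -> bool:
    before = conn.total_changes
    # With nothing to update on conflict, an existing row is left untouched,
    # which mirrors values={} plus insertion_values in simple_upsert.
    conn.execute(
        "INSERT INTO devices (user_id, device_id, display_name, hidden) "
        "VALUES (?, ?, ?, 0) ON CONFLICT (user_id, device_id) DO NOTHING",
        (user_id, device_id, display_name),
    )
    # total_changes only advances when a row was actually inserted.
    return conn.total_changes > before

assert store_device("@alice:example.com", "DEV1", "phone") is True
assert store_device("@alice:example.com", "DEV1", "phone") is False

The boolean result is what lets the caller above distinguish "new device" from "device already known", replacing the old or_ignore behaviour.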
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index d39368c2..44018c1c 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,11 +16,11 @@ import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
-from prometheus_client import Gauge
+from prometheus_client import Counter, Gauge
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
-from synapse.api.room_versions import RoomVersion
+from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -44,6 +44,12 @@ number_pdus_in_federation_queue = Gauge(
"The total number of events in the inbound federation staging",
)
+pdus_pruned_from_federation_queue = Counter(
+ "synapse_federation_server_number_inbound_pdu_pruned",
+ "The number of events in the inbound federation staging that have been "
+ "pruned due to the queue getting too long",
+)
+
logger = logging.getLogger(__name__)
@@ -936,15 +942,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# We want to make sure that we do a breadth-first, "depth" ordered
# search.
- query = (
- "SELECT depth, prev_event_id FROM event_edges"
- " INNER JOIN events"
- " ON prev_event_id = events.event_id"
- " WHERE event_edges.event_id = ?"
- " AND event_edges.is_state = ?"
- " LIMIT ?"
- )
+ # Look for the prev_event_id connected to the given event_id
+ query = """
+ SELECT depth, prev_event_id FROM event_edges
+ /* Get the depth of the prev_event_id from the events table */
+ INNER JOIN events
+ ON prev_event_id = events.event_id
+ /* Find an event which matches the given event_id */
+ WHERE event_edges.event_id = ?
+ AND event_edges.is_state = ?
+ LIMIT ?
+ """
+
+ # Look for the "insertion" events connected to the given event_id
+ connected_insertion_event_query = """
+ SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+ /* Get the depth of the insertion event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which points via prev_events to the given event_id */
+ WHERE i.insertion_prev_event_id = ?
+ LIMIT ?
+ """
+
+ # Find any chunk connections of a given insertion event
+ chunk_connection_query = """
+ SELECT e.depth, c.event_id FROM insertion_events AS i
+ /* Find the chunk that connects to the given insertion event */
+ INNER JOIN chunk_events AS c
+ ON i.next_chunk_id = c.chunk_id
+ /* Get the depth of the chunk start event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which matches the given event_id */
+ WHERE i.event_id = ?
+ LIMIT ?
+ """
+ # In a PriorityQueue, the lowest valued entries are retrieved first.
+ # We're using depth as the priority in the queue.
+ # Depth is lowest at the oldest-in-time message and highest at the
+ # newest-in-time message. We add events to the queue with a negative depth so that
+ # we process the newest-in-time messages first going backwards in time.
queue = PriorityQueue()
for event_id in event_list:
@@ -970,9 +1007,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_results.add(event_id)
+ # Try and find any potential historical chunks of message history.
+ #
+ # First we look for an insertion event connected to the current
+ # event (by prev_event). If we find any, we need to go and try to
+ # find any chunk events connected to the insertion event (by
+ # chunk_id). If we find any, we'll add them to the queue and
+ # navigate up the DAG like normal in the next iteration of the loop.
+ txn.execute(
+ connected_insertion_event_query, (event_id, limit - len(event_results))
+ )
+ connected_insertion_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: connected_insertion_event_query %s",
+ connected_insertion_event_id_results,
+ )
+ for row in connected_insertion_event_id_results:
+ connected_insertion_event_depth = row[0]
+ connected_insertion_event = row[1]
+ queue.put((-connected_insertion_event_depth, connected_insertion_event))
+
+ # Find any chunk connections for the given insertion event
+ txn.execute(
+ chunk_connection_query,
+ (connected_insertion_event, limit - len(event_results)),
+ )
+ chunk_start_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: chunk_start_event_id_results %s",
+ chunk_start_event_id_results,
+ )
+ for row in chunk_start_event_id_results:
+ if row[1] not in event_results:
+ queue.put((-row[0], row[1]))
+
+ # Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
+ prev_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+ )
- for row in txn:
+ for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
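A standalone illustration of the negative-depth trick described in the comments above: PriorityQueue pops the smallest tuple first, so storing (-depth, event_id) walks the DAG from the newest staged point backwards. The event IDs and depths below are invented:

from queue import PriorityQueue

events = [("$old", 1), ("$middle", 5), ("$new", 9)]  # (event_id, depth)

queue: PriorityQueue = PriorityQueue()
for event_id, depth in events:
    # Negate the depth so that higher (newer) depths sort first.
    queue.put((-depth, event_id))

while not queue.empty():
    neg_depth, event_id = queue.get()
    print(event_id, -neg_depth)  # prints $new 9, then $middle 5, then $old 1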
@@ -1207,6 +1283,100 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return origin, event
+ async def prune_staged_events_in_room(
+ self,
+ room_id: str,
+ room_version: RoomVersion,
+ ) -> bool:
+ """Checks if there are lots of staged events for the room, and if so
+ prunes them down.
+
+ Returns:
+ Whether any events were pruned
+ """
+
+ # First check the size of the queue.
+ count = await self.db_pool.simple_select_one_onecol(
+ table="federation_inbound_events_staging",
+ keyvalues={"room_id": room_id},
+ retcol="COALESCE(COUNT(*), 0)",
+ desc="prune_staged_events_in_room_count",
+ )
+
+ if count < 100:
+ return False
+
+ # If the queue is too large, then we want to clear the entire queue,
+ # keeping only the forward extremities (i.e. the events not referenced
+ # by other events in the queue). We do this so that we can always
+ # backpaginate into all the events we have dropped.
+ rows = await self.db_pool.simple_select_list(
+ table="federation_inbound_events_staging",
+ keyvalues={"room_id": room_id},
+ retcols=("event_id", "event_json"),
+ desc="prune_staged_events_in_room_fetch",
+ )
+
+ # Find the set of events referenced by those in the queue, as well as
+ # collecting all the event IDs in the queue.
+ referenced_events: Set[str] = set()
+ seen_events: Set[str] = set()
+ for row in rows:
+ event_id = row["event_id"]
+ seen_events.add(event_id)
+ event_d = db_to_json(row["event_json"])
+
+ # We don't bother parsing the dicts into full blown event objects,
+ # as that is needlessly expensive.
+
+ # We haven't checked that the `prev_events` have the right format
+ # yet, so we check as we go.
+ prev_events = event_d.get("prev_events", [])
+ if not isinstance(prev_events, list):
+ logger.info("Invalid prev_events for %s", event_id)
+ continue
+
+ if room_version.event_format == EventFormatVersions.V1:
+ for prev_event_tuple in prev_events:
+ if not isinstance(prev_event_tuple, list) or len(prev_event_tuple) != 2:
+ logger.info("Invalid prev_events for %s", event_id)
+ break
+
+ prev_event_id = prev_event_tuple[0]
+ if not isinstance(prev_event_id, str):
+ logger.info("Invalid prev_events for %s", event_id)
+ break
+
+ referenced_events.add(prev_event_id)
+ else:
+ for prev_event_id in prev_events:
+ if not isinstance(prev_event_id, str):
+ logger.info("Invalid prev_events for %s", event_id)
+ break
+
+ referenced_events.add(prev_event_id)
+
+ to_delete = referenced_events & seen_events
+ if not to_delete:
+ return False
+
+ pdus_pruned_from_federation_queue.inc(len(to_delete))
+ logger.info(
+ "Pruning %d events in room %s from federation queue",
+ len(to_delete),
+ room_id,
+ )
+
+ await self.db_pool.simple_delete_many(
+ table="federation_inbound_events_staging",
+ keyvalues={"room_id": room_id},
+ iterable=to_delete,
+ column="event_id",
+ desc="prune_staged_events_in_room_delete",
+ )
+
+ return True
+
async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
"""Get the room IDs of all events currently staged."""
return await self.db_pool.simple_select_onecol(
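The pruning in prune_staged_events_in_room keeps only the forward extremities of the staged events. A toy sketch of the set arithmetic with invented event IDs, mirroring the referenced_events/seen_events computation above:

# A staged event that appears in another staged event's prev_events is not a
# forward extremity, so it can be dropped and recovered later by backfilling
# from the events we keep.
staged = {
    "$a": [],          # referenced by $b -> pruned
    "$b": ["$a"],      # referenced by $c -> pruned
    "$c": ["$b"],      # referenced by nothing staged -> kept
}

seen_events = set(staged)
referenced_events = {prev for prevs in staged.values() for prev in prevs}

to_delete = referenced_events & seen_events
assert to_delete == {"$a", "$b"}
assert seen_events - to_delete == {"$c"}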
@@ -1227,12 +1397,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
(count,) = txn.fetchone()
txn.execute(
- "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
+ "SELECT min(received_ts) FROM federation_inbound_events_staging"
)
(received_ts,) = txn.fetchone()
- age = self._clock.time_msec() - received_ts
+ # If there is nothing in the staging area default it to 0.
+ age = 0
+ if received_ts is not None:
+ age = self._clock.time_msec() - received_ts
return count, age
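A small sketch of the None handling introduced above: dropping COALESCE from the SQL moves the empty-table default into Python, so the reported age is 0 when nothing is staged. The helper name is invented:

import time
from typing import Optional

def oldest_staged_age_ms(received_ts: Optional[int]) -> int:
    # With nothing staged, min(received_ts) comes back as NULL/None; default
    # the age to 0 rather than failing on None arithmetic.
    if received_ts is None:
        return 0
    return int(time.time() * 1000) - received_ts

assert oldest_staged_age_ms(None) == 0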
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a396a201..86baf397 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1502,6 +1502,9 @@ class PersistEventsStore:
self._handle_event_relations(txn, event)
+ self._handle_insertion_event(txn, event)
+ self._handle_chunk_event(txn, event)
+
# Store the labels for this event.
labels = event.content.get(EventContentFields.LABELS)
if labels:
@@ -1754,6 +1757,94 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
+ def _handle_insertion_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles keeping track of insertion events and edges/connections.
+ Part of MSC2716.
+
+ Args:
+ txn: The database transaction object
+ event: The event to process
+ """
+
+ if event.type != EventTypes.MSC2716_INSERTION:
+ # Not an insertion event
+ return
+
+ # Skip processing an insertion event if the room version doesn't
+ # support it.
+ room_version = self.store.get_room_version_txn(txn, event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
+ if next_chunk_id is None:
+ # Invalid insertion event without next chunk ID
+ return
+
+ logger.debug(
+ "_handle_insertion_event (next_chunk_id=%s) %s", next_chunk_id, event
+ )
+
+ # Keep track of the insertion event and the chunk ID
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="insertion_events",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "next_chunk_id": next_chunk_id,
+ },
+ )
+
+ # Insert an edge for every prev_event connection
+ for prev_event_id in event.prev_events:
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="insertion_event_edges",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "insertion_prev_event_id": prev_event_id,
+ },
+ )
+
+ def _handle_chunk_event(self, txn: LoggingTransaction, event: EventBase):
+ """Handles inserting the chunk edges/connections between the chunk event
+ and an insertion event. Part of MSC2716.
+
+ Args:
+ txn: The database transaction object
+ event: The event to process
+ """
+
+ if event.type != EventTypes.MSC2716_CHUNK:
+ # Not a chunk event
+ return
+
+ # Skip processing a chunk event if the room version doesn't
+ # support it.
+ room_version = self.store.get_room_version_txn(txn, event.room_id)
+ if not room_version.msc2716_historical:
+ return
+
+ chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
+ if chunk_id is None:
+ # Invalid chunk event without a chunk ID
+ return
+
+ logger.debug("_handle_chunk_event chunk_id=%s %s", chunk_id, event)
+
+ # Keep track of the chunk event and the chunk ID it belongs to
+ self.db_pool.simple_insert_txn(
+ txn,
+ table="chunk_events",
+ values={
+ "event_id": event.event_id,
+ "room_id": event.room_id,
+ "chunk_id": chunk_id,
+ },
+ )
+
def _handle_redaction(self, txn, redacted_event_id):
"""Handles receiving a redaction and checking whether we need to remove
any redacted relations from the database.
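A rough sketch of the data these MSC2716 handlers persist, using plain dicts instead of EventBase objects. The event types and content field names below are assumptions inferred from the constants referenced above (EventTypes.MSC2716_INSERTION, EventContentFields.MSC2716_NEXT_CHUNK_ID, and so on):

# An insertion event advertises the chunk that will attach to it via
# next_chunk_id, and a later chunk event points back at it via chunk_id.
insertion_event = {
    "type": "org.matrix.msc2716.insertion",      # assumed type constant
    "room_id": "!room:example.com",
    "event_id": "$insertion",
    "content": {"org.matrix.msc2716.next_chunk_id": "chunk-1"},
    "prev_events": ["$live_event"],
}

chunk_event = {
    "type": "org.matrix.msc2716.chunk",          # assumed type constant
    "room_id": "!room:example.com",
    "event_id": "$chunk_start",
    "content": {"org.matrix.msc2716.chunk_id": "chunk-1"},
}

# The rows written by _handle_insertion_event and _handle_chunk_event link the
# two: insertion_events.next_chunk_id matches chunk_events.chunk_id, which is
# exactly the join walked by chunk_connection_query during backfill.
assert (
    insertion_event["content"]["org.matrix.msc2716.next_chunk_id"]
    == chunk_event["content"]["org.matrix.msc2716.chunk_id"]
)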
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index fe256382..d213b267 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -297,17 +297,13 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
Args:
txn (cursor):
user_id (str): user to add/update
-
- Returns:
- bool: True if a new entry was created, False if an
- existing one was updated.
"""
# Am consciously deciding to lock the table on the basis that it ought
# never be a big table and alternative approaches (batching multiple
# upserts into a single txn) introduced a lot of extra complexity.
# See https://github.com/matrix-org/synapse/issues/3854 for more
- is_insert = self.db_pool.simple_upsert_txn(
+ self.db_pool.simple_upsert_txn(
txn,
table="monthly_active_users",
keyvalues={"user_id": user_id},
@@ -322,8 +318,6 @@ class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
txn, self.user_last_seen_monthly_active, (user_id,)
)
- return is_insert
-
async def populate_monthly_active_users(self, user_id):
"""Checks on the state of monthly active user limits and optionally
add the user to the monthly active tables
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 6ddafe54..443e5f33 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -363,7 +363,7 @@ class RoomWorkerStore(SQLBaseStore):
self,
start: int,
limit: int,
- order_by: RoomSortOrder,
+ order_by: str,
reverse_order: bool,
search_term: Optional[str],
) -> Tuple[List[Dict[str, Any]], int]:
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index 1757064a..8e22da99 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -22,7 +22,7 @@ from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
-from synapse.storage.database import DatabasePool
+from synapse.storage.database import DatabasePool, LoggingTransaction
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.databases.main.roommember import RoomMemberWorkerStore
from synapse.storage.state import StateFilter
@@ -58,15 +58,32 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
async def get_room_version(self, room_id: str) -> RoomVersion:
"""Get the room_version of a given room
-
Raises:
NotFoundError: if the room is unknown
+ UnsupportedRoomVersionError: if the room uses an unknown room version.
+ Typically this happens if support for the room's version has been
+ removed from Synapse.
+ """
+ return await self.db_pool.runInteraction(
+ "get_room_version_txn",
+ self.get_room_version_txn,
+ room_id,
+ )
+ def get_room_version_txn(
+ self, txn: LoggingTransaction, room_id: str
+ ) -> RoomVersion:
+ """Get the room_version of a given room
+ Args:
+ txn: Transaction object
+ room_id: The room_id of the room you are trying to get the version for
+ Raises:
+ NotFoundError: if the room is unknown
UnsupportedRoomVersionError: if the room uses an unknown room version.
Typically this happens if support for the room's version has been
removed from Synapse.
"""
- room_version_id = await self.get_room_version_id(room_id)
+ room_version_id = self.get_room_version_id_txn(txn, room_id)
v = KNOWN_ROOM_VERSIONS.get(room_version_id)
if not v:
@@ -80,7 +97,20 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@cached(max_entries=10000)
async def get_room_version_id(self, room_id: str) -> str:
"""Get the room_version of a given room
+ Raises:
+ NotFoundError: if the room is unknown
+ """
+ return await self.db_pool.runInteraction(
+ "get_room_version_id_txn",
+ self.get_room_version_id_txn,
+ room_id,
+ )
+ def get_room_version_id_txn(self, txn: LoggingTransaction, room_id: str) -> str:
+ """Get the room_version of a given room
+ Args:
+ txn: Transaction object
+ room_id: The room_id of the room you are trying to get the version for
Raises:
NotFoundError: if the room is unknown
"""
@@ -88,24 +118,22 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
# First we try looking up room version from the database, but for old
# rooms we might not have added the room version to it yet so we fall
# back to previous behaviour and look in current state events.
-
+ #
# We really should have an entry in the rooms table for every room we
# care about, but let's be a bit paranoid (at least while the background
# update is happening) to avoid breaking existing rooms.
- version = await self.db_pool.simple_select_one_onecol(
+ room_version = self.db_pool.simple_select_one_onecol_txn(
+ txn,
table="rooms",
keyvalues={"room_id": room_id},
retcol="room_version",
- desc="get_room_version",
allow_none=True,
)
- if version is not None:
- return version
+ if room_version is None:
+ raise NotFoundError("Could not room_version for %s" % (room_id,))
- # Retrieve the room's create event
- create_event = await self.get_create_event_for_room(room_id)
- return create_event.content.get("room_version", "1")
+ return room_version
async def get_room_predecessor(self, room_id: str) -> Optional[dict]:
"""Get the predecessor of an upgraded room if it exists.
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index 59d67c25..42edbcc0 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -75,6 +75,7 @@ class UserSortOrder(Enum):
USER_TYPE = ordered alphabetically by `user_type`
AVATAR_URL = ordered alphabetically by `avatar_url`
SHADOW_BANNED = ordered by `shadow_banned`
+ CREATION_TS = ordered by `creation_ts`
"""
MEDIA_LENGTH = "media_length"
@@ -88,6 +89,7 @@ class UserSortOrder(Enum):
USER_TYPE = "user_type"
AVATAR_URL = "avatar_url"
SHADOW_BANNED = "shadow_banned"
+ CREATION_TS = "creation_ts"
class StatsStore(StateDeltasStore):
@@ -647,10 +649,10 @@ class StatsStore(StateDeltasStore):
limit: int,
from_ts: Optional[int] = None,
until_ts: Optional[int] = None,
- order_by: Optional[UserSortOrder] = UserSortOrder.USER_ID.value,
+ order_by: Optional[str] = UserSortOrder.USER_ID.value,
direction: Optional[str] = "f",
search_term: Optional[str] = None,
- ) -> Tuple[List[JsonDict], Dict[str, int]]:
+ ) -> Tuple[List[JsonDict], int]:
"""Function to retrieve a paginated list of users and their uploaded local media
(size and number). This will return a json list of users and the
total number of users matching the filter criteria.
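Since order_by is now typed as a plain string, callers are expected to pass the enum's value. A trimmed, self-contained sketch of the new member (not the full enum from stats.py):

from enum import Enum

class UserSortOrder(Enum):
    USER_ID = "user_id"
    CREATION_TS = "creation_ts"   # new in this release

# The value string is what ends up interpolated into ORDER BY.
order_by = UserSortOrder.CREATION_TS.value
assert order_by == "creation_ts"
print(f"ORDER BY {order_by} ASC")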
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index d211c423..7728d5f1 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -134,16 +134,18 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
response_dict: The response, to be encoded into JSON.
"""
- await self.db_pool.simple_insert(
+ await self.db_pool.simple_upsert(
table="received_transactions",
- values={
+ keyvalues={
"transaction_id": transaction_id,
"origin": origin,
+ },
+ values={},
+ insertion_values={
"response_code": code,
"response_json": db_binary_type(encode_canonical_json(response_dict)),
"ts": self._clock.time_msec(),
},
- or_ignore=True,
desc="set_received_txn_response",
)
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index a6bfb490..9d28d69a 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -377,7 +377,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
avatar_url = None
def _update_profile_in_user_dir_txn(txn):
- new_entry = self.db_pool.simple_upsert_txn(
+ self.db_pool.simple_upsert_txn(
txn,
table="user_directory",
keyvalues={"user_id": user_id},
@@ -388,8 +388,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
if isinstance(self.database_engine, PostgresEngine):
# We weight the localpart most highly, then display name and finally
# server name
- if self.database_engine.can_native_upsert:
- sql = """
+ sql = """
INSERT INTO user_directory_search(user_id, vector)
VALUES (?,
setweight(to_tsvector('simple', ?), 'A')
@@ -397,58 +396,15 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
|| setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
) ON CONFLICT (user_id) DO UPDATE SET vector=EXCLUDED.vector
"""
- txn.execute(
- sql,
- (
- user_id,
- get_localpart_from_id(user_id),
- get_domain_from_id(user_id),
- display_name,
- ),
- )
- else:
- # TODO: Remove this code after we've bumped the minimum version
- # of postgres to always support upserts, so we can get rid of
- # `new_entry` usage
- if new_entry is True:
- sql = """
- INSERT INTO user_directory_search(user_id, vector)
- VALUES (?,
- setweight(to_tsvector('simple', ?), 'A')
- || setweight(to_tsvector('simple', ?), 'D')
- || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
- )
- """
- txn.execute(
- sql,
- (
- user_id,
- get_localpart_from_id(user_id),
- get_domain_from_id(user_id),
- display_name,
- ),
- )
- elif new_entry is False:
- sql = """
- UPDATE user_directory_search
- SET vector = setweight(to_tsvector('simple', ?), 'A')
- || setweight(to_tsvector('simple', ?), 'D')
- || setweight(to_tsvector('simple', COALESCE(?, '')), 'B')
- WHERE user_id = ?
- """
- txn.execute(
- sql,
- (
- get_localpart_from_id(user_id),
- get_domain_from_id(user_id),
- display_name,
- user_id,
- ),
- )
- else:
- raise RuntimeError(
- "upsert returned None when 'can_native_upsert' is False"
- )
+ txn.execute(
+ sql,
+ (
+ user_id,
+ get_localpart_from_id(user_id),
+ get_domain_from_id(user_id),
+ display_name,
+ ),
+ )
elif isinstance(self.database_engine, Sqlite3Engine):
value = "%s %s" % (user_id, display_name) if display_name else user_id
self.db_pool.simple_upsert_txn(
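A sketch of the parameter ordering fed into the weighted tsvector upsert kept above, with a placeholder user: the localpart gets weight 'A' (highest), the display name 'B', and the server name 'D' (lowest), so searches favour the local username over the homeserver domain. The helper below is illustrative, not part of Synapse:

from typing import Optional, Tuple

def search_vector_args(
    user_id: str, display_name: Optional[str]
) -> Tuple[str, str, str, Optional[str]]:
    # "@alice:example.com" -> localpart "alice", server name "example.com"
    localpart, server = user_id[1:].split(":", 1)
    # Matches the VALUES (?, ...) placeholders in the upsert:
    # (user_id, localpart -> weight 'A', domain -> weight 'D',
    #  display_name -> weight 'B')
    return (user_id, localpart, server, display_name)

print(search_vector_args("@alice:example.com", "Alice"))
# ('@alice:example.com', 'alice', 'example.com', 'Alice')

The simplification is possible because the minimum supported PostgreSQL version always provides native upserts (see the removed TODO), which is also why the new_entry return value is no longer captured.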
diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py
index e38461ad..f839c0c2 100644
--- a/synapse/storage/databases/state/store.py
+++ b/synapse/storage/databases/state/store.py
@@ -372,18 +372,23 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
)
async def store_state_group(
- self, event_id, room_id, prev_group, delta_ids, current_state_ids
+ self,
+ event_id: str,
+ room_id: str,
+ prev_group: Optional[int],
+ delta_ids: Optional[StateMap[str]],
+ current_state_ids: StateMap[str],
) -> int:
"""Store a new set of state, returning a newly assigned state group.
Args:
- event_id (str): The event ID for which the state was calculated
- room_id (str)
- prev_group (int|None): A previous state group for the room, optional.
- delta_ids (dict|None): The delta between state at `prev_group` and
+ event_id: The event ID for which the state was calculated
+ room_id
+ prev_group: A previous state group for the room, optional.
+ delta_ids: The delta between state at `prev_group` and
`current_state_ids`, if `prev_group` was given. Same format as
`current_state_ids`.
- current_state_ids (dict): The state to store. Map of (type, state_key)
+ current_state_ids: The state to store. Map of (type, state_key)
to event_id.
Returns: