path: root/synapse/storage
author     Andrej Shadura <andrewsh@debian.org>    2022-03-22 18:11:28 +0100
committer  Andrej Shadura <andrewsh@debian.org>    2022-03-22 18:11:28 +0100
commit     71eade9dd9a73148f44cbd0044c691447ae12caa (patch)
tree       2c90f516a37c1c5e7158ee6b14381e26bf05e693 /synapse/storage
parent     5d8241ddfec4abdb690c84aeb49ca47dca78fc97 (diff)
New upstream version 1.55.0
Diffstat (limited to 'synapse/storage')
-rw-r--r--  synapse/storage/background_updates.py              47
-rw-r--r--  synapse/storage/databases/main/cache.py              6
-rw-r--r--  synapse/storage/databases/main/deviceinbox.py        3
-rw-r--r--  synapse/storage/databases/main/events.py            54
-rw-r--r--  synapse/storage/databases/main/events_worker.py      4
-rw-r--r--  synapse/storage/databases/main/registration.py       2
-rw-r--r--  synapse/storage/databases/main/relations.py         135
-rw-r--r--  synapse/storage/databases/main/roommember.py        14
-rw-r--r--  synapse/storage/databases/main/search.py            13
-rw-r--r--  synapse/storage/databases/main/stream.py            18
-rw-r--r--  synapse/storage/relations.py                        31
-rw-r--r--  synapse/storage/schema/main/delta/30/as_users.py     1
-rw-r--r--  synapse/storage/state.py                             8
13 files changed, 188 insertions, 148 deletions
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index d64910ad..08c6eabc 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -60,18 +60,19 @@ class _BackgroundUpdateHandler:
class _BackgroundUpdateContextManager:
- BACKGROUND_UPDATE_INTERVAL_MS = 1000
- BACKGROUND_UPDATE_DURATION_MS = 100
-
- def __init__(self, sleep: bool, clock: Clock):
+ def __init__(
+ self, sleep: bool, clock: Clock, sleep_duration_ms: int, update_duration: int
+ ):
self._sleep = sleep
self._clock = clock
+ self._sleep_duration_ms = sleep_duration_ms
+ self._update_duration_ms = update_duration
async def __aenter__(self) -> int:
if self._sleep:
- await self._clock.sleep(self.BACKGROUND_UPDATE_INTERVAL_MS / 1000)
+ await self._clock.sleep(self._sleep_duration_ms / 1000)
- return self.BACKGROUND_UPDATE_DURATION_MS
+ return self._update_duration_ms
async def __aexit__(self, *exc) -> None:
pass
@@ -102,10 +103,12 @@ class BackgroundUpdatePerformance:
Returns:
A duration in ms as a float
"""
- if self.avg_duration_ms == 0:
- return 0
- elif self.total_item_count == 0:
+ # We want to return None if this is the first background update item
+ if self.total_item_count == 0:
return None
+ # Avoid dividing by zero
+ elif self.avg_duration_ms == 0:
+ return 0
else:
# Use the exponential moving average so that we can adapt to
# changes in how long the update process takes.
@@ -131,9 +134,6 @@ class BackgroundUpdater:
process and autotuning the batch size.
"""
- MINIMUM_BACKGROUND_BATCH_SIZE = 1
- DEFAULT_BACKGROUND_BATCH_SIZE = 100
-
def __init__(self, hs: "HomeServer", database: "DatabasePool"):
self._clock = hs.get_clock()
self.db_pool = database
@@ -158,6 +158,14 @@ class BackgroundUpdater:
# enable/disable background updates via the admin API.
self.enabled = True
+ self.minimum_background_batch_size = hs.config.background_updates.min_batch_size
+ self.default_background_batch_size = (
+ hs.config.background_updates.default_batch_size
+ )
+ self.update_duration_ms = hs.config.background_updates.update_duration_ms
+ self.sleep_duration_ms = hs.config.background_updates.sleep_duration_ms
+ self.sleep_enabled = hs.config.background_updates.sleep_enabled
+
def register_update_controller_callbacks(
self,
on_update: ON_UPDATE_CALLBACK,
@@ -214,7 +222,9 @@ class BackgroundUpdater:
if self._on_update_callback is not None:
return self._on_update_callback(update_name, database_name, oneshot)
- return _BackgroundUpdateContextManager(sleep, self._clock)
+ return _BackgroundUpdateContextManager(
+ sleep, self._clock, self.sleep_duration_ms, self.update_duration_ms
+ )
async def _default_batch_size(self, update_name: str, database_name: str) -> int:
"""The batch size to use for the first iteration of a new background
@@ -223,7 +233,7 @@ class BackgroundUpdater:
if self._default_batch_size_callback is not None:
return await self._default_batch_size_callback(update_name, database_name)
- return self.DEFAULT_BACKGROUND_BATCH_SIZE
+ return self.default_background_batch_size
async def _min_batch_size(self, update_name: str, database_name: str) -> int:
"""A lower bound on the batch size of a new background update.
@@ -233,7 +243,7 @@ class BackgroundUpdater:
if self._min_batch_size_callback is not None:
return await self._min_batch_size_callback(update_name, database_name)
- return self.MINIMUM_BACKGROUND_BATCH_SIZE
+ return self.minimum_background_batch_size
def get_current_update(self) -> Optional[BackgroundUpdatePerformance]:
"""Returns the current background update, if any."""
@@ -252,9 +262,12 @@ class BackgroundUpdater:
if self.enabled:
# if we start a new background update, not all updates are done.
self._all_done = False
- run_as_background_process("background_updates", self.run_background_updates)
+ sleep = self.sleep_enabled
+ run_as_background_process(
+ "background_updates", self.run_background_updates, sleep
+ )
- async def run_background_updates(self, sleep: bool = True) -> None:
+ async def run_background_updates(self, sleep: bool) -> None:
if self._running or not self.enabled:
return
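The hunk above replaces the hard-coded BACKGROUND_UPDATE_INTERVAL_MS and BACKGROUND_UPDATE_DURATION_MS class constants with values injected from the homeserver configuration. A minimal, self-contained sketch of that pattern (using asyncio.sleep in place of Synapse's Clock, with illustrative names) could look like this:

import asyncio


class SketchBackgroundUpdateContextManager:
    def __init__(self, sleep: bool, sleep_duration_ms: int, update_duration_ms: int):
        self._sleep = sleep
        self._sleep_duration_ms = sleep_duration_ms
        self._update_duration_ms = update_duration_ms

    async def __aenter__(self) -> int:
        if self._sleep:
            # Pause between batches so background work does not hog the database.
            await asyncio.sleep(self._sleep_duration_ms / 1000)
        # Tell the caller how long (in ms) the next batch is allowed to run for.
        return self._update_duration_ms

    async def __aexit__(self, *exc) -> None:
        pass


async def demo() -> None:
    async with SketchBackgroundUpdateContextManager(True, 10, 100) as budget_ms:
        print(f"run the next batch for roughly {budget_ms} ms")


if __name__ == "__main__":
    asyncio.run(demo())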
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c428dd55..d6a2df1a 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -191,6 +191,10 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
if redacts:
self._invalidate_get_event_cache(redacts)
+ # Caches which might leak edits must be invalidated for the event being
+ # redacted.
+ self.get_relations_for_event.invalidate((redacts,))
+ self.get_applicable_edit.invalidate((redacts,))
if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
@@ -200,6 +204,8 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.get_relations_for_event.invalidate((relates_to,))
self.get_aggregation_groups_for_event.invalidate((relates_to,))
self.get_applicable_edit.invalidate((relates_to,))
+ self.get_thread_summary.invalidate((relates_to,))
+ self.get_thread_participated.invalidate((relates_to,))
async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
"""Invalidates the cache and adds it to the cache stream so slaves
diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py
index 1392363d..b4a1b041 100644
--- a/synapse/storage/databases/main/deviceinbox.py
+++ b/synapse/storage/databases/main/deviceinbox.py
@@ -298,6 +298,9 @@ class DeviceInboxWorkerStore(SQLBaseStore):
# This user has new messages sent to them. Query messages for them
user_ids_to_query.add(user_id)
+ if not user_ids_to_query:
+ return {}, to_stream_id
+
def get_device_messages_txn(txn: LoggingTransaction):
# Build a query to select messages from any of the given devices that
# are between the given stream id bounds.
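The added guard returns early when no user has new to-device messages, avoiding a transaction built around an empty IN (...) list. A sketch with hypothetical names and a stubbed-out query:

from typing import Dict, List, Set, Tuple


def get_new_device_messages(
    user_ids_to_query: Set[str], to_stream_id: int
) -> Tuple[Dict[str, List[dict]], int]:
    if not user_ids_to_query:
        # Nothing to fetch: return an empty mapping and leave the upper
        # stream ID unchanged instead of running the query at all.
        return {}, to_stream_id
    # Otherwise build and run the real query (omitted in this sketch).
    return {user_id: [] for user_id in user_ids_to_query}, to_stream_id


assert get_new_device_messages(set(), 42) == ({}, 42)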
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ca2a9ba9..1f60aef1 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1518,7 +1518,7 @@ class PersistEventsStore:
)
# Remove from relations table.
- self._handle_redaction(txn, event.redacts)
+ self._handle_redact_relations(txn, event.redacts)
# Update the event_forward_extremities, event_backward_extremities and
# event_edges tables.
@@ -1619,9 +1619,12 @@ class PersistEventsStore:
txn.call_after(prefill)
- def _store_redaction(self, txn, event):
- # invalidate the cache for the redacted event
+ def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
+ # Invalidate the caches for the redacted event, note that these caches
+ # are also cleared as part of event replication in _invalidate_caches_for_event.
txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
+ txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
+ txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
self.db_pool.simple_upsert_txn(
txn,
@@ -1811,10 +1814,11 @@ class PersistEventsStore:
if rel_type == RelationTypes.REPLACE:
txn.call_after(self.store.get_applicable_edit.invalidate, (parent_id,))
- if rel_type == RelationTypes.THREAD:
- txn.call_after(
- self.store.get_thread_summary.invalidate, (parent_id, event.room_id)
- )
+ if (
+ rel_type == RelationTypes.THREAD
+ or rel_type == RelationTypes.UNSTABLE_THREAD
+ ):
+ txn.call_after(self.store.get_thread_summary.invalidate, (parent_id,))
# It should be safe to only invalidate the cache if the user has not
# previously participated in the thread, but that's difficult (and
# potentially error-prone) so it is always invalidated.
@@ -1943,15 +1947,43 @@ class PersistEventsStore:
txn.execute(sql, (batch_id,))
- def _handle_redaction(self, txn, redacted_event_id):
- """Handles receiving a redaction and checking whether we need to remove
- any redacted relations from the database.
+ def _handle_redact_relations(
+ self, txn: LoggingTransaction, redacted_event_id: str
+ ) -> None:
+ """Handles receiving a redaction and checking whether the redacted event
+ has any relations which must be removed from the database.
Args:
txn
- redacted_event_id (str): The event that was redacted.
+ redacted_event_id: The event that was redacted.
"""
+ # Fetch the current relation of the event being redacted.
+ redacted_relates_to = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="event_relations",
+ keyvalues={"event_id": redacted_event_id},
+ retcol="relates_to_id",
+ allow_none=True,
+ )
+ # Any relation information for the related event must be cleared.
+ if redacted_relates_to is not None:
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_relations_for_event, (redacted_relates_to,)
+ )
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_aggregation_groups_for_event, (redacted_relates_to,)
+ )
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_applicable_edit, (redacted_relates_to,)
+ )
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_thread_summary, (redacted_relates_to,)
+ )
+ self.store._invalidate_cache_and_stream(
+ txn, self.store.get_thread_participated, (redacted_relates_to,)
+ )
+
self.db_pool.simple_delete_txn(
txn, table="event_relations", keyvalues={"event_id": redacted_event_id}
)
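The renamed _handle_redact_relations looks up which event the redacted event related to, invalidates the per-parent relation caches, and only then deletes the relation row. A standalone sqlite3 sketch of that flow (not Synapse's schema or cache code; the table and event IDs are simplified):

import sqlite3


def handle_redact_relations(conn: sqlite3.Connection, redacted_event_id: str) -> None:
    # Fetch the current relation of the event being redacted.
    row = conn.execute(
        "SELECT relates_to_id FROM event_relations WHERE event_id = ?",
        (redacted_event_id,),
    ).fetchone()
    if row is not None:
        parent_id = row[0]
        # In Synapse this is where the per-parent caches (relations,
        # aggregation groups, edits, thread summary/participation) are
        # invalidated; here we just note which parent would be affected.
        print(f"would invalidate relation caches for {parent_id}")
    # Remove the relation row itself.
    conn.execute("DELETE FROM event_relations WHERE event_id = ?", (redacted_event_id,))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE event_relations (event_id TEXT, relates_to_id TEXT)")
conn.execute("INSERT INTO event_relations VALUES ('$edit', '$parent')")
handle_redact_relations(conn, "$edit")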
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 26784f75..59454a47 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -1286,7 +1286,7 @@ class EventsWorkerStore(SQLBaseStore):
)
return {eid for ((_rid, eid), have_event) in res.items() if have_event}
- @cachedList("have_seen_event", "keys")
+ @cachedList(cached_method_name="have_seen_event", list_name="keys")
async def _have_seen_events_dict(
self, keys: Iterable[Tuple[str, str]]
) -> Dict[Tuple[str, str], bool]:
@@ -1954,7 +1954,7 @@ class EventsWorkerStore(SQLBaseStore):
get_event_id_for_timestamp_txn,
)
- @cachedList("is_partial_state_event", list_name="event_ids")
+ @cachedList(cached_method_name="is_partial_state_event", list_name="event_ids")
async def get_partial_state_events(
self, event_ids: Collection[str]
) -> Dict[str, bool]:
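These two hunks only change the call style: the @cachedList arguments are now passed by keyword. A rough illustration (not Synapse's actual @cachedList implementation) of why keyword-only decorator parameters read better here:

from typing import Callable


def cached_list(*, cached_method_name: str, list_name: str) -> Callable:
    # Keyword-only parameters make it obvious which string names the cached
    # method and which names the batched-argument parameter.
    def decorator(func: Callable) -> Callable:
        func._cached_method_name = cached_method_name
        func._list_name = list_name
        return func

    return decorator


@cached_list(cached_method_name="have_seen_event", list_name="keys")
async def _have_seen_events_dict(keys):
    return {key: True for key in keys}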
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index dc666523..a698d10c 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -48,8 +48,6 @@ class ExternalIDReuseException(Exception):
"""Exception if writing an external id for a user fails,
because this external id is given to an other user."""
- pass
-
@attr.s(frozen=True, slots=True, auto_attribs=True)
class TokenLookupResult:
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index 36aa1092..c4869d64 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -91,10 +91,11 @@ class RelationsWorkerStore(SQLBaseStore):
self._msc3440_enabled = hs.config.experimental.msc3440_enabled
- @cached(tree=True)
+ @cached(uncached_args=("event",), tree=True)
async def get_relations_for_event(
self,
event_id: str,
+ event: EventBase,
room_id: str,
relation_type: Optional[str] = None,
event_type: Optional[str] = None,
@@ -108,6 +109,7 @@ class RelationsWorkerStore(SQLBaseStore):
Args:
event_id: Fetch events that relate to this event ID.
+ event: The matching EventBase to event_id.
room_id: The room the event belongs to.
relation_type: Only fetch events with this relation type, if given.
event_type: Only fetch events with this event type, if given.
@@ -122,9 +124,13 @@ class RelationsWorkerStore(SQLBaseStore):
List of event IDs that match relations requested. The rows are of
the form `{"event_id": "..."}`.
"""
+ # We don't use `event_id`, it's there so that we can cache based on
+ # it. The `event_id` must match the `event.event_id`.
+ assert event.event_id == event_id
where_clause = ["relates_to_id = ?", "room_id = ?"]
- where_args: List[Union[str, int]] = [event_id, room_id]
+ where_args: List[Union[str, int]] = [event.event_id, room_id]
+ is_redacted = event.internal_metadata.is_redacted()
if relation_type is not None:
where_clause.append("relation_type = ?")
@@ -157,7 +163,7 @@ class RelationsWorkerStore(SQLBaseStore):
order = "ASC"
sql = """
- SELECT event_id, topological_ordering, stream_ordering
+ SELECT event_id, relation_type, topological_ordering, stream_ordering
FROM event_relations
INNER JOIN events USING (event_id)
WHERE %s
@@ -178,9 +184,12 @@ class RelationsWorkerStore(SQLBaseStore):
last_stream_id = None
events = []
for row in txn:
- events.append({"event_id": row[0]})
- last_topo_id = row[1]
- last_stream_id = row[2]
+ # Do not include edits for redacted events as they leak event
+ # content.
+ if not is_redacted or row[1] != RelationTypes.REPLACE:
+ events.append({"event_id": row[0]})
+ last_topo_id = row[2]
+ last_stream_id = row[3]
# If there are more events, generate the next pagination key.
next_token = None
@@ -499,7 +508,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
ORDER BY parent.event_id, child.topological_ordering DESC, child.stream_ordering DESC
"""
else:
@@ -514,16 +523,22 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
ORDER BY child.topological_ordering DESC, child.stream_ordering DESC
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
- args.append(RelationTypes.THREAD)
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ txn.execute(sql % (clause, relations_clause), args)
latest_event_ids = {}
for parent_event_id, child_event_id in txn:
# Only consider the latest threaded reply (by topological ordering).
@@ -543,7 +558,7 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
GROUP BY parent.event_id
"""
@@ -552,9 +567,15 @@ class RelationsWorkerStore(SQLBaseStore):
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", latest_event_ids.keys()
)
- args.append(RelationTypes.THREAD)
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ txn.execute(sql % (clause, relations_clause), args)
counts = dict(cast(List[Tuple[str, int]], txn.fetchall()))
return counts, latest_event_ids
@@ -617,16 +638,24 @@ class RelationsWorkerStore(SQLBaseStore):
AND parent.room_id = child.room_id
WHERE
%s
- AND relation_type = ?
+ AND %s
AND child.sender = ?
"""
clause, args = make_in_list_sql_clause(
txn.database_engine, "relates_to_id", event_ids
)
- args.extend((RelationTypes.THREAD, user_id))
- txn.execute(sql % (clause,), args)
+ if self._msc3440_enabled:
+ relations_clause = "(relation_type = ? OR relation_type = ?)"
+ args.extend((RelationTypes.THREAD, RelationTypes.UNSTABLE_THREAD))
+ else:
+ relations_clause = "relation_type = ?"
+ args.append(RelationTypes.THREAD)
+
+ args.append(user_id)
+
+ txn.execute(sql % (clause, relations_clause), args)
return {row[0] for row in txn.fetchall()}
participated_threads = await self.db_pool.runInteraction(
@@ -776,7 +805,7 @@ class RelationsWorkerStore(SQLBaseStore):
)
references = await self.get_relations_for_event(
- event_id, room_id, RelationTypes.REFERENCE, direction="f"
+ event_id, event, room_id, RelationTypes.REFERENCE, direction="f"
)
if references.chunk:
aggregations.references = await references.to_dict(cast("DataStore", self))
@@ -797,59 +826,51 @@ class RelationsWorkerStore(SQLBaseStore):
A map of event ID to the bundled aggregation for the event. Not all
events may have bundled aggregations in the results.
"""
- # The already processed event IDs. Tracked separately from the result
- # since the result omits events which do not have bundled aggregations.
- seen_event_ids = set()
-
- # State events and redacted events do not get bundled aggregations.
- events = [
- event
- for event in events
- if not event.is_state() and not event.internal_metadata.is_redacted()
- ]
+ # De-duplicate events by ID to handle the same event requested multiple times.
+ #
+ # State events do not get bundled aggregations.
+ events_by_id = {
+ event.event_id: event for event in events if not event.is_state()
+ }
# event ID -> bundled aggregation in non-serialized form.
results: Dict[str, BundledAggregations] = {}
# Fetch other relations per event.
- for event in events:
- # De-duplicate events by ID to handle the same event requested multiple
- # times. The caches that _get_bundled_aggregation_for_event use should
- # capture this, but best to reduce work.
- if event.event_id in seen_event_ids:
- continue
- seen_event_ids.add(event.event_id)
-
+ for event in events_by_id.values():
event_result = await self._get_bundled_aggregation_for_event(event, user_id)
if event_result:
results[event.event_id] = event_result
- # Fetch any edits.
- edits = await self._get_applicable_edits(seen_event_ids)
+ # Fetch any edits (but not for redacted events).
+ edits = await self._get_applicable_edits(
+ [
+ event_id
+ for event_id, event in events_by_id.items()
+ if not event.internal_metadata.is_redacted()
+ ]
+ )
for event_id, edit in edits.items():
results.setdefault(event_id, BundledAggregations()).replace = edit
# Fetch thread summaries.
- if self._msc3440_enabled:
- summaries = await self._get_thread_summaries(seen_event_ids)
- # Only fetch participated for a limited selection based on what had
- # summaries.
- participated = await self._get_threads_participated(
- summaries.keys(), user_id
- )
- for event_id, summary in summaries.items():
- if summary:
- thread_count, latest_thread_event, edit = summary
- results.setdefault(
- event_id, BundledAggregations()
- ).thread = _ThreadAggregation(
- latest_event=latest_thread_event,
- latest_edit=edit,
- count=thread_count,
- # If there's a thread summary it must also exist in the
- # participated dictionary.
- current_user_participated=participated[event_id],
- )
+ summaries = await self._get_thread_summaries(events_by_id.keys())
+ # Only fetch participated for a limited selection based on what had
+ # summaries.
+ participated = await self._get_threads_participated(summaries.keys(), user_id)
+ for event_id, summary in summaries.items():
+ if summary:
+ thread_count, latest_thread_event, edit = summary
+ results.setdefault(
+ event_id, BundledAggregations()
+ ).thread = _ThreadAggregation(
+ latest_event=latest_thread_event,
+ latest_edit=edit,
+ count=thread_count,
+ # If there's a thread summary it must also exist in the
+ # participated dictionary.
+ current_user_participated=participated[event_id],
+ )
return results
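The query now also selects relation_type so that edits of a redacted parent can be filtered out of the relations list, since returning an m.replace child would leak the redacted content. A minimal sketch of that filter over the widened rows (constants and IDs are illustrative):

from typing import List, Tuple

REPLACE = "m.replace"


def filter_relation_rows(
    rows: List[Tuple[str, str, int, int]], parent_is_redacted: bool
) -> List[dict]:
    # Rows are (event_id, relation_type, topological_ordering, stream_ordering).
    events = []
    for event_id, relation_type, _topo, _stream in rows:
        if parent_is_redacted and relation_type == REPLACE:
            # Edits of a redacted event would leak its original content.
            continue
        events.append({"event_id": event_id})
    return events


rows = [("$edit", REPLACE, 1, 1), ("$reaction", "m.annotation", 2, 2)]
assert filter_relation_rows(rows, parent_is_redacted=True) == [{"event_id": "$reaction"}]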
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index e48ec5f4..bef675b8 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -46,7 +46,7 @@ from synapse.storage.roommember import (
ProfileInfo,
RoomsForUser,
)
-from synapse.types import PersistedEventPosition, StateMap, get_domain_from_id
+from synapse.types import PersistedEventPosition, get_domain_from_id
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import intern_string
from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -273,7 +273,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id,))
res = {}
for count, membership in txn:
- summary = res.setdefault(membership, MemberSummary([], count))
+ res.setdefault(membership, MemberSummary([], count))
# we order by membership and then fairly arbitrarily by event_id so
# heroes are consistent
@@ -839,18 +839,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
with Measure(self._clock, "get_joined_hosts"):
return await self._get_joined_hosts(
- room_id, state_group, state_entry.state, state_entry=state_entry
+ room_id, state_group, state_entry=state_entry
)
@cached(num_args=2, max_entries=10000, iterable=True)
async def _get_joined_hosts(
- self,
- room_id: str,
- state_group: int,
- current_state_ids: StateMap[str],
- state_entry: "_StateCacheEntry",
+ self, room_id: str, state_group: int, state_entry: "_StateCacheEntry"
) -> FrozenSet[str]:
- # We don't use `state_group`, its there so that we can cache based on
+ # We don't use `state_group`, it's there so that we can cache based on
# it. However, its important that its never None, since two
# current_state's with a state_group of None are likely to be different.
#
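The roommember change drops the unused current_state_ids parameter while keeping state_group purely as a cache key, as the comment explains. A toy sketch of that "cache on a cheap key, read the data from the heavy argument" pattern (plain dict instead of Synapse's @cached, hypothetical names):

from typing import Dict, FrozenSet, Tuple

_joined_hosts_cache: Dict[Tuple[str, int], FrozenSet[str]] = {}


def get_joined_hosts(room_id: str, state_group: int, state_entry: dict) -> FrozenSet[str]:
    # state_group must never be None: two state entries without a group would
    # otherwise collide on the same cache key despite holding different state.
    assert state_group is not None
    key = (room_id, state_group)
    if key not in _joined_hosts_cache:
        # Only on a miss do we look at the heavy state_entry object.
        _joined_hosts_cache[key] = frozenset(state_entry["hosts"])
    return _joined_hosts_cache[key]


hosts = get_joined_hosts("!room:example.org", 7, {"hosts": {"a.example", "b.example"}})
assert hosts == frozenset({"a.example", "b.example"})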
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index e23b1190..c5e9010c 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -125,9 +125,6 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
):
super().__init__(database, db_conn, hs)
- if not hs.config.server.enable_search:
- return
-
self.db_pool.updates.register_background_update_handler(
self.EVENT_SEARCH_UPDATE_NAME, self._background_reindex_search
)
@@ -243,9 +240,13 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
return len(event_search_rows)
- result = await self.db_pool.runInteraction(
- self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
- )
+ if self.hs.config.server.enable_search:
+ result = await self.db_pool.runInteraction(
+ self.EVENT_SEARCH_UPDATE_NAME, reindex_search_txn
+ )
+ else:
+ # Don't index anything if search is not enabled.
+ result = 0
if not result:
await self.db_pool.updates._end_background_update(
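The search change registers the background update handler unconditionally and instead skips the actual reindex work when search is disabled, reporting zero rows so the update can still be marked as finished. A rough sketch of that control flow (simplified, illustrative names):

import asyncio
from typing import Awaitable, Callable


async def run_reindex_step(
    enable_search: bool, reindex_txn: Callable[[], Awaitable[int]]
) -> int:
    if enable_search:
        return await reindex_txn()
    # Don't index anything if search is not enabled, but still return 0 so the
    # background update machinery can end the update cleanly.
    return 0


async def fake_reindex_txn() -> int:
    return 250  # pretend 250 rows were reindexed


assert asyncio.run(run_reindex_step(True, fake_reindex_txn)) == 250
assert asyncio.run(run_reindex_step(False, fake_reindex_txn)) == 0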
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index a898f847..39e1efe3 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -325,21 +325,23 @@ def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
args.extend(event_filter.labels)
# Filter on relation_senders / relation types from the joined tables.
- if event_filter.relation_senders:
+ if event_filter.related_by_senders:
clauses.append(
"(%s)"
% " OR ".join(
- "related_event.sender = ?" for _ in event_filter.relation_senders
+ "related_event.sender = ?" for _ in event_filter.related_by_senders
)
)
- args.extend(event_filter.relation_senders)
+ args.extend(event_filter.related_by_senders)
- if event_filter.relation_types:
+ if event_filter.related_by_rel_types:
clauses.append(
"(%s)"
- % " OR ".join("relation_type = ?" for _ in event_filter.relation_types)
+ % " OR ".join(
+ "relation_type = ?" for _ in event_filter.related_by_rel_types
+ )
)
- args.extend(event_filter.relation_types)
+ args.extend(event_filter.related_by_rel_types)
return " AND ".join(clauses), args
@@ -1203,7 +1205,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# If there is a filter on relation_senders and relation_types join to the
# relations table.
if event_filter and (
- event_filter.relation_senders or event_filter.relation_types
+ event_filter.related_by_senders or event_filter.related_by_rel_types
):
# Filtering by relations could cause the same event to appear multiple
# times (since there's no limit on the number of relations to an event).
@@ -1211,7 +1213,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
join_clause += """
LEFT JOIN event_relations AS relation ON (event.event_id = relation.relates_to_id)
"""
- if event_filter.relation_senders:
+ if event_filter.related_by_senders:
join_clause += """
LEFT JOIN events AS related_event ON (relation.event_id = related_event.event_id)
"""
diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py
index 36ca2b82..fba27015 100644
--- a/synapse/storage/relations.py
+++ b/synapse/storage/relations.py
@@ -55,37 +55,6 @@ class PaginationChunk:
@attr.s(frozen=True, slots=True, auto_attribs=True)
-class RelationPaginationToken:
- """Pagination token for relation pagination API.
-
- As the results are in topological order, we can use the
- `topological_ordering` and `stream_ordering` fields of the events at the
- boundaries of the chunk as pagination tokens.
-
- Attributes:
- topological: The topological ordering of the boundary event
- stream: The stream ordering of the boundary event.
- """
-
- topological: int
- stream: int
-
- @staticmethod
- def from_string(string: str) -> "RelationPaginationToken":
- try:
- t, s = string.split("-")
- return RelationPaginationToken(int(t), int(s))
- except ValueError:
- raise SynapseError(400, "Invalid relation pagination token")
-
- async def to_string(self, store: "DataStore") -> str:
- return "%d-%d" % (self.topological, self.stream)
-
- def as_tuple(self) -> Tuple[Any, ...]:
- return attr.astuple(self)
-
-
-@attr.s(frozen=True, slots=True, auto_attribs=True)
class AggregationPaginationToken:
"""Pagination token for relation aggregation pagination API.
diff --git a/synapse/storage/schema/main/delta/30/as_users.py b/synapse/storage/schema/main/delta/30/as_users.py
index 22a7901e..4b4b166e 100644
--- a/synapse/storage/schema/main/delta/30/as_users.py
+++ b/synapse/storage/schema/main/delta/30/as_users.py
@@ -36,7 +36,6 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
config_files = config.appservice.app_service_config_files
except AttributeError:
logger.warning("Could not get app_service_config_files from config")
- pass
appservices = load_appservices(config.server.server_name, config_files)
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index e79ecf64..86f1a537 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -561,7 +561,7 @@ class StateGroupStorage:
return state_group_delta.prev_group, state_group_delta.delta_ids
async def get_state_groups_ids(
- self, _room_id: str, event_ids: Iterable[str]
+ self, _room_id: str, event_ids: Collection[str]
) -> Dict[int, MutableStateMap[str]]:
"""Get the event IDs of all the state for the state groups for the given events
@@ -596,7 +596,7 @@ class StateGroupStorage:
return group_to_state[state_group]
async def get_state_groups(
- self, room_id: str, event_ids: Iterable[str]
+ self, room_id: str, event_ids: Collection[str]
) -> Dict[int, List[EventBase]]:
"""Get the state groups for the given list of event_ids
@@ -648,7 +648,7 @@ class StateGroupStorage:
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
async def get_state_for_events(
- self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None
+ self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[EventBase]]:
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event.
@@ -684,7 +684,7 @@ class StateGroupStorage:
return {event: event_to_state[event] for event in event_ids}
async def get_state_ids_for_events(
- self, event_ids: Iterable[str], state_filter: Optional[StateFilter] = None
+ self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
) -> Dict[str, StateMap[str]]:
"""
Get the state dicts corresponding to a list of events, containing the event_ids
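The state.py hunks tighten these signatures from Iterable to Collection. A short illustration of the practical difference (not Synapse code): a Collection can be sized and iterated more than once, whereas a bare generator is exhausted after a single pass.

from typing import Collection, Iterable


def count_twice_iterable(event_ids: Iterable[str]) -> int:
    # A generator passed here yields nothing on the second pass.
    return sum(1 for _ in event_ids) + sum(1 for _ in event_ids)


def count_twice_collection(event_ids: Collection[str]) -> int:
    # A Collection supports len() and can safely be re-iterated.
    return len(event_ids) + sum(1 for _ in event_ids)


gen = (event_id for event_id in ["$a", "$b"])
assert count_twice_iterable(gen) == 2
assert count_twice_collection(["$a", "$b"]) == 4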