summaryrefslogtreecommitdiff
path: root/synapse/storage/databases/main/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events.py')
-rw-r--r--synapse/storage/databases/main/events.py134
1 files changed, 86 insertions, 48 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 40b53274..8e691678 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -575,7 +575,13 @@ class PersistEventsStore:
missing_auth_chains.clear()
- for auth_id, event_type, state_key, chain_id, sequence_number in txn:
+ for (
+ auth_id,
+ event_type,
+ state_key,
+ chain_id,
+ sequence_number,
+ ) in txn.fetchall():
event_to_types[auth_id] = (event_type, state_key)
if chain_id is None:
@@ -1379,18 +1385,18 @@ class PersistEventsStore:
# If we're persisting an unredacted event we go and ensure
# that we mark any redactions that reference this event as
# requiring censoring.
- sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?"
- txn.execute_batch(
- sql,
- (
- (
- False,
- event.event_id,
- )
- for event, _ in events_and_contexts
- if not event.internal_metadata.is_redacted()
- ),
+ unredacted_events = [
+ event.event_id
+ for event, _ in events_and_contexts
+ if not event.internal_metadata.is_redacted()
+ ]
+ sql = "UPDATE redactions SET have_censored = ? WHERE "
+ clause, args = make_in_list_sql_clause(
+ self.database_engine,
+ "redacts",
+ unredacted_events,
)
+ txn.execute(sql + clause, [False] + args)
state_events_and_contexts = [
ec for ec in events_and_contexts if ec[0].is_state()
@@ -1541,35 +1547,32 @@ class PersistEventsStore:
to_prefill = []
rows = []
- N = 200
- for i in range(0, len(events_and_contexts), N):
- ev_map = {e[0].event_id: e[0] for e in events_and_contexts[i : i + N]}
- if not ev_map:
- break
-
- sql = (
- "SELECT "
- " e.event_id as event_id, "
- " r.redacts as redacts,"
- " rej.event_id as rejects "
- " FROM events as e"
- " LEFT JOIN rejections as rej USING (event_id)"
- " LEFT JOIN redactions as r ON e.event_id = r.redacts"
- " WHERE "
- )
- clause, args = make_in_list_sql_clause(
- self.database_engine, "e.event_id", list(ev_map)
- )
+ ev_map = {e.event_id: e for e, _ in events_and_contexts}
+ if not ev_map:
+ return
- txn.execute(sql + clause, args)
- rows = self.db_pool.cursor_to_dict(txn)
- for row in rows:
- event = ev_map[row["event_id"]]
- if not row["rejects"] and not row["redacts"]:
- to_prefill.append(
- _EventCacheEntry(event=event, redacted_event=None)
- )
+ sql = (
+ "SELECT "
+ " e.event_id as event_id, "
+ " r.redacts as redacts,"
+ " rej.event_id as rejects "
+ " FROM events as e"
+ " LEFT JOIN rejections as rej USING (event_id)"
+ " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+ " WHERE "
+ )
+
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "e.event_id", list(ev_map)
+ )
+
+ txn.execute(sql + clause, args)
+ rows = self.db_pool.cursor_to_dict(txn)
+ for row in rows:
+ event = ev_map[row["event_id"]]
+ if not row["rejects"] and not row["redacts"]:
+ to_prefill.append(_EventCacheEntry(event=event, redacted_event=None))
def prefill():
for cache_entry in to_prefill:
@@ -1770,10 +1773,21 @@ class PersistEventsStore:
# Not a insertion event
return
- # Skip processing a insertion event if the room version doesn't
- # support it.
+ # Skip processing an insertion event if the room version doesn't
+ # support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
- if not room_version.msc2716_historical:
+ room_creator = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": event.room_id},
+ retcol="creator",
+ allow_none=True,
+ )
+ if (
+ not room_version.msc2716_historical
+ or not self.hs.config.experimental.msc2716_enabled
+ or event.sender != room_creator
+ ):
return
next_chunk_id = event.content.get(EventContentFields.MSC2716_NEXT_CHUNK_ID)
@@ -1822,9 +1836,20 @@ class PersistEventsStore:
return
# Skip processing a chunk event if the room version doesn't
- # support it.
+ # support it or the event is not from the room creator.
room_version = self.store.get_room_version_txn(txn, event.room_id)
- if not room_version.msc2716_historical:
+ room_creator = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": event.room_id},
+ retcol="creator",
+ allow_none=True,
+ )
+ if (
+ not room_version.msc2716_historical
+ or not self.hs.config.experimental.msc2716_enabled
+ or event.sender != room_creator
+ ):
return
chunk_id = event.content.get(EventContentFields.MSC2716_CHUNK_ID)
@@ -1962,6 +1987,15 @@ class PersistEventsStore:
events_and_context.
"""
+ # Only non outlier events will have push actions associated with them,
+ # so let's filter them out. (This makes joining large rooms faster, as
+ # these queries took seconds to process all the state events).
+ non_outlier_events = [
+ event
+ for event, _ in events_and_contexts
+ if not event.internal_metadata.is_outlier()
+ ]
+
sql = """
INSERT INTO event_push_actions (
room_id, event_id, user_id, actions, stream_ordering,
@@ -1972,7 +2006,7 @@ class PersistEventsStore:
WHERE event_id = ?
"""
- if events_and_contexts:
+ if non_outlier_events:
txn.execute_batch(
sql,
(
@@ -1982,12 +2016,12 @@ class PersistEventsStore:
event.depth,
event.event_id,
)
- for event, _ in events_and_contexts
+ for event in non_outlier_events
),
)
room_to_event_ids: Dict[str, List[str]] = {}
- for e, _ in events_and_contexts:
+ for e in non_outlier_events:
room_to_event_ids.setdefault(e.room_id, []).append(e.event_id)
for room_id, event_ids in room_to_event_ids.items():
@@ -2012,7 +2046,11 @@ class PersistEventsStore:
# persisted.
txn.execute_batch(
"DELETE FROM event_push_actions_staging WHERE event_id = ?",
- ((event.event_id,) for event, _ in all_events_and_contexts),
+ (
+ (event.event_id,)
+ for event, _ in all_events_and_contexts
+ if not event.internal_metadata.is_outlier()
+ ),
)
def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):