path: root/synapse/handlers/federation_event.py
author	Andrej Shadura <andrewsh@debian.org>	2022-09-01 14:21:56 +0200
committer	Andrej Shadura <andrewsh@debian.org>	2022-09-01 14:21:56 +0200
commit	a8a30d403f4d72f1dfbc16620a94bc6051b110c4 (patch)
tree	2931b545e612aa8772aff3f59d42290b4ba29af1 /synapse/handlers/federation_event.py
parent	a8b88be80e7adebc44ddb792a812407e2656ef56 (diff)
New upstream version 1.66.0
Diffstat (limited to 'synapse/handlers/federation_event.py')
-rw-r--r--	synapse/handlers/federation_event.py	173
1 file changed, 144 insertions, 29 deletions
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 8968b705..048c4111 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -29,7 +29,7 @@ from typing import (
Tuple,
)
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
from synapse import event_auth
from synapse.api.constants import (
@@ -59,7 +59,13 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.federation.federation_client import InvalidResponseError
from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import (
+ SynapseTags,
+ set_tag,
+ start_active_span,
+ tag_args,
+ trace,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
from synapse.replication.http.federation import (
@@ -92,6 +98,36 @@ soft_failed_event_counter = Counter(
"Events received over federation that we marked as soft_failed",
)
+# Added to debug performance and track progress on optimizations
+backfill_processing_after_timer = Histogram(
+ "synapse_federation_backfill_processing_after_time_seconds",
+ "sec",
+ [],
+ buckets=(
+ 0.1,
+ 0.25,
+ 0.5,
+ 1.0,
+ 2.5,
+ 5.0,
+ 7.5,
+ 10.0,
+ 15.0,
+ 20.0,
+ 25.0,
+ 30.0,
+ 40.0,
+ 50.0,
+ 60.0,
+ 80.0,
+ 100.0,
+ 120.0,
+ 150.0,
+ 180.0,
+ "+Inf",
+ ),
+)
+
class FederationEventHandler:
"""Handles events that originated from federation.
@@ -410,6 +446,7 @@ class FederationEventHandler:
prev_member_event,
)
+ @trace
async def process_remote_join(
self,
origin: str,
@@ -597,20 +634,21 @@ class FederationEventHandler:
if not events:
return
- # if there are any events in the wrong room, the remote server is buggy and
- # should not be trusted.
- for ev in events:
- if ev.room_id != room_id:
- raise InvalidResponseError(
- f"Remote server {dest} returned event {ev.event_id} which is in "
- f"room {ev.room_id}, when we were backfilling in {room_id}"
- )
+ with backfill_processing_after_timer.time():
+ # if there are any events in the wrong room, the remote server is buggy and
+ # should not be trusted.
+ for ev in events:
+ if ev.room_id != room_id:
+ raise InvalidResponseError(
+ f"Remote server {dest} returned event {ev.event_id} which is in "
+ f"room {ev.room_id}, when we were backfilling in {room_id}"
+ )
- await self._process_pulled_events(
- dest,
- events,
- backfilled=True,
- )
+ await self._process_pulled_events(
+ dest,
+ events,
+ backfilled=True,
+ )
@trace
async def _get_missing_events_for_pdu(
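The hunk above wraps the whole post-backfill processing in backfill_processing_after_timer.time(). The Timer object returned by a prometheus_client Histogram works both as a context manager and as a decorator; a rough sketch, reusing the example_timer from the earlier illustration:

import time

with example_timer.time():        # observes elapsed seconds into the histogram on exit
    time.sleep(0.2)

@example_timer.time()             # equivalent decorator form
def process_batch() -> None:
    time.sleep(0.2)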
@@ -715,7 +753,7 @@ class FederationEventHandler:
@trace
async def _process_pulled_events(
- self, origin: str, events: Iterable[EventBase], backfilled: bool
+ self, origin: str, events: Collection[EventBase], backfilled: bool
) -> None:
"""Process a batch of events we have pulled from a remote server
@@ -730,6 +768,15 @@ class FederationEventHandler:
backfilled: True if this is part of a historical batch of events (inhibits
notification to clients, and validation of device keys.)
"""
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str([event.event_id for event in events]),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
logger.debug(
"processing pulled backfilled=%s events=%s",
backfilled,
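The set_tag calls above attach the function's arguments to the active OpenTracing span; in Synapse they are no-ops when tracing is not configured, so they are cheap to add to hot paths. A hedged sketch of the pattern (assuming SynapseTags.FUNC_ARG_PREFIX is a plain string namespace for argument tags; the helper name is made up):

from typing import List
from synapse.logging.opentracing import SynapseTags, set_tag

def record_batch_tags(event_ids: List[str], backfilled: bool) -> None:
    # Record both the ids themselves and their count, mirroring the diff above.
    set_tag(SynapseTags.FUNC_ARG_PREFIX + "event_ids", str(event_ids))
    set_tag(SynapseTags.FUNC_ARG_PREFIX + "event_ids.length", str(len(event_ids)))
    set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))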
@@ -753,6 +800,7 @@ class FederationEventHandler:
await self._process_pulled_event(origin, ev, backfilled=backfilled)
@trace
+ @tag_args
async def _process_pulled_event(
self, origin: str, event: EventBase, backfilled: bool
) -> None:
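tag_args complements trace: where trace opens a span named after the function, tag_args records the call's arguments on that span. A hypothetical, simplified sketch of such a decorator follows; Synapse's real tag_args in synapse.logging.opentracing also handles async functions and uses its own tag prefix, so treat this purely as an illustration:

import functools
import inspect

from synapse.logging.opentracing import set_tag

def tag_args_sketch(func):
    """Illustrative only: copy the wrapped function's arguments onto the active span."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        bound = inspect.signature(func).bind(*args, **kwargs)
        for name, value in bound.arguments.items():
            set_tag("ARG." + name, str(value))   # "ARG." prefix is an assumption
        return func(*args, **kwargs)
    return wrapper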
@@ -854,6 +902,7 @@ class FederationEventHandler:
else:
raise
+ @trace
async def _compute_event_context_with_maybe_missing_prevs(
self, dest: str, event: EventBase
) -> EventContext:
@@ -970,6 +1019,8 @@ class FederationEventHandler:
event, state_ids_before_event=state_map, partial_state=partial_state
)
+ @trace
+ @tag_args
async def _get_state_ids_after_missing_prev_event(
self,
destination: str,
@@ -1009,10 +1060,10 @@ class FederationEventHandler:
logger.debug("Fetching %i events from cache/store", len(desired_events))
have_events = await self._store.have_seen_events(room_id, desired_events)
- missing_desired_events = desired_events - have_events
+ missing_desired_event_ids = desired_events - have_events
logger.debug(
"We are missing %i events (got %i)",
- len(missing_desired_events),
+ len(missing_desired_event_ids),
len(have_events),
)
@@ -1024,13 +1075,30 @@ class FederationEventHandler:
# already have a bunch of the state events. It would be nice if the
# federation api gave us a way of finding out which we actually need.
- missing_auth_events = set(auth_event_ids) - have_events
- missing_auth_events.difference_update(
- await self._store.have_seen_events(room_id, missing_auth_events)
+ missing_auth_event_ids = set(auth_event_ids) - have_events
+ missing_auth_event_ids.difference_update(
+ await self._store.have_seen_events(room_id, missing_auth_event_ids)
)
- logger.debug("We are also missing %i auth events", len(missing_auth_events))
+ logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
- missing_events = missing_desired_events | missing_auth_events
+ missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
+
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+ str(missing_auth_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+ str(len(missing_auth_event_ids)),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+ str(missing_desired_event_ids),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+ str(len(missing_desired_event_ids)),
+ )
# Making an individual request for each of 1000s of events has a lot of
# overhead. On the other hand, we don't really want to fetch all of the events
@@ -1041,13 +1109,13 @@ class FederationEventHandler:
#
# TODO: might it be better to have an API which lets us do an aggregate event
# request
- if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+ if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
logger.debug("Requesting complete state from remote")
await self._get_state_and_persist(destination, room_id, event_id)
else:
- logger.debug("Fetching %i events from remote", len(missing_events))
+ logger.debug("Fetching %i events from remote", len(missing_event_ids))
await self._get_events_and_persist(
- destination=destination, room_id=room_id, event_ids=missing_events
+ destination=destination, room_id=room_id, event_ids=missing_event_ids
)
# We now need to fill out the state map, which involves fetching the
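The threshold above encodes a trade-off: fetching events one by one carries per-request overhead, so once the missing set reaches roughly a tenth of the combined auth-plus-state size, it is cheaper to pull the complete room state in a single request. Illustrative numbers only:

# Hypothetical sizes, purely to show how the branch is chosen.
auth_and_state_total = 900 + 100          # len(auth_event_ids) + len(state_event_ids)
missing_event_ids_count = 120
fetch_complete_state = (missing_event_ids_count * 10) >= auth_and_state_total
# 1200 >= 1000 -> True: request the full state once instead of 120 separate events.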
@@ -1104,6 +1172,14 @@ class FederationEventHandler:
event_id,
failed_to_fetch,
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+ str(failed_to_fetch),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+ str(len(failed_to_fetch)),
+ )
if remote_event.is_state() and remote_event.rejected_reason is None:
state_map[
@@ -1112,6 +1188,8 @@ class FederationEventHandler:
return state_map
+ @trace
+ @tag_args
async def _get_state_and_persist(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1133,6 +1211,7 @@ class FederationEventHandler:
destination=destination, room_id=room_id, event_ids=(event_id,)
)
+ @trace
async def _process_received_pdu(
self,
origin: str,
@@ -1283,6 +1362,7 @@ class FederationEventHandler:
except Exception:
logger.exception("Failed to resync device for %s", sender)
+ @trace
async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
"""Handles backfilling the insertion event when we receive a marker
event that points to one.
@@ -1314,7 +1394,7 @@ class FederationEventHandler:
logger.debug("_handle_marker_event: received %s", marker_event)
insertion_event_id = marker_event.content.get(
- EventContentFields.MSC2716_MARKER_INSERTION
+ EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
)
if insertion_event_id is None:
@@ -1414,6 +1494,8 @@ class FederationEventHandler:
return event_from_response
+ @trace
+ @tag_args
async def _get_events_and_persist(
self, destination: str, room_id: str, event_ids: Collection[str]
) -> None:
@@ -1459,6 +1541,7 @@ class FederationEventHandler:
logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
await self._auth_and_persist_outliers(room_id, events)
+ @trace
async def _auth_and_persist_outliers(
self, room_id: str, events: Iterable[EventBase]
) -> None:
@@ -1477,6 +1560,16 @@ class FederationEventHandler:
"""
event_map = {event.event_id: event for event in events}
+ event_ids = event_map.keys()
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+ str(event_ids),
+ )
+ set_tag(
+ SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+ str(len(event_ids)),
+ )
+
# filter out any events we have already seen. This might happen because
# the events were eagerly pushed to us (eg, during a room join), or because
# another thread has raced against us since we decided to request the event.
@@ -1593,6 +1686,7 @@ class FederationEventHandler:
backfilled=True,
)
+ @trace
async def _check_event_auth(
self, origin: Optional[str], event: EventBase, context: EventContext
) -> None:
@@ -1631,6 +1725,14 @@ class FederationEventHandler:
claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
origin, event
)
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+ str([ev.event_id for ev in claimed_auth_events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+ str(len(claimed_auth_events)),
+ )
# ... and check that the event passes auth at those auth events.
# https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1830,7 @@ class FederationEventHandler:
)
context.rejected = RejectedReason.AUTH_ERROR
+ @trace
async def _maybe_kick_guest_users(self, event: EventBase) -> None:
if event.type != EventTypes.GuestAccess:
return
@@ -1935,6 +2038,8 @@ class FederationEventHandler:
# instead we raise an AuthError, which will make the caller ignore it.
raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
+ @trace
+ @tag_args
async def _get_remote_auth_chain_for_event(
self, destination: str, room_id: str, event_id: str
) -> None:
@@ -1963,6 +2068,7 @@ class FederationEventHandler:
await self._auth_and_persist_outliers(room_id, remote_auth_events)
+ @trace
async def _run_push_actions_and_persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
) -> None:
@@ -2071,8 +2177,17 @@ class FederationEventHandler:
self._message_handler.maybe_schedule_expiry(event)
if not backfilled: # Never notify for backfilled events
- for event in events:
- await self._notify_persisted_event(event, max_stream_token)
+ with start_active_span("notify_persisted_events"):
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids",
+ str([ev.event_id for ev in events]),
+ )
+ set_tag(
+ SynapseTags.RESULT_PREFIX + "event_ids.length",
+ str(len(events)),
+ )
+ for event in events:
+ await self._notify_persisted_event(event, max_stream_token)
return max_stream_token.stream
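The final hunk groups client notification under a dedicated tracing span. start_active_span in synapse.logging.opentracing behaves as a no-op context manager when tracing is disabled, so the shape of the change is roughly as follows (a sketch, not the handler's exact code; notify_one stands in for _notify_persisted_event):

from synapse.logging.opentracing import SynapseTags, set_tag, start_active_span

async def notify_persisted(events, notify_one) -> None:
    # Open a named child span for the whole notification loop and tag it with a summary.
    with start_active_span("notify_persisted_events"):
        set_tag(SynapseTags.RESULT_PREFIX + "event_ids.length", str(len(events)))
        for event in events:
            await notify_one(event)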