diff options
Diffstat (limited to 'synapse/handlers')
30 files changed, 763 insertions, 556 deletions
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index 6a05a653..c23ccd6d 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -15,10 +15,7 @@ import logging from typing import TYPE_CHECKING, Optional -import synapse.types -from synapse.api.constants import EventTypes, Membership from synapse.api.ratelimiting import Ratelimiter -from synapse.types import UserID if TYPE_CHECKING: from synapse.server import HomeServer @@ -48,16 +45,16 @@ class BaseHandler: self.request_ratelimiter = Ratelimiter( store=self.store, clock=self.clock, rate_hz=0, burst_count=0 ) - self._rc_message = self.hs.config.rc_message + self._rc_message = self.hs.config.ratelimiting.rc_message # Check whether ratelimiting room admin message redaction is enabled # by the presence of rate limits in the config - if self.hs.config.rc_admin_redaction: + if self.hs.config.ratelimiting.rc_admin_redaction: self.admin_redaction_ratelimiter: Optional[Ratelimiter] = Ratelimiter( store=self.store, clock=self.clock, - rate_hz=self.hs.config.rc_admin_redaction.per_second, - burst_count=self.hs.config.rc_admin_redaction.burst_count, + rate_hz=self.hs.config.ratelimiting.rc_admin_redaction.per_second, + burst_count=self.hs.config.ratelimiting.rc_admin_redaction.burst_count, ) else: self.admin_redaction_ratelimiter = None @@ -115,68 +112,3 @@ class BaseHandler: burst_count=burst_count, update=update, ) - - async def maybe_kick_guest_users(self, event, context=None): - # Technically this function invalidates current_state by changing it. - # Hopefully this isn't that important to the caller. - if event.type == EventTypes.GuestAccess: - guest_access = event.content.get("guest_access", "forbidden") - if guest_access != "can_join": - if context: - current_state_ids = await context.get_current_state_ids() - current_state_dict = await self.store.get_events( - list(current_state_ids.values()) - ) - current_state = list(current_state_dict.values()) - else: - current_state_map = await self.state_handler.get_current_state( - event.room_id - ) - current_state = list(current_state_map.values()) - - logger.info("maybe_kick_guest_users %r", current_state) - await self.kick_guest_users(current_state) - - async def kick_guest_users(self, current_state): - for member_event in current_state: - try: - if member_event.type != EventTypes.Member: - continue - - target_user = UserID.from_string(member_event.state_key) - if not self.hs.is_mine(target_user): - continue - - if member_event.content["membership"] not in { - Membership.JOIN, - Membership.INVITE, - }: - continue - - if ( - "kind" not in member_event.content - or member_event.content["kind"] != "guest" - ): - continue - - # We make the user choose to leave, rather than have the - # event-sender kick them. This is partially because we don't - # need to worry about power levels, and partially because guest - # users are a concept which doesn't hugely work over federation, - # and having homeservers have their own users leave keeps more - # of that decision-making and control local to the guest-having - # homeserver. - requester = synapse.types.create_requester( - target_user, is_guest=True, authenticated_entity=self.server_name - ) - handler = self.hs.get_room_member_handler() - await handler.update_membership( - requester, - target_user, - member_event.room_id, - "leave", - ratelimit=False, - require_consent=False, - ) - except Exception as e: - logger.exception("Error kicking guest user: %s" % (e,)) diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index 078accd6..a9c2222f 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -78,7 +78,7 @@ class AccountValidityHandler: ) # Check the renewal emails to send and send them every 30min. - if hs.config.run_background_tasks: + if hs.config.worker.run_background_tasks: self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000) self._is_user_expired_callbacks: List[IS_USER_EXPIRED_CALLBACK] = [] @@ -249,7 +249,7 @@ class AccountValidityHandler: renewal_token = await self._get_renewal_token(user_id) url = "%s_matrix/client/unstable/account_validity/renew?token=%s" % ( - self.hs.config.public_baseurl, + self.hs.config.server.public_baseurl, renewal_token, ) @@ -398,6 +398,7 @@ class AccountValidityHandler: """ now = self.clock.time_msec() if expiration_ts is None: + assert self._account_validity_period is not None expiration_ts = now + self._account_validity_period await self.store.set_account_validity_for_user( diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 4ab40466..a7b5a4e9 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -131,6 +131,8 @@ class ApplicationServicesHandler: now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) + assert ts is not None + synapse.metrics.event_processing_lag_by_event.labels( "appservice_sender" ).observe((now - ts) / 1000) @@ -166,6 +168,7 @@ class ApplicationServicesHandler: if events: now = self.clock.time_msec() ts = await self.store.get_received_ts(events[-1].event_id) + assert ts is not None synapse.metrics.event_processing_lag.labels( "appservice_sender" diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 34725324..fbbf6fd8 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -244,8 +244,8 @@ class AuthHandler(BaseHandler): self._failed_uia_attempts_ratelimiter = Ratelimiter( store=self.store, clock=self.clock, - rate_hz=self.hs.config.rc_login_failed_attempts.per_second, - burst_count=self.hs.config.rc_login_failed_attempts.burst_count, + rate_hz=self.hs.config.ratelimiting.rc_login_failed_attempts.per_second, + burst_count=self.hs.config.ratelimiting.rc_login_failed_attempts.burst_count, ) # The number of seconds to keep a UI auth session active. @@ -255,14 +255,14 @@ class AuthHandler(BaseHandler): self._failed_login_attempts_ratelimiter = Ratelimiter( store=self.store, clock=hs.get_clock(), - rate_hz=self.hs.config.rc_login_failed_attempts.per_second, - burst_count=self.hs.config.rc_login_failed_attempts.burst_count, + rate_hz=self.hs.config.ratelimiting.rc_login_failed_attempts.per_second, + burst_count=self.hs.config.ratelimiting.rc_login_failed_attempts.burst_count, ) self._clock = self.hs.get_clock() # Expire old UI auth sessions after a period of time. - if hs.config.run_background_tasks: + if hs.config.worker.run_background_tasks: self._clock.looping_call( run_as_background_process, 5 * 60 * 1000, @@ -289,7 +289,7 @@ class AuthHandler(BaseHandler): hs.config.sso_account_deactivated_template ) - self._server_name = hs.config.server_name + self._server_name = hs.config.server.server_name # cast to tuple for use with str.startswith self._whitelisted_sso_clients = tuple(hs.config.sso_client_whitelist) @@ -749,7 +749,7 @@ class AuthHandler(BaseHandler): "name": self.hs.config.user_consent_policy_name, "url": "%s_matrix/consent?v=%s" % ( - self.hs.config.public_baseurl, + self.hs.config.server.public_baseurl, self.hs.config.user_consent_version, ), }, @@ -1799,7 +1799,7 @@ class MacaroonGenerator: def _generate_base_macaroon(self, user_id: str) -> pymacaroons.Macaroon: macaroon = pymacaroons.Macaroon( - location=self.hs.config.server_name, + location=self.hs.config.server.server_name, identifier="key", key=self.hs.config.macaroon_secret_key, ) diff --git a/synapse/handlers/cas.py b/synapse/handlers/cas.py index 0325f86e..47ddabbe 100644 --- a/synapse/handlers/cas.py +++ b/synapse/handlers/cas.py @@ -82,7 +82,6 @@ class CasHandler: # the SsoIdentityProvider protocol type. self.idp_icon = None self.idp_brand = None - self.unstable_idp_brand = None self._sso_handler = hs.get_sso_handler() diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 45d2404d..dcd320c5 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -46,7 +46,7 @@ class DeactivateAccountHandler(BaseHandler): # Start the user parter loop so it can resume parting users from rooms where # it left off (if it has work left to do). - if hs.config.run_background_tasks: + if hs.config.worker.run_background_tasks: hs.get_reactor().callWhenRunning(self._start_user_parting) self._account_validity_enabled = ( @@ -131,7 +131,7 @@ class DeactivateAccountHandler(BaseHandler): await self.store.add_user_pending_deactivation(user_id) # delete from user directory - await self.user_directory_handler.handle_user_deactivated(user_id) + await self.user_directory_handler.handle_local_user_deactivated(user_id) # Mark the user as erased, if they asked for that if erase_data: diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 679b47f0..b6a2a34a 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -84,8 +84,8 @@ class DeviceMessageHandler: self._ratelimiter = Ratelimiter( store=self.store, clock=hs.get_clock(), - rate_hz=hs.config.rc_key_requests.per_second, - burst_count=hs.config.rc_key_requests.burst_count, + rate_hz=hs.config.ratelimiting.rc_key_requests.per_second, + burst_count=hs.config.ratelimiting.rc_key_requests.burst_count, ) async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None: diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index d9237085..08a13756 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -57,7 +57,7 @@ class E2eKeysHandler: federation_registry = hs.get_federation_registry() - self._is_master = hs.config.worker_app is None + self._is_master = hs.config.worker.worker_app is None if not self._is_master: self._user_device_resync_client = ( ReplicationUserDevicesResyncRestServlet.make_client(hs) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index daf1d3bf..6754c64c 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -101,7 +101,7 @@ class FederationHandler(BaseHandler): hs ) - if hs.config.worker_app: + if hs.config.worker.worker_app: self._maybe_store_room_on_outlier_membership = ( ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs) ) @@ -507,6 +507,7 @@ class FederationHandler(BaseHandler): await self.store.upsert_room_on_join( room_id=room_id, room_version=room_version_obj, + auth_events=auth_chain, ) max_stream_id = await self._persist_auth_tree( @@ -1613,7 +1614,7 @@ class FederationHandler(BaseHandler): Args: room_id """ - if self.config.worker_app: + if self.config.worker.worker_app: await self._clean_room_for_join_client(room_id) else: await self.store.clean_room_for_join(room_id) diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index 9f055f00..946343fa 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -36,6 +36,7 @@ from synapse import event_auth from synapse.api.constants import ( EventContentFields, EventTypes, + GuestAccess, Membership, RejectedReason, RoomEncryptionAlgorithms, @@ -53,7 +54,6 @@ from synapse.event_auth import auth_types_for_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.federation.federation_client import InvalidResponseError -from synapse.handlers._base import BaseHandler from synapse.logging.context import ( make_deferred_yieldable, nested_logging_context, @@ -116,7 +116,7 @@ class _NewEventInfo: claimed_auth_event_map: StateMap[EventBase] -class FederationEventHandler(BaseHandler): +class FederationEventHandler: """Handles events that originated from federation. Responsible for handing incoming events and passing them on to the rest @@ -124,30 +124,32 @@ class FederationEventHandler(BaseHandler): """ def __init__(self, hs: "HomeServer"): - super().__init__(hs) + self._store = hs.get_datastore() + self._storage = hs.get_storage() + self._state_store = self._storage.state - self.store = hs.get_datastore() - self.storage = hs.get_storage() - self.state_store = self.storage.state - - self.state_handler = hs.get_state_handler() - self.event_creation_handler = hs.get_event_creation_handler() + self._state_handler = hs.get_state_handler() + self._event_creation_handler = hs.get_event_creation_handler() self._event_auth_handler = hs.get_event_auth_handler() self._message_handler = hs.get_message_handler() - self.action_generator = hs.get_action_generator() + self._action_generator = hs.get_action_generator() self._state_resolution_handler = hs.get_state_resolution_handler() + # avoid a circular dependency by deferring execution here + self._get_room_member_handler = hs.get_room_member_handler - self.federation_client = hs.get_federation_client() - self.third_party_event_rules = hs.get_third_party_event_rules() + self._federation_client = hs.get_federation_client() + self._third_party_event_rules = hs.get_third_party_event_rules() + self._notifier = hs.get_notifier() - self.is_mine_id = hs.is_mine_id + self._is_mine_id = hs.is_mine_id + self._server_name = hs.hostname self._instance_name = hs.get_instance_name() - self.config = hs.config + self._config = hs.config self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) - if hs.config.worker_app: + if hs.config.worker.worker_app: self._user_device_resync = ( ReplicationUserDevicesResyncRestServlet.make_client(hs) ) @@ -171,11 +173,14 @@ class FederationEventHandler(BaseHandler): pdu: received PDU """ + # We should never see any outliers here. + assert not pdu.internal_metadata.outlier + room_id = pdu.room_id event_id = pdu.event_id # We reprocess pdus when we have seen them only as outliers - existing = await self.store.get_event( + existing = await self._store.get_event( event_id, allow_none=True, allow_rejected=True ) @@ -221,7 +226,7 @@ class FederationEventHandler(BaseHandler): # Note that if we were never in the room then we would have already # dropped the event, since we wouldn't know the room version. is_in_room = await self._event_auth_handler.check_host_in_room( - room_id, self.server_name + room_id, self._server_name ) if not is_in_room: logger.info( @@ -230,77 +235,71 @@ class FederationEventHandler(BaseHandler): ) return None - # Check that the event passes auth based on the state at the event. This is - # done for events that are to be added to the timeline (non-outliers). - # - # Get missing pdus if necessary: - # - Fetching any missing prev events to fill in gaps in the graph - # - Fetching state if we have a hole in the graph - if not pdu.internal_metadata.is_outlier(): - prevs = set(pdu.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen + # Try to fetch any missing prev events to fill in gaps in the graph + prevs = set(pdu.prev_event_ids()) + seen = await self._store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen - if missing_prevs: - # We only backfill backwards to the min depth. - min_depth = await self.get_min_depth_for_context(pdu.room_id) - logger.debug("min_depth: %d", min_depth) + if missing_prevs: + # We only backfill backwards to the min depth. + min_depth = await self.get_min_depth_for_context(pdu.room_id) + logger.debug("min_depth: %d", min_depth) - if min_depth is not None and pdu.depth > min_depth: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. + if min_depth is not None and pdu.depth > min_depth: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + logger.info( + "Acquiring room lock to fetch %d missing prev_events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + with (await self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "Acquiring room lock to fetch %d missing prev_events: %s", + "Acquired room lock to fetch %d missing prev_events", len(missing_prevs), - shortstr(missing_prevs), ) - with (await self._room_pdu_linearizer.queue(pdu.room_id)): - logger.info( - "Acquired room lock to fetch %d missing prev_events", - len(missing_prevs), + + try: + await self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth ) + except Exception as e: + raise Exception( + "Error fetching missing prev_events for %s: %s" + % (event_id, e) + ) from e - try: - await self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth - ) - except Exception as e: - raise Exception( - "Error fetching missing prev_events for %s: %s" - % (event_id, e) - ) from e - - # Update the set of things we've seen after trying to - # fetch the missing stuff - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen - - if not missing_prevs: - logger.info("Found all missing prev_events") - - if missing_prevs: - # since this event was pushed to us, it is possible for it to - # become the only forward-extremity in the room, and we would then - # trust its state to be the state for the whole room. This is very - # bad. Further, if the event was pushed to us, there is no excuse - # for us not to have all the prev_events. (XXX: apart from - # min_depth?) - # - # We therefore reject any such events. - logger.warning( - "Rejecting: failed to fetch %d prev events: %s", - len(missing_prevs), - shortstr(missing_prevs), - ) - raise FederationError( - "ERROR", - 403, - ( - "Your server isn't divulging details about prev_events " - "referenced in this event." - ), - affected=pdu.event_id, - ) + # Update the set of things we've seen after trying to + # fetch the missing stuff + seen = await self._store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if not missing_prevs: + logger.info("Found all missing prev_events") + + if missing_prevs: + # since this event was pushed to us, it is possible for it to + # become the only forward-extremity in the room, and we would then + # trust its state to be the state for the whole room. This is very + # bad. Further, if the event was pushed to us, there is no excuse + # for us not to have all the prev_events. (XXX: apart from + # min_depth?) + # + # We therefore reject any such events. + logger.warning( + "Rejecting: failed to fetch %d prev events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, + ) await self._process_received_pdu(origin, pdu, state=None) @@ -361,7 +360,7 @@ class FederationEventHandler(BaseHandler): # the room, so we send it on their behalf. event.internal_metadata.send_on_behalf_of = origin - context = await self.state_handler.compute_event_context(event) + context = await self._state_handler.compute_event_context(event) context = await self._check_event_auth(origin, event, context) if context.rejected: raise SynapseError( @@ -375,7 +374,7 @@ class FederationEventHandler(BaseHandler): # for knock events, we run the third-party event rules. It's not entirely clear # why we don't do this for other sorts of membership events. if event.membership == Membership.KNOCK: - event_allowed, _ = await self.third_party_event_rules.check_event_allowed( + event_allowed, _ = await self._third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -404,7 +403,7 @@ class FederationEventHandler(BaseHandler): prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) prev_member_event = None if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) + prev_member_event = await self._store.get_event(prev_member_event_id) # Check if the member should be allowed access via membership in a space. await self._event_auth_handler.check_restricted_join_rules( @@ -434,10 +433,10 @@ class FederationEventHandler(BaseHandler): server from invalid events (there is probably no point in trying to re-fetch invalid events from every other HS in the room.) """ - if dest == self.server_name: + if dest == self._server_name: raise SynapseError(400, "Can't backfill from self.") - events = await self.federation_client.backfill( + events = await self._federation_client.backfill( dest, room_id, limit=limit, extremities=extremities ) @@ -469,12 +468,12 @@ class FederationEventHandler(BaseHandler): room_id = pdu.room_id event_id = pdu.event_id - seen = await self.store.have_events_in_timeline(prevs) + seen = await self._store.have_events_in_timeline(prevs) if not prevs - seen: return - latest_list = await self.store.get_latest_event_ids_in_room(room_id) + latest_list = await self._store.get_latest_event_ids_in_room(room_id) # We add the prev events that we have seen to the latest # list to ensure the remote server doesn't give them to us @@ -536,7 +535,7 @@ class FederationEventHandler(BaseHandler): # All that said: Let's try increasing the timeout to 60s and see what happens. try: - missing_events = await self.federation_client.get_missing_events( + missing_events = await self._federation_client.get_missing_events( origin, room_id, earliest_events_ids=list(latest), @@ -609,7 +608,7 @@ class FederationEventHandler(BaseHandler): event_id = event.event_id - existing = await self.store.get_event( + existing = await self._store.get_event( event_id, allow_none=True, allow_rejected=True ) if existing: @@ -674,7 +673,7 @@ class FederationEventHandler(BaseHandler): event_id = event.event_id prevs = set(event.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) + seen = await self._store.have_events_in_timeline(prevs) missing_prevs = prevs - seen if not missing_prevs: @@ -691,7 +690,7 @@ class FederationEventHandler(BaseHandler): event_map = {event_id: event} try: # Get the state of the events we know about - ours = await self.state_store.get_state_groups_ids(room_id, seen) + ours = await self._state_store.get_state_groups_ids(room_id, seen) # state_maps is a list of mappings from (type, state_key) to event_id state_maps: List[StateMap[str]] = list(ours.values()) @@ -720,13 +719,13 @@ class FederationEventHandler(BaseHandler): for x in remote_state: event_map[x.event_id] = x - room_version = await self.store.get_room_version_id(room_id) + room_version = await self._store.get_room_version_id(room_id) state_map = await self._state_resolution_handler.resolve_events_with_store( room_id, room_version, state_maps, event_map, - state_res_store=StateResolutionStore(self.store), + state_res_store=StateResolutionStore(self._store), ) # We need to give _process_received_pdu the actual state events @@ -734,7 +733,7 @@ class FederationEventHandler(BaseHandler): # First though we need to fetch all the events that are in # state_map, so we can build up the state below. - evs = await self.store.get_events( + evs = await self._store.get_events( list(state_map.values()), get_prev_content=False, redact_behaviour=EventRedactBehaviour.AS_IS, @@ -774,7 +773,7 @@ class FederationEventHandler(BaseHandler): ( state_event_ids, auth_event_ids, - ) = await self.federation_client.get_room_state_ids( + ) = await self._federation_client.get_room_state_ids( destination, room_id, event_id=event_id ) @@ -788,7 +787,7 @@ class FederationEventHandler(BaseHandler): desired_events = set(state_event_ids) desired_events.add(event_id) logger.debug("Fetching %i events from cache/store", len(desired_events)) - fetched_events = await self.store.get_events( + fetched_events = await self._store.get_events( desired_events, allow_rejected=True ) @@ -809,20 +808,20 @@ class FederationEventHandler(BaseHandler): missing_auth_events = set(auth_event_ids) - fetched_events.keys() missing_auth_events.difference_update( - await self.store.have_seen_events(room_id, missing_auth_events) + await self._store.have_seen_events(room_id, missing_auth_events) ) logger.debug("We are also missing %i auth events", len(missing_auth_events)) missing_events = missing_desired_events | missing_auth_events logger.debug("Fetching %i events from remote", len(missing_events)) await self._get_events_and_persist( - destination=destination, room_id=room_id, events=missing_events + destination=destination, room_id=room_id, event_ids=missing_events ) # we need to make sure we re-load from the database to get the rejected # state correct. fetched_events.update( - await self.store.get_events(missing_desired_events, allow_rejected=True) + await self._store.get_events(missing_desired_events, allow_rejected=True) ) # check for events which were in the wrong room. @@ -883,8 +882,13 @@ class FederationEventHandler(BaseHandler): state: Optional[Iterable[EventBase]], backfilled: bool = False, ) -> None: - """Called when we have a new pdu. We need to do auth checks and put it - through the StateHandler. + """Called when we have a new non-outlier event. + + This is called when we have a new event to add to the room DAG - either directly + via a /send request, retrieved via get_missing_events after a /send request, or + backfilled after a client request. + + We need to do auth checks and put it through the StateHandler. Args: origin: server sending the event @@ -899,17 +903,24 @@ class FederationEventHandler(BaseHandler): notification to clients, and validation of device keys.) """ logger.debug("Processing event: %s", event) + assert not event.internal_metadata.outlier try: - context = await self.state_handler.compute_event_context( + context = await self._state_handler.compute_event_context( event, old_state=state ) - await self._auth_and_persist_event( - origin, event, context, state=state, backfilled=backfilled + context = await self._check_event_auth( + origin, + event, + context, + state=state, + backfilled=backfilled, ) except AuthError as e: raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) + await self._run_push_actions_and_persist_event(event, context, backfilled) + if backfilled: return @@ -919,7 +930,7 @@ class FederationEventHandler(BaseHandler): device_id = event.content.get("device_id") sender_key = event.content.get("sender_key") - cached_devices = await self.store.get_cached_devices_for_user(event.sender) + cached_devices = await self._store.get_cached_devices_for_user(event.sender) resync = False # Whether we should resync device lists. @@ -995,10 +1006,10 @@ class FederationEventHandler(BaseHandler): """ try: - await self.store.mark_remote_user_device_cache_as_stale(sender) + await self._store.mark_remote_user_device_cache_as_stale(sender) # Immediately attempt a resync in the background - if self.config.worker_app: + if self._config.worker.worker_app: await self._user_device_resync(user_id=sender) else: await self._device_list_updater.user_device_resync(sender) @@ -1023,9 +1034,15 @@ class FederationEventHandler(BaseHandler): return # Skip processing a marker event if the room version doesn't - # support it. - room_version = await self.store.get_room_version(marker_event.room_id) - if not room_version.msc2716_historical: + # support it or the event is not from the room creator. + room_version = await self._store.get_room_version(marker_event.room_id) + create_event = await self._store.get_create_event_for_room(marker_event.room_id) + room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + if ( + not room_version.msc2716_historical + or not self._config.experimental.msc2716_enabled + or marker_event.sender != room_creator + ): return logger.debug("_handle_marker_event: received %s", marker_event) @@ -1048,7 +1065,7 @@ class FederationEventHandler(BaseHandler): [insertion_event_id], ) - insertion_event = await self.store.get_event( + insertion_event = await self._store.get_event( insertion_event_id, allow_none=True ) if insertion_event is None: @@ -1066,7 +1083,7 @@ class FederationEventHandler(BaseHandler): marker_event, ) - await self.store.insert_insertion_extremity( + await self._store.insert_insertion_extremity( insertion_event_id, marker_event.room_id ) @@ -1077,25 +1094,25 @@ class FederationEventHandler(BaseHandler): ) async def _get_events_and_persist( - self, destination: str, room_id: str, events: Iterable[str] + self, destination: str, room_id: str, event_ids: Collection[str] ) -> None: """Fetch the given events from a server, and persist them as outliers. This function *does not* recursively get missing auth events of the - newly fetched events. Callers must include in the `events` argument + newly fetched events. Callers must include in the `event_ids` argument any missing events from the auth chain. Logs a warning if we can't find the given event. """ - room_version = await self.store.get_room_version(room_id) + room_version = await self._store.get_room_version(room_id) event_map: Dict[str, EventBase] = {} async def get_event(event_id: str): with nested_logging_context(event_id): try: - event = await self.federation_client.get_pdu( + event = await self._federation_client.get_pdu( [destination], event_id, room_version, @@ -1119,28 +1136,78 @@ class FederationEventHandler(BaseHandler): e, ) - await concurrently_execute(get_event, events, 5) + await concurrently_execute(get_event, event_ids, 5) + logger.info("Fetched %i events of %i requested", len(event_map), len(event_ids)) - # Make a map of auth events for each event. We do this after fetching - # all the events as some of the events' auth events will be in the list - # of requested events. + # we now need to auth the events in an order which ensures that each event's + # auth_events are authed before the event itself. + # + # XXX: it might be possible to kick this process off in parallel with fetching + # the events. + while event_map: + # build a list of events whose auth events are not in the queue. + roots = tuple( + ev + for ev in event_map.values() + if not any(aid in event_map for aid in ev.auth_event_ids()) + ) - auth_events = [ - aid - for event in event_map.values() - for aid in event.auth_event_ids() - if aid not in event_map - ] - persisted_events = await self.store.get_events( + if not roots: + # if *none* of the remaining events are ready, that means + # we have a loop. This either means a bug in our logic, or that + # somebody has managed to create a loop (which requires finding a + # hash collision in room v2 and later). + logger.warning( + "Loop found in auth events while fetching missing state/auth " + "events: %s", + shortstr(event_map.keys()), + ) + return + + logger.info( + "Persisting %i of %i remaining events", len(roots), len(event_map) + ) + + await self._auth_and_persist_fetched_events(destination, room_id, roots) + + for ev in roots: + del event_map[ev.event_id] + + async def _auth_and_persist_fetched_events( + self, origin: str, room_id: str, fetched_events: Collection[EventBase] + ) -> None: + """Persist the events fetched by _get_events_and_persist. + + The events should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + + We also assume that all of the auth events for all of the events have already + been persisted. + + Notifies about the events where appropriate. + + Params: + origin: where the events came from + room_id: the room that the events are meant to be in (though this has + not yet been checked) + event_id: map from event_id -> event for the fetched events + """ + # get all the auth events for all the events in this batch. By now, they should + # have been persisted. + auth_events = { + aid for event in fetched_events for aid in event.auth_event_ids() + } + persisted_events = await self._store.get_events( auth_events, allow_rejected=True, ) event_infos = [] - for event in event_map.values(): + for event in fetched_events: auth = {} for auth_event_id in event.auth_event_ids(): - ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id) + ae = persisted_events.get(auth_event_id) if ae: auth[(ae.type, ae.state_key)] = ae else: @@ -1148,34 +1215,13 @@ class FederationEventHandler(BaseHandler): event_infos.append(_NewEventInfo(event, auth)) - if event_infos: - await self._auth_and_persist_events( - destination, - room_id, - event_infos, - ) - - async def _auth_and_persist_events( - self, - origin: str, - room_id: str, - event_infos: Collection[_NewEventInfo], - ) -> None: - """Creates the appropriate contexts and persists events. The events - should not depend on one another, e.g. this should be used to persist - a bunch of outliers, but not a chunk of individual events that depend - on each other for state calculations. - - Notifies about the events where appropriate. - """ - if not event_infos: return async def prep(ev_info: _NewEventInfo): event = ev_info.event with nested_logging_context(suffix=event.event_id): - res = await self.state_handler.compute_event_context(event) + res = await self._state_handler.compute_event_context(event) res = await self._check_event_auth( origin, event, @@ -1199,49 +1245,6 @@ class FederationEventHandler(BaseHandler): ], ) - async def _auth_and_persist_event( - self, - origin: str, - event: EventBase, - context: EventContext, - state: Optional[Iterable[EventBase]] = None, - claimed_auth_event_map: Optional[StateMap[EventBase]] = None, - backfilled: bool = False, - ) -> None: - """ - Process an event by performing auth checks and then persisting to the database. - - Args: - origin: The host the event originates from. - event: The event itself. - context: - The event context. - - state: - The state events used to check the event for soft-fail. If this is - not provided the current state events will be used. - - claimed_auth_event_map: - A map of (type, state_key) => event for the event's claimed auth_events. - Possibly incomplete, and possibly including events that are not yet - persisted, or authed, or in the right room. - - Only populated where we may not already have persisted these events - - for example, when populating outliers. - - backfilled: True if the event was backfilled. - """ - context = await self._check_event_auth( - origin, - event, - context, - state=state, - claimed_auth_event_map=claimed_auth_event_map, - backfilled=backfilled, - ) - - await self._run_push_actions_and_persist_event(event, context, backfilled) - async def _check_event_auth( self, origin: str, @@ -1269,16 +1272,17 @@ class FederationEventHandler(BaseHandler): Possibly incomplete, and possibly including events that are not yet persisted, or authed, or in the right room. - Only populated where we may not already have persisted these events - - for example, when populating outliers, or the state for a backwards - extremity. + Only populated when populating outliers. backfilled: True if the event was backfilled. Returns: The updated context object. """ - room_version = await self.store.get_room_version_id(event.room_id) + # claimed_auth_event_map should be given iff the event is an outlier + assert bool(claimed_auth_event_map) == event.internal_metadata.outlier + + room_version = await self._store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] if claimed_auth_event_map: @@ -1291,7 +1295,7 @@ class FederationEventHandler(BaseHandler): auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=True ) - auth_events_x = await self.store.get_events(auth_events_ids) + auth_events_x = await self._store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()} try: @@ -1321,19 +1325,29 @@ class FederationEventHandler(BaseHandler): if not context.rejected: await self._check_for_soft_fail(event, state, backfilled, origin=origin) - - if event.type == EventTypes.GuestAccess and not context.rejected: - await self.maybe_kick_guest_users(event) + await self._maybe_kick_guest_users(event) # If we are going to send this event over federation we precaclculate # the joined hosts. if event.internal_metadata.get_send_on_behalf_of(): - await self.event_creation_handler.cache_joined_hosts_for_event( + await self._event_creation_handler.cache_joined_hosts_for_event( event, context ) return context + async def _maybe_kick_guest_users(self, event: EventBase) -> None: + if event.type != EventTypes.GuestAccess: + return + + guest_access = event.content.get(EventContentFields.GUEST_ACCESS) + if guest_access == GuestAccess.CAN_JOIN: + return + + current_state_map = await self._state_handler.get_current_state(event.room_id) + current_state = list(current_state_map.values()) + await self._get_room_member_handler().kick_guest_users(current_state) + async def _check_for_soft_fail( self, event: EventBase, @@ -1356,7 +1370,7 @@ class FederationEventHandler(BaseHandler): if backfilled or event.internal_metadata.is_outlier(): return - extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id) + extrem_ids_list = await self._store.get_latest_event_ids_in_room(event.room_id) extrem_ids = set(extrem_ids_list) prev_event_ids = set(event.prev_event_ids()) @@ -1365,7 +1379,7 @@ class FederationEventHandler(BaseHandler): # state at the event, so no point rechecking auth for soft fail. return - room_version = await self.store.get_room_version_id(event.room_id) + room_version = await self._store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] # Calculate the "current state". @@ -1382,19 +1396,19 @@ class FederationEventHandler(BaseHandler): # given state at the event. This should correctly handle cases # like bans, especially with state res v2. - state_sets_d = await self.state_store.get_state_groups( + state_sets_d = await self._state_store.get_state_groups( event.room_id, extrem_ids ) state_sets: List[Iterable[EventBase]] = list(state_sets_d.values()) state_sets.append(state) - current_states = await self.state_handler.resolve_events( + current_states = await self._state_handler.resolve_events( room_version, state_sets, event ) current_state_ids: StateMap[str] = { k: e.event_id for k, e in current_states.items() } else: - current_state_ids = await self.state_handler.get_current_state_ids( + current_state_ids = await self._state_handler.get_current_state_ids( event.room_id, latest_event_ids=extrem_ids ) @@ -1410,7 +1424,7 @@ class FederationEventHandler(BaseHandler): e for k, e in current_state_ids.items() if k in auth_types ] - auth_events_map = await self.store.get_events(current_state_ids_list) + auth_events_map = await self._store.get_events(current_state_ids_list) current_auth_events = { (e.type, e.state_key): e for e in auth_events_map.values() } @@ -1481,7 +1495,9 @@ class FederationEventHandler(BaseHandler): # # we start by checking if they are in the store, and then try calling /event_auth/. if missing_auth: - have_events = await self.store.have_seen_events(event.room_id, missing_auth) + have_events = await self._store.have_seen_events( + event.room_id, missing_auth + ) logger.debug("Events %s are in the store", have_events) missing_auth.difference_update(have_events) @@ -1490,7 +1506,7 @@ class FederationEventHandler(BaseHandler): logger.info("auth_events contains unknown events: %s", missing_auth) try: try: - remote_auth_chain = await self.federation_client.get_event_auth( + remote_auth_chain = await self._federation_client.get_event_auth( origin, event.room_id, event.event_id ) except RequestSendFailed as e1: @@ -1499,43 +1515,49 @@ class FederationEventHandler(BaseHandler): logger.info("Failed to get event auth from remote: %s", e1) return context, auth_events - seen_remotes = await self.store.have_seen_events( + seen_remotes = await self._store.have_seen_events( event.room_id, [e.event_id for e in remote_auth_chain] ) - for e in remote_auth_chain: - if e.event_id in seen_remotes: + for auth_event in remote_auth_chain: + if auth_event.event_id in seen_remotes: continue - if e.event_id == event.event_id: + if auth_event.event_id == event.event_id: continue try: - auth_ids = e.auth_event_ids() + auth_ids = auth_event.auth_event_ids() auth = { (e.type, e.state_key): e for e in remote_auth_chain if e.event_id in auth_ids or e.type == EventTypes.Create } - e.internal_metadata.outlier = True + auth_event.internal_metadata.outlier = True logger.debug( "_check_event_auth %s missing_auth: %s", event.event_id, - e.event_id, + auth_event.event_id, ) missing_auth_event_context = ( - await self.state_handler.compute_event_context(e) + await self._state_handler.compute_event_context(auth_event) ) - await self._auth_and_persist_event( + + missing_auth_event_context = await self._check_event_auth( origin, - e, + auth_event, missing_auth_event_context, claimed_auth_event_map=auth, ) + await self.persist_events_and_notify( + event.room_id, [(auth_event, missing_auth_event_context)] + ) - if e.event_id in event_auth_events: - auth_events[(e.type, e.state_key)] = e + if auth_event.event_id in event_auth_events: + auth_events[ + (auth_event.type, auth_event.state_key) + ] = auth_event except AuthError: pass @@ -1566,7 +1588,7 @@ class FederationEventHandler(BaseHandler): # XXX: currently this checks for redactions but I'm not convinced that is # necessary? - different_events = await self.store.get_events_as_list(different_auth) + different_events = await self._store.get_events_as_list(different_auth) for d in different_events: if d.room_id != event.room_id: @@ -1592,8 +1614,8 @@ class FederationEventHandler(BaseHandler): remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) remote_state = remote_auth_events.values() - room_version = await self.store.get_room_version_id(event.room_id) - new_state = await self.state_handler.resolve_events( + room_version = await self._store.get_room_version_id(event.room_id) + new_state = await self._state_handler.resolve_events( room_version, (local_state, remote_state), event ) @@ -1651,7 +1673,7 @@ class FederationEventHandler(BaseHandler): # create a new state group as a delta from the existing one. prev_group = context.state_group - state_group = await self.state_store.store_state_group( + state_group = await self._state_store.store_state_group( event.event_id, event.room_id, prev_group=prev_group, @@ -1678,14 +1700,17 @@ class FederationEventHandler(BaseHandler): context: The event context. backfilled: True if the event was backfilled. """ + # this method should not be called on outliers (those code paths call + # persist_events_and_notify directly.) + assert not event.internal_metadata.outlier + try: if ( - not event.internal_metadata.is_outlier() - and not backfilled + not backfilled and not context.rejected - and (await self.store.get_min_depth(event.room_id)) <= event.depth + and (await self._store.get_min_depth(event.room_id)) <= event.depth ): - await self.action_generator.handle_push_actions_for_event( + await self._action_generator.handle_push_actions_for_event( event, context ) @@ -1694,7 +1719,7 @@ class FederationEventHandler(BaseHandler): ) except Exception: run_in_background( - self.store.remove_push_actions_from_staging, event.event_id + self._store.remove_push_actions_from_staging, event.event_id ) raise @@ -1719,27 +1744,27 @@ class FederationEventHandler(BaseHandler): The stream ID after which all events have been persisted. """ if not event_and_contexts: - return self.store.get_current_events_token() + return self._store.get_current_events_token() - instance = self.config.worker.events_shard_config.get_instance(room_id) + instance = self._config.worker.events_shard_config.get_instance(room_id) if instance != self._instance_name: # Limit the number of events sent over replication. We choose 200 # here as that is what we default to in `max_request_body_size(..)` for batch in batch_iter(event_and_contexts, 200): result = await self._send_events( instance_name=instance, - store=self.store, + store=self._store, room_id=room_id, event_and_contexts=batch, backfilled=backfilled, ) return result["max_stream_id"] else: - assert self.storage.persistence + assert self._storage.persistence # Note that this returns the events that were persisted, which may not be # the same as were passed in if some were deduplicated due to transaction IDs. - events, max_stream_token = await self.storage.persistence.persist_events( + events, max_stream_token = await self._storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) @@ -1773,7 +1798,7 @@ class FederationEventHandler(BaseHandler): # users if event.internal_metadata.is_outlier(): if event.membership != Membership.INVITE: - if not self.is_mine_id(target_user_id): + if not self._is_mine_id(target_user_id): return target_user = UserID.from_string(target_user_id) @@ -1787,7 +1812,7 @@ class FederationEventHandler(BaseHandler): event_pos = PersistedEventPosition( self._instance_name, event.internal_metadata.stream_ordering ) - self.notifier.on_new_room_event( + self._notifier.on_new_room_event( event, event_pos, max_stream_token, extra_users=extra_users ) @@ -1822,4 +1847,4 @@ class FederationEventHandler(BaseHandler): raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") async def get_min_depth_for_context(self, context: str) -> int: - return await self.store.get_min_depth(context) + return await self._store.get_min_depth(context) diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py index 8ffeabac..8b8f1f41 100644 --- a/synapse/handlers/identity.py +++ b/synapse/handlers/identity.py @@ -540,13 +540,13 @@ class IdentityHandler(BaseHandler): # It is already checked that public_baseurl is configured since this code # should only be used if account_threepid_delegate_msisdn is true. - assert self.hs.config.public_baseurl + assert self.hs.config.server.public_baseurl # we need to tell the client to send the token back to us, since it doesn't # otherwise know where to send it, so add submit_url response parameter # (see also MSC2078) data["submit_url"] = ( - self.hs.config.public_baseurl + self.hs.config.server.public_baseurl + "_matrix/client/unstable/add_threepid/msisdn/submit_token" ) return data diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 101a29c6..10f1584a 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -27,6 +27,7 @@ from synapse import event_auth from synapse.api.constants import ( EventContentFields, EventTypes, + GuestAccess, Membership, RelationTypes, UserTypes, @@ -83,7 +84,7 @@ class MessageHandler: # scheduled. self._scheduled_expiry: Optional[IDelayedCall] = None - if not hs.config.worker_app: + if not hs.config.worker.worker_app: run_as_background_process( "_schedule_next_expiry", self._schedule_next_expiry ) @@ -426,7 +427,7 @@ class EventCreationHandler: self.send_event = ReplicationSendEventRestServlet.make_client(hs) - # This is only used to get at ratelimit function, and maybe_kick_guest_users + # This is only used to get at ratelimit function self.base_handler = BaseHandler(hs) # We arbitrarily limit concurrent event creation for a room to 5. @@ -460,7 +461,7 @@ class EventCreationHandler: self._dummy_events_threshold = hs.config.dummy_events_threshold if ( - self.config.run_background_tasks + self.config.worker.run_background_tasks and self.config.cleanup_extremities_with_dummy_events ): self.clock.looping_call( @@ -1306,7 +1307,7 @@ class EventCreationHandler: requester, is_admin_redaction=is_admin_redaction ) - await self.base_handler.maybe_kick_guest_users(event, context) + await self._maybe_kick_guest_users(event, context) if event.type == EventTypes.CanonicalAlias: # Validate a newly added alias or newly added alt_aliases. @@ -1393,6 +1394,9 @@ class EventCreationHandler: allow_none=True, ) + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + # we can make some additional checks now if we have the original event. if original_event: if original_event.type == EventTypes.Create: @@ -1404,6 +1408,28 @@ class EventCreationHandler: if original_event.type == EventTypes.ServerACL: raise AuthError(403, "Redacting server ACL events is not permitted") + # Add a little safety stop-gap to prevent people from trying to + # redact MSC2716 related events when they're in a room version + # which does not support it yet. We allow people to use MSC2716 + # events in existing room versions but only from the room + # creator since it does not require any changes to the auth + # rules and in effect, the redaction algorithm . In the + # supported room version, we add the `historical` power level to + # auth the MSC2716 related events and adjust the redaction + # algorthim to keep the `historical` field around (redacting an + # event should only strip fields which don't affect the + # structural protocol level). + is_msc2716_event = ( + original_event.type == EventTypes.MSC2716_INSERTION + or original_event.type == EventTypes.MSC2716_CHUNK + or original_event.type == EventTypes.MSC2716_MARKER + ) + if not room_version_obj.msc2716_historical and is_msc2716_event: + raise AuthError( + 403, + "Redacting MSC2716 events is not supported in this room version", + ) + prev_state_ids = await context.get_prev_state_ids() auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=True @@ -1411,9 +1437,6 @@ class EventCreationHandler: auth_events_map = await self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()} - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - if event_auth.check_redaction( room_version_obj, event, auth_events=auth_events ): @@ -1471,6 +1494,28 @@ class EventCreationHandler: return event + async def _maybe_kick_guest_users( + self, event: EventBase, context: EventContext + ) -> None: + if event.type != EventTypes.GuestAccess: + return + + guest_access = event.content.get(EventContentFields.GUEST_ACCESS) + if guest_access == GuestAccess.CAN_JOIN: + return + + current_state_ids = await context.get_current_state_ids() + + # since this is a client-generated event, it cannot be an outlier and we must + # therefore have the state ids. + assert current_state_ids is not None + current_state_dict = await self.store.get_events( + list(current_state_ids.values()) + ) + current_state = list(current_state_dict.values()) + logger.info("maybe_kick_guest_users %r", current_state) + await self.hs.get_room_member_handler().kick_guest_users(current_state) + async def _bump_active_time(self, user: UserID) -> None: try: presence = self.hs.get_presence_handler() diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py index eca8f160..dfc251b2 100644 --- a/synapse/handlers/oidc.py +++ b/synapse/handlers/oidc.py @@ -324,7 +324,7 @@ class OidcProvider: self._allow_existing_users = provider.allow_existing_users self._http_client = hs.get_proxied_http_client() - self._server_name: str = hs.config.server_name + self._server_name: str = hs.config.server.server_name # identifier for the external_ids table self.idp_id = provider.idp_id @@ -338,9 +338,6 @@ class OidcProvider: # optional brand identifier for this auth provider self.idp_brand = provider.idp_brand - # Optional brand identifier for the unstable API (see MSC2858). - self.unstable_idp_brand = provider.unstable_idp_brand - self._sso_handler = hs.get_sso_handler() self._sso_handler.register_identity_provider(self) diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 1dbafd25..7dc0ee4b 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -91,7 +91,7 @@ class PaginationHandler: self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max - if hs.config.run_background_tasks and hs.config.retention_enabled: + if hs.config.worker.run_background_tasks and hs.config.retention_enabled: # Run the purge jobs described in the configuration file. for job in hs.config.retention_purge_jobs: logger.info("Setting up purge job with config: %s", job) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 4418d63d..39b39cd3 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -28,6 +28,7 @@ from bisect import bisect from contextlib import contextmanager from typing import ( TYPE_CHECKING, + Any, Callable, Collection, Dict, @@ -615,7 +616,7 @@ class PresenceHandler(BasePresenceHandler): super().__init__(hs) self.hs = hs self.server_name = hs.hostname - self.wheel_timer = WheelTimer() + self.wheel_timer: WheelTimer[str] = WheelTimer() self.notifier = hs.get_notifier() self._presence_enabled = hs.config.use_presence @@ -924,7 +925,7 @@ class PresenceHandler(BasePresenceHandler): prev_state = await self.current_state_for_user(user_id) - new_fields = {"last_active_ts": self.clock.time_msec()} + new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()} if prev_state.state == PresenceState.UNAVAILABLE: new_fields["state"] = PresenceState.ONLINE diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 20a033d0..51adf876 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -63,7 +63,7 @@ class ProfileHandler(BaseHandler): self.user_directory_handler = hs.get_user_directory_handler() - if hs.config.run_background_tasks: + if hs.config.worker.run_background_tasks: self.clock.looping_call( self._update_remote_profile_cache, self.PROFILE_UPDATE_MS ) diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index c679a830..bd8160e7 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -28,7 +28,7 @@ logger = logging.getLogger(__name__) class ReadMarkerHandler(BaseHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.server_name = hs.config.server_name + self.server_name = hs.config.server.server_name self.store = hs.get_datastore() self.account_data_handler = hs.get_account_data_handler() self.read_marker_linearizer = Linearizer(name="read_marker") diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py index fb495229..a49b8ee4 100644 --- a/synapse/handlers/receipts.py +++ b/synapse/handlers/receipts.py @@ -29,7 +29,7 @@ class ReceiptsHandler(BaseHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.server_name = hs.config.server_name + self.server_name = hs.config.server.server_name self.store = hs.get_datastore() self.event_auth_handler = hs.get_event_auth_handler() diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py index 0ed59d75..38c4993d 100644 --- a/synapse/handlers/register.py +++ b/synapse/handlers/register.py @@ -21,7 +21,13 @@ from prometheus_client import Counter from typing_extensions import TypedDict from synapse import types -from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType +from synapse.api.constants import ( + MAX_USERID_LENGTH, + EventContentFields, + EventTypes, + JoinRules, + LoginType, +) from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError from synapse.appservice import ApplicationService from synapse.config.server import is_threepid_reserved @@ -96,7 +102,7 @@ class RegistrationHandler(BaseHandler): self.spam_checker = hs.get_spam_checker() - if hs.config.worker_app: + if hs.config.worker.worker_app: self._register_client = ReplicationRegisterServlet.make_client(hs) self._register_device_client = RegisterDeviceReplicationServlet.make_client( hs @@ -405,7 +411,7 @@ class RegistrationHandler(BaseHandler): # Choose whether to federate the new room. if not self.hs.config.registration.autocreate_auto_join_rooms_federated: - stub_config["creation_content"] = {"m.federate": False} + stub_config["creation_content"] = {EventContentFields.FEDERATE: False} for r in self.hs.config.registration.auto_join_rooms: logger.info("Auto-joining %s to %s", user_id, r) @@ -690,7 +696,7 @@ class RegistrationHandler(BaseHandler): address: the IP address used to perform the registration. shadow_banned: Whether to shadow-ban the user """ - if self.hs.config.worker_app: + if self.hs.config.worker.worker_app: await self._register_client( user_id=user_id, password_hash=password_hash, @@ -780,7 +786,7 @@ class RegistrationHandler(BaseHandler): Does the bits that need doing on the main process. Not for use outside this class and RegisterDeviceReplicationServlet. """ - assert not self.hs.config.worker_app + assert not self.hs.config.worker.worker_app valid_until_ms = None if self.session_lifetime is not None: if is_guest: @@ -837,7 +843,7 @@ class RegistrationHandler(BaseHandler): """ # TODO: 3pid registration can actually happen on the workers. Consider # refactoring it. - if self.hs.config.worker_app: + if self.hs.config.worker.worker_app: await self._post_registration_client( user_id=user_id, auth_result=auth_result, access_token=access_token ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b33fe09f..9345ae02 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -25,12 +25,15 @@ from collections import OrderedDict from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Optional, Tuple from synapse.api.constants import ( + EventContentFields, EventTypes, + GuestAccess, HistoryVisibility, JoinRules, Membership, RoomCreationPreset, RoomEncryptionAlgorithms, + RoomTypes, ) from synapse.api.errors import ( AuthError, @@ -388,14 +391,14 @@ class RoomCreationHandler(BaseHandler): old_room_create_event = await self.store.get_create_event_for_room(old_room_id) # Check if the create event specified a non-federatable room - if not old_room_create_event.content.get("m.federate", True): + if not old_room_create_event.content.get(EventContentFields.FEDERATE, True): # If so, mark the new room as non-federatable as well - creation_content["m.federate"] = False + creation_content[EventContentFields.FEDERATE] = False initial_state = {} # Replicate relevant room events - types_to_copy = ( + types_to_copy: List[Tuple[str, Optional[str]]] = [ (EventTypes.JoinRules, ""), (EventTypes.Name, ""), (EventTypes.Topic, ""), @@ -406,7 +409,16 @@ class RoomCreationHandler(BaseHandler): (EventTypes.ServerACL, ""), (EventTypes.RelatedGroups, ""), (EventTypes.PowerLevels, ""), - ) + ] + + # If the old room was a space, copy over the room type and the rooms in + # the space. + if ( + old_room_create_event.content.get(EventContentFields.ROOM_TYPE) + == RoomTypes.SPACE + ): + creation_content[EventContentFields.ROOM_TYPE] = RoomTypes.SPACE + types_to_copy.append((EventTypes.SpaceChild, None)) old_room_state_ids = await self.store.get_filtered_current_state_ids( old_room_id, StateFilter.from_types(types_to_copy) @@ -417,6 +429,11 @@ class RoomCreationHandler(BaseHandler): for k, old_event_id in old_room_state_ids.items(): old_event = old_room_state_events.get(old_event_id) if old_event: + # If the event is an space child event with empty content, it was + # removed from the space and should be ignored. + if k[0] == EventTypes.SpaceChild and not old_event.content: + continue + initial_state[k] = old_event.content # deep-copy the power-levels event before we start modifying it @@ -909,7 +926,12 @@ class RoomCreationHandler(BaseHandler): ) return last_stream_id - config = self._presets_dict[preset_config] + try: + config = self._presets_dict[preset_config] + except KeyError: + raise SynapseError( + 400, f"'{preset_config}' is not a valid preset", errcode=Codes.BAD_JSON + ) creation_content.update({"creator": creator_id}) await send(etype=EventTypes.Create, content=creation_content) @@ -988,7 +1010,8 @@ class RoomCreationHandler(BaseHandler): if config["guest_can_join"]: if (EventTypes.GuestAccess, "") not in initial_state: last_sent_stream_id = await send( - etype=EventTypes.GuestAccess, content={"guest_access": "can_join"} + etype=EventTypes.GuestAccess, + content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN}, ) for (etype, state_key), content in initial_state.items(): diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index 6d433fad..81680b8d 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -19,7 +19,13 @@ from typing import TYPE_CHECKING, Optional, Tuple import msgpack from unpaddedbase64 import decode_base64, encode_base64 -from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules +from synapse.api.constants import ( + EventContentFields, + EventTypes, + GuestAccess, + HistoryVisibility, + JoinRules, +) from synapse.api.errors import ( Codes, HttpResponseException, @@ -307,7 +313,9 @@ class RoomListHandler(BaseHandler): # Return whether this room is open to federation users or not create_event = current_state[EventTypes.Create, ""] - result["m.federate"] = create_event.content.get("m.federate", True) + result["m.federate"] = create_event.content.get( + EventContentFields.FEDERATE, True + ) name_event = current_state.get((EventTypes.Name, "")) if name_event: @@ -336,8 +344,8 @@ class RoomListHandler(BaseHandler): guest_event = current_state.get((EventTypes.GuestAccess, "")) guest = None if guest_event: - guest = guest_event.content.get("guest_access", None) - result["guest_can_join"] = guest == "can_join" + guest = guest_event.content.get(EventContentFields.GUEST_ACCESS) + result["guest_can_join"] = guest == GuestAccess.CAN_JOIN avatar_event = current_state.get(("m.room.avatar", "")) if avatar_event: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 401b84aa..43902016 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -23,6 +23,7 @@ from synapse.api.constants import ( AccountDataTypes, EventContentFields, EventTypes, + GuestAccess, Membership, ) from synapse.api.errors import ( @@ -44,6 +45,7 @@ from synapse.types import ( RoomID, StateMap, UserID, + create_requester, get_domain_from_id, ) from synapse.util.async_helpers import Linearizer @@ -70,6 +72,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.auth = hs.get_auth() self.state_handler = hs.get_state_handler() self.config = hs.config + self._server_name = hs.hostname self.federation_handler = hs.get_federation_handler() self.directory_handler = hs.get_directory_handler() @@ -115,9 +118,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): burst_count=hs.config.ratelimiting.rc_invites_per_user.burst_count, ) - # This is only used to get at ratelimit function, and - # maybe_kick_guest_users. It's fine there are multiple of these as - # it doesn't store state. + # This is only used to get at the ratelimit function. It's fine there are + # multiple of these as it doesn't store state. self.base_handler = BaseHandler(hs) @abc.abstractmethod @@ -1095,10 +1097,62 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): return bool( guest_access and guest_access.content - and "guest_access" in guest_access.content - and guest_access.content["guest_access"] == "can_join" + and guest_access.content.get(EventContentFields.GUEST_ACCESS) + == GuestAccess.CAN_JOIN ) + async def kick_guest_users(self, current_state: Iterable[EventBase]) -> None: + """Kick any local guest users from the room. + + This is called when the room state changes from guests allowed to not-allowed. + + Params: + current_state: the current state of the room. We will iterate this to look + for guest users to kick. + """ + for member_event in current_state: + try: + if member_event.type != EventTypes.Member: + continue + + if not self.hs.is_mine_id(member_event.state_key): + continue + + if member_event.content["membership"] not in { + Membership.JOIN, + Membership.INVITE, + }: + continue + + if ( + "kind" not in member_event.content + or member_event.content["kind"] != "guest" + ): + continue + + # We make the user choose to leave, rather than have the + # event-sender kick them. This is partially because we don't + # need to worry about power levels, and partially because guest + # users are a concept which doesn't hugely work over federation, + # and having homeservers have their own users leave keeps more + # of that decision-making and control local to the guest-having + # homeserver. + target_user = UserID.from_string(member_event.state_key) + requester = create_requester( + target_user, is_guest=True, authenticated_entity=self._server_name + ) + handler = self.hs.get_room_member_handler() + await handler.update_membership( + requester, + target_user, + member_event.room_id, + "leave", + ratelimit=False, + require_consent=False, + ) + except Exception as e: + logger.exception("Error kicking guest user: %s" % (e,)) + async def lookup_room_alias( self, room_alias: RoomAlias ) -> Tuple[RoomID, List[str]]: @@ -1352,7 +1406,6 @@ class RoomMemberMasterHandler(RoomMemberHandler): self.distributor = hs.get_distributor() self.distributor.declare("user_left_room") - self._server_name = hs.hostname async def _is_remote_room_too_complex( self, room_id: str, remote_room_hosts: List[str] diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index 906985c7..781da9e8 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -28,9 +28,15 @@ from synapse.api.constants import ( Membership, RoomTypes, ) -from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError +from synapse.api.errors import ( + AuthError, + Codes, + NotFoundError, + StoreError, + SynapseError, + UnsupportedRoomVersionError, +) from synapse.events import EventBase -from synapse.events.utils import format_event_for_client_v2 from synapse.types import JsonDict from synapse.util.caches.response_cache import ResponseCache @@ -82,7 +88,6 @@ class RoomSummaryHandler: _PAGINATION_SESSION_VALIDITY_PERIOD_MS = 5 * 60 * 1000 def __init__(self, hs: "HomeServer"): - self._clock = hs.get_clock() self._event_auth_handler = hs.get_event_auth_handler() self._store = hs.get_datastore() self._event_serializer = hs.get_event_client_serializer() @@ -641,18 +646,18 @@ class RoomSummaryHandler: if max_children is None or max_children > MAX_ROOMS_PER_SPACE: max_children = MAX_ROOMS_PER_SPACE - now = self._clock.time_msec() - events_result: List[JsonDict] = [] - for edge_event in itertools.islice(child_events, max_children): - events_result.append( - await self._event_serializer.serialize_event( - edge_event, - time_now=now, - event_format=format_event_for_client_v2, - ) - ) - - return _RoomEntry(room_id, room_entry, events_result) + stripped_events: List[JsonDict] = [ + { + "type": e.type, + "state_key": e.state_key, + "content": e.content, + "room_id": e.room_id, + "sender": e.sender, + "origin_server_ts": e.origin_server_ts, + } + for e in itertools.islice(child_events, max_children) + ] + return _RoomEntry(room_id, room_entry, stripped_events) async def _summarize_remote_room( self, @@ -814,7 +819,12 @@ class RoomSummaryHandler: logger.info("room %s is unknown, omitting from summary", room_id) return False - room_version = await self._store.get_room_version(room_id) + try: + room_version = await self._store.get_room_version(room_id) + except UnsupportedRoomVersionError: + # If a room with an unsupported room version is encountered, ignore + # it to avoid breaking the entire summary response. + return False # Include the room if it has join rules of public or knock. join_rules_event_id = state_ids.get((EventTypes.JoinRules, "")) @@ -1139,25 +1149,26 @@ def _is_suggested_child_event(edge_event: EventBase) -> bool: _INVALID_ORDER_CHARS_RE = re.compile(r"[^\x20-\x7E]") -def _child_events_comparison_key(child: EventBase) -> Tuple[bool, Optional[str], str]: +def _child_events_comparison_key( + child: EventBase, +) -> Tuple[bool, Optional[str], int, str]: """ Generate a value for comparing two child events for ordering. - The rules for ordering are supposed to be: + The rules for ordering are: 1. The 'order' key, if it is valid. - 2. The 'origin_server_ts' of the 'm.room.create' event. + 2. The 'origin_server_ts' of the 'm.space.child' event. 3. The 'room_id'. - But we skip step 2 since we may not have any state from the room. - Args: child: The event for generating a comparison key. Returns: The comparison key as a tuple of: False if the ordering is valid. - The ordering field. + The 'order' field or None if it is not given or invalid. + The 'origin_server_ts' field. The room ID. """ order = child.content.get("order") @@ -1168,4 +1179,4 @@ def _child_events_comparison_key(child: EventBase) -> Tuple[bool, Optional[str], order = None # Items without an order come last. - return (order is None, order, child.room_id) + return (order is None, order, child.origin_server_ts, child.room_id) diff --git a/synapse/handlers/saml.py b/synapse/handlers/saml.py index e6e71e97..0066d570 100644 --- a/synapse/handlers/saml.py +++ b/synapse/handlers/saml.py @@ -80,7 +80,6 @@ class SamlHandler(BaseHandler): # the SsoIdentityProvider protocol type. self.idp_icon = None self.idp_brand = None - self.unstable_idp_brand = None # a map from saml session id to Saml2SessionData object self._outstanding_requests_dict: Dict[str, Saml2SessionData] = {} diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 0e6ebb57..05aa76d6 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -104,11 +104,6 @@ class SsoIdentityProvider(Protocol): """Optional branding identifier""" return None - @property - def unstable_idp_brand(self) -> Optional[str]: - """Optional brand identifier for the unstable API (see MSC2858).""" - return None - @abc.abstractmethod async def handle_redirect_request( self, @@ -449,14 +444,16 @@ class SsoHandler: if not user_id: attributes = await self._call_attribute_mapper(sso_to_matrix_id_mapper) - if attributes.localpart is None: - # the mapper doesn't return a username. bail out with a redirect to - # the username picker. - await self._redirect_to_username_picker( + next_step_url = self._get_url_for_next_new_user_step( + attributes=attributes + ) + if next_step_url: + await self._redirect_to_next_new_user_step( auth_provider_id, remote_user_id, attributes, client_redirect_url, + next_step_url, extra_login_attributes, ) @@ -535,18 +532,53 @@ class SsoHandler: ) return attributes - async def _redirect_to_username_picker( + def _get_url_for_next_new_user_step( + self, + attributes: Optional[UserAttributes] = None, + session: Optional[UsernameMappingSession] = None, + ) -> bytes: + """Returns the URL to redirect to for the next step of new user registration + + Given attributes from the user mapping provider or a UsernameMappingSession, + returns the URL to redirect to for the next step of the registration flow. + + Args: + attributes: the user attributes returned by the user mapping provider, + from before a UsernameMappingSession has begun. + + session: an active UsernameMappingSession, possibly with some of its + attributes chosen by the user. + + Returns: + The URL to redirect to, or an empty value if no redirect is necessary + """ + # Must provide either attributes or session, not both + assert (attributes is not None) != (session is not None) + + if (attributes and attributes.localpart is None) or ( + session and session.chosen_localpart is None + ): + return b"/_synapse/client/pick_username/account_details" + elif self._consent_at_registration and not ( + session and session.terms_accepted_version + ): + return b"/_synapse/client/new_user_consent" + else: + return b"/_synapse/client/sso_register" if session else b"" + + async def _redirect_to_next_new_user_step( self, auth_provider_id: str, remote_user_id: str, attributes: UserAttributes, client_redirect_url: str, + next_step_url: bytes, extra_login_attributes: Optional[JsonDict], ) -> NoReturn: """Creates a UsernameMappingSession and redirects the browser - Called if the user mapping provider doesn't return a localpart for a new user. - Raises a RedirectException which redirects the browser to the username picker. + Called if the user mapping provider doesn't return complete information for a new user. + Raises a RedirectException which redirects the browser to a specified URL. Args: auth_provider_id: A unique identifier for this SSO provider, e.g. @@ -559,12 +591,15 @@ class SsoHandler: client_redirect_url: The redirect URL passed in by the client, which we will eventually redirect back to. + next_step_url: The URL to redirect to for the next step of the new user flow. + extra_login_attributes: An optional dictionary of extra attributes to be provided to the client in the login response. Raises: RedirectException """ + # TODO: If needed, allow using/looking up an existing session here. session_id = random_string(16) now = self._clock.time_msec() session = UsernameMappingSession( @@ -575,13 +610,18 @@ class SsoHandler: client_redirect_url=client_redirect_url, expiry_time_ms=now + self._MAPPING_SESSION_VALIDITY_PERIOD_MS, extra_login_attributes=extra_login_attributes, + # Treat the localpart returned by the user mapping provider as though + # it was chosen by the user. If it's None, it must be chosen eventually. + chosen_localpart=attributes.localpart, + # TODO: Consider letting the user mapping provider specify defaults for + # other user-chosen attributes. ) self._username_mapping_sessions[session_id] = session logger.info("Recorded registration session id %s", session_id) - # Set the cookie and redirect to the username picker - e = RedirectException(b"/_synapse/client/pick_username/account_details") + # Set the cookie and redirect to the next step + e = RedirectException(next_step_url) e.cookies.append( b"%s=%s; path=/" % (USERNAME_MAPPING_SESSION_COOKIE_NAME, session_id.encode("ascii")) @@ -810,16 +850,9 @@ class SsoHandler: ) session.emails_to_use = filtered_emails - # we may now need to collect consent from the user, in which case, redirect - # to the consent-extraction-unit - if self._consent_at_registration: - redirect_url = b"/_synapse/client/new_user_consent" - - # otherwise, redirect to the completion page - else: - redirect_url = b"/_synapse/client/sso_register" - - respond_with_redirect(request, redirect_url) + respond_with_redirect( + request, self._get_url_for_next_new_user_step(session=session) + ) async def handle_terms_accepted( self, request: Request, session_id: str, terms_version: str @@ -847,8 +880,9 @@ class SsoHandler: session.terms_accepted_version = terms_version - # we're done; now we can register the user - respond_with_redirect(request, b"/_synapse/client/sso_register") + respond_with_redirect( + request, self._get_url_for_next_new_user_step(session=session) + ) async def register_sso_user(self, request: Request, session_id: str) -> None: """Called once we have all the info we need to register a new user. diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py index 077c7c06..d30ba2b7 100644 --- a/synapse/handlers/state_deltas.py +++ b/synapse/handlers/state_deltas.py @@ -13,6 +13,7 @@ # limitations under the License. import logging +from enum import Enum, auto from typing import TYPE_CHECKING, Optional if TYPE_CHECKING: @@ -21,6 +22,12 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +class MatchChange(Enum): + no_change = auto() + now_true = auto() + now_false = auto() + + class StateDeltasHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() @@ -31,18 +38,12 @@ class StateDeltasHandler: event_id: Optional[str], key_name: str, public_value: str, - ) -> Optional[bool]: + ) -> MatchChange: """Given two events check if the `key_name` field in content changed from not matching `public_value` to doing so. For example, check if `history_visibility` (`key_name`) changed from `shared` to `world_readable` (`public_value`). - - Returns: - None if the field in the events either both match `public_value` - or if neither do, i.e. there has been no change. - True if it didn't match `public_value` but now does - False if it did match `public_value` but now doesn't """ prev_event = None event = None @@ -54,7 +55,7 @@ class StateDeltasHandler: if not event and not prev_event: logger.debug("Neither event exists: %r %r", prev_event_id, event_id) - return None + return MatchChange.no_change prev_value = None value = None @@ -68,8 +69,8 @@ class StateDeltasHandler: logger.debug("prev_value: %r -> value: %r", prev_value, value) if value == public_value and prev_value != public_value: - return True + return MatchChange.now_true elif value != public_value and prev_value == public_value: - return False + return MatchChange.now_false else: - return None + return MatchChange.no_change diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 3fd89af2..b64ce8ca 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -18,7 +18,7 @@ from typing import TYPE_CHECKING, Any, Dict, Iterable, Optional, Tuple from typing_extensions import Counter as CounterType -from synapse.api.constants import EventTypes, Membership +from synapse.api.constants import EventContentFields, EventTypes, Membership from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import JsonDict @@ -54,7 +54,7 @@ class StatsHandler: # Guard to ensure we only process deltas one at a time self._is_processing = False - if self.stats_enabled and hs.config.run_background_tasks: + if self.stats_enabled and hs.config.worker.run_background_tasks: self.notifier.add_replication_callback(self.notify_new_event) # We kick this off so that we don't have to wait for a change before @@ -254,7 +254,7 @@ class StatsHandler: elif typ == EventTypes.Create: room_state["is_federatable"] = ( - event_content.get("m.federate", True) is True + event_content.get(EventContentFields.FEDERATE, True) is True ) elif typ == EventTypes.JoinRules: room_state["join_rules"] = event_content.get("join_rule") @@ -273,7 +273,9 @@ class StatsHandler: elif typ == EventTypes.CanonicalAlias: room_state["canonical_alias"] = event_content.get("alias") elif typ == EventTypes.GuestAccess: - room_state["guest_access"] = event_content.get("guest_access") + room_state["guest_access"] = event_content.get( + EventContentFields.GUEST_ACCESS + ) for room_id, state in room_to_state_updates.items(): logger.debug("Updating room_stats_state for %s: %s", room_id, state) diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 86c3c7f0..edfdb99c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -505,10 +505,13 @@ class SyncHandler: else: limited = False + log_kv({"limited": limited}) + if potential_recents: recents = sync_config.filter_collection.filter_room_timeline( potential_recents ) + log_kv({"recents_after_sync_filtering": len(recents)}) # We check if there are any state events, if there are then we pass # all current state events to the filter_events function. This is to @@ -526,6 +529,7 @@ class SyncHandler: recents, always_include_ids=current_state_ids, ) + log_kv({"recents_after_visibility_filtering": len(recents)}) else: recents = [] @@ -566,10 +570,15 @@ class SyncHandler: events, end_key = await self.store.get_recent_events_for_room( room_id, limit=load_limit + 1, end_token=end_key ) + + log_kv({"loaded_recents": len(events)}) + loaded_recents = sync_config.filter_collection.filter_room_timeline( events ) + log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)}) + # We check if there are any state events, if there are then we pass # all current state events to the filter_events function. This is to # ensure that we always include current state in the timeline @@ -586,6 +595,9 @@ class SyncHandler: loaded_recents, always_include_ids=current_state_ids, ) + + log_kv({"loaded_recents_after_client_filtering": len(loaded_recents)}) + loaded_recents.extend(recents) recents = loaded_recents @@ -1116,6 +1128,8 @@ class SyncHandler: logger.debug("Fetching group data") await self._generate_sync_entry_for_groups(sync_result_builder) + num_events = 0 + # debug for https://github.com/matrix-org/synapse/issues/4422 for joined_room in sync_result_builder.joined: room_id = joined_room.room_id @@ -1123,6 +1137,14 @@ class SyncHandler: issue4422_logger.debug( "Sync result for newly joined room %s: %r", room_id, joined_room ) + num_events += len(joined_room.timeline.events) + + log_kv( + { + "joined_rooms_in_result": len(sync_result_builder.joined), + "events_in_result": num_events, + } + ) logger.debug("Sync response calculation complete") return SyncResult( @@ -1467,6 +1489,7 @@ class SyncHandler: if not sync_result_builder.full_state: if since_token and not ephemeral_by_room and not account_data_by_room: have_changed = await self._have_rooms_changed(sync_result_builder) + log_kv({"rooms_have_changed": have_changed}) if not have_changed: tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key @@ -1501,13 +1524,15 @@ class SyncHandler: tags_by_room = await self.store.get_tags_for_user(user_id) + log_kv({"rooms_changed": len(room_changes.room_entries)}) + room_entries = room_changes.room_entries invited = room_changes.invited knocked = room_changes.knocked newly_joined_rooms = room_changes.newly_joined_rooms newly_left_rooms = room_changes.newly_left_rooms - async def handle_room_entries(room_entry): + async def handle_room_entries(room_entry: "RoomSyncResultBuilder"): logger.debug("Generating room entry for %s", room_entry.room_id) res = await self._generate_room_entry( sync_result_builder, @@ -1933,125 +1958,156 @@ class SyncHandler: since_token = room_builder.since_token upto_token = room_builder.upto_token - batch = await self._load_filtered_recents( - room_id, - sync_config, - now_token=upto_token, - since_token=since_token, - potential_recents=events, - newly_joined_room=newly_joined, - ) + with start_active_span("generate_room_entry"): + set_tag("room_id", room_id) + log_kv({"events": len(events or ())}) - # Note: `batch` can be both empty and limited here in the case where - # `_load_filtered_recents` can't find any events the user should see - # (e.g. due to having ignored the sender of the last 50 events). + log_kv( + { + "since_token": since_token, + "upto_token": upto_token, + } + ) - if newly_joined: - # debug for https://github.com/matrix-org/synapse/issues/4422 - issue4422_logger.debug( - "Timeline events after filtering in newly-joined room %s: %r", + batch = await self._load_filtered_recents( room_id, - batch, + sync_config, + now_token=upto_token, + since_token=since_token, + potential_recents=events, + newly_joined_room=newly_joined, + ) + log_kv( + { + "batch_events": len(batch.events), + "prev_batch": batch.prev_batch, + "batch_limited": batch.limited, + } ) - # When we join the room (or the client requests full_state), we should - # send down any existing tags. Usually the user won't have tags in a - # newly joined room, unless either a) they've joined before or b) the - # tag was added by synapse e.g. for server notice rooms. - if full_state: - user_id = sync_result_builder.sync_config.user.to_string() - tags = await self.store.get_tags_for_room(user_id, room_id) + # Note: `batch` can be both empty and limited here in the case where + # `_load_filtered_recents` can't find any events the user should see + # (e.g. due to having ignored the sender of the last 50 events). - # If there aren't any tags, don't send the empty tags list down - # sync - if not tags: - tags = None + if newly_joined: + # debug for https://github.com/matrix-org/synapse/issues/4422 + issue4422_logger.debug( + "Timeline events after filtering in newly-joined room %s: %r", + room_id, + batch, + ) - account_data_events = [] - if tags is not None: - account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) + # When we join the room (or the client requests full_state), we should + # send down any existing tags. Usually the user won't have tags in a + # newly joined room, unless either a) they've joined before or b) the + # tag was added by synapse e.g. for server notice rooms. + if full_state: + user_id = sync_result_builder.sync_config.user.to_string() + tags = await self.store.get_tags_for_room(user_id, room_id) - for account_data_type, content in account_data.items(): - account_data_events.append({"type": account_data_type, "content": content}) + # If there aren't any tags, don't send the empty tags list down + # sync + if not tags: + tags = None - account_data_events = sync_config.filter_collection.filter_room_account_data( - account_data_events - ) + account_data_events = [] + if tags is not None: + account_data_events.append({"type": "m.tag", "content": {"tags": tags}}) - ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) + for account_data_type, content in account_data.items(): + account_data_events.append( + {"type": account_data_type, "content": content} + ) - if not ( - always_include or batch or account_data_events or ephemeral or full_state - ): - return + account_data_events = ( + sync_config.filter_collection.filter_room_account_data( + account_data_events + ) + ) - state = await self.compute_state_delta( - room_id, batch, sync_config, since_token, now_token, full_state=full_state - ) + ephemeral = sync_config.filter_collection.filter_room_ephemeral(ephemeral) - summary: Optional[JsonDict] = {} - - # we include a summary in room responses when we're lazy loading - # members (as the client otherwise doesn't have enough info to form - # the name itself). - if sync_config.filter_collection.lazy_load_members() and ( - # we recalculate the summary: - # if there are membership changes in the timeline, or - # if membership has changed during a gappy sync, or - # if this is an initial sync. - any(ev.type == EventTypes.Member for ev in batch.events) - or ( - # XXX: this may include false positives in the form of LL - # members which have snuck into state - batch.limited - and any(t == EventTypes.Member for (t, k) in state) - ) - or since_token is None - ): - summary = await self.compute_summary( - room_id, sync_config, batch, state, now_token - ) + if not ( + always_include + or batch + or account_data_events + or ephemeral + or full_state + ): + return - if room_builder.rtype == "joined": - unread_notifications: Dict[str, int] = {} - room_sync = JoinedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - ephemeral=ephemeral, - account_data=account_data_events, - unread_notifications=unread_notifications, - summary=summary, - unread_count=0, + state = await self.compute_state_delta( + room_id, + batch, + sync_config, + since_token, + now_token, + full_state=full_state, ) - if room_sync or always_include: - notifs = await self.unread_notifs_for_room_id(room_id, sync_config) + summary: Optional[JsonDict] = {} + + # we include a summary in room responses when we're lazy loading + # members (as the client otherwise doesn't have enough info to form + # the name itself). + if sync_config.filter_collection.lazy_load_members() and ( + # we recalculate the summary: + # if there are membership changes in the timeline, or + # if membership has changed during a gappy sync, or + # if this is an initial sync. + any(ev.type == EventTypes.Member for ev in batch.events) + or ( + # XXX: this may include false positives in the form of LL + # members which have snuck into state + batch.limited + and any(t == EventTypes.Member for (t, k) in state) + ) + or since_token is None + ): + summary = await self.compute_summary( + room_id, sync_config, batch, state, now_token + ) + + if room_builder.rtype == "joined": + unread_notifications: Dict[str, int] = {} + room_sync = JoinedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + ephemeral=ephemeral, + account_data=account_data_events, + unread_notifications=unread_notifications, + summary=summary, + unread_count=0, + ) - unread_notifications["notification_count"] = notifs["notify_count"] - unread_notifications["highlight_count"] = notifs["highlight_count"] + if room_sync or always_include: + notifs = await self.unread_notifs_for_room_id(room_id, sync_config) - room_sync.unread_count = notifs["unread_count"] + unread_notifications["notification_count"] = notifs["notify_count"] + unread_notifications["highlight_count"] = notifs["highlight_count"] - sync_result_builder.joined.append(room_sync) + room_sync.unread_count = notifs["unread_count"] - if batch.limited and since_token: - user_id = sync_result_builder.sync_config.user.to_string() - logger.debug( - "Incremental gappy sync of %s for user %s with %d state events" - % (room_id, user_id, len(state)) + sync_result_builder.joined.append(room_sync) + + if batch.limited and since_token: + user_id = sync_result_builder.sync_config.user.to_string() + logger.debug( + "Incremental gappy sync of %s for user %s with %d state events" + % (room_id, user_id, len(state)) + ) + elif room_builder.rtype == "archived": + archived_room_sync = ArchivedSyncResult( + room_id=room_id, + timeline=batch, + state=state, + account_data=account_data_events, ) - elif room_builder.rtype == "archived": - archived_room_sync = ArchivedSyncResult( - room_id=room_id, - timeline=batch, - state=state, - account_data=account_data_events, - ) - if archived_room_sync or always_include: - sync_result_builder.archived.append(archived_room_sync) - else: - raise Exception("Unrecognized rtype: %r", room_builder.rtype) + if archived_room_sync or always_include: + sync_result_builder.archived.append(archived_room_sync) + else: + raise Exception("Unrecognized rtype: %r", room_builder.rtype) async def get_rooms_for_user_at( self, user_id: str, room_key: RoomStreamToken diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index a97c4485..9cea011e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -53,7 +53,7 @@ class FollowerTypingHandler: def __init__(self, hs: "HomeServer"): self.store = hs.get_datastore() - self.server_name = hs.config.server_name + self.server_name = hs.config.server.server_name self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id @@ -73,7 +73,7 @@ class FollowerTypingHandler: self._room_typing: Dict[str, Set[str]] = {} self._member_last_federation_poke: Dict[RoomMember, int] = {} - self.wheel_timer = WheelTimer(bucket_size=5000) + self.wheel_timer: WheelTimer[RoomMember] = WheelTimer(bucket_size=5000) self._latest_room_serial = 0 self.clock.looping_call(self._handle_timeouts, 5000) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 6edb1da5..6faa1d84 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Any, Dict, List, Optional import synapse.metrics from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership -from synapse.handlers.state_deltas import StateDeltasHandler +from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.roommember import ProfileInfo from synapse.types import JsonDict @@ -30,14 +30,26 @@ logger = logging.getLogger(__name__) class UserDirectoryHandler(StateDeltasHandler): - """Handles querying of and keeping updated the user_directory. + """Handles queries and updates for the user_directory. N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY - The user directory is filled with users who this server can see are joined to a - world_readable or publicly joinable room. We keep a database table up to date - by streaming changes of the current state and recalculating whether users should - be in the directory or not when necessary. + When a local user searches the user_directory, we report two kinds of users: + + - users this server can see are joined to a world_readable or publicly + joinable room, and + - users belonging to a private room shared by that local user. + + The two cases are tracked separately in the `users_in_public_rooms` and + `users_who_share_private_rooms` tables. Both kinds of users have their + username and avatar tracked in a `user_directory` table. + + This handler has three responsibilities: + 1. Forwarding requests to `/user_directory/search` to the UserDirectoryStore. + 2. Providing hooks for the application to call when local users are added, + removed, or have their profile changed. + 3. Listening for room state changes that indicate remote users have + joined or left a room, or that their profile has changed. """ def __init__(self, hs: "HomeServer"): @@ -130,7 +142,7 @@ class UserDirectoryHandler(StateDeltasHandler): user_id, profile.display_name, profile.avatar_url ) - async def handle_user_deactivated(self, user_id: str) -> None: + async def handle_local_user_deactivated(self, user_id: str) -> None: """Called when a user ID is deactivated""" # FIXME(#3714): We should probably do this in the same worker as all # the other changes. @@ -196,7 +208,7 @@ class UserDirectoryHandler(StateDeltasHandler): public_value=Membership.JOIN, ) - if change is False: + if change is MatchChange.now_false: # Need to check if the server left the room entirely, if so # we might need to remove all the users in that room is_in_room = await self.store.is_host_joined( @@ -219,14 +231,14 @@ class UserDirectoryHandler(StateDeltasHandler): is_support = await self.store.is_support_user(state_key) if not is_support: - if change is None: + if change is MatchChange.no_change: # Handle any profile changes await self._handle_profile_change( state_key, room_id, prev_event_id, event_id ) continue - if change: # The user joined + if change is MatchChange.now_true: # The user joined event = await self.store.get_event(event_id, allow_none=True) # It isn't expected for this event to not exist, but we # don't want the entire background process to break. @@ -263,14 +275,14 @@ class UserDirectoryHandler(StateDeltasHandler): logger.debug("Handling change for %s: %s", typ, room_id) if typ == EventTypes.RoomHistoryVisibility: - change = await self._get_key_change( + publicness = await self._get_key_change( prev_event_id, event_id, key_name="history_visibility", public_value=HistoryVisibility.WORLD_READABLE, ) elif typ == EventTypes.JoinRules: - change = await self._get_key_change( + publicness = await self._get_key_change( prev_event_id, event_id, key_name="join_rule", @@ -278,9 +290,7 @@ class UserDirectoryHandler(StateDeltasHandler): ) else: raise Exception("Invalid event type") - # If change is None, no change. True => become world_readable/public, - # False => was world_readable/public - if change is None: + if publicness is MatchChange.no_change: logger.debug("No change") return @@ -290,13 +300,13 @@ class UserDirectoryHandler(StateDeltasHandler): room_id ) - logger.debug("Change: %r, is_public: %r", change, is_public) + logger.debug("Change: %r, publicness: %r", publicness, is_public) - if change and not is_public: + if publicness is MatchChange.now_true and not is_public: # If we became world readable but room isn't currently public then # we ignore the change return - elif not change and is_public: + elif publicness is MatchChange.now_false and is_public: # If we stopped being world readable but are still public, # ignore the change return |