Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/admin.py               22
-rw-r--r--  synapse/handlers/appservice.py          91
-rw-r--r--  synapse/handlers/auth.py                 4
-rw-r--r--  synapse/handlers/directory.py            2
-rw-r--r--  synapse/handlers/e2e_keys.py           211
-rw-r--r--  synapse/handlers/federation_event.py     6
-rw-r--r--  synapse/handlers/identity.py             6
-rw-r--r--  synapse/handlers/message.py             23
-rw-r--r--  synapse/handlers/pagination.py           2
-rw-r--r--  synapse/handlers/presence.py             2
-rw-r--r--  synapse/handlers/profile.py              4
-rw-r--r--  synapse/handlers/room.py                 4
-rw-r--r--  synapse/handlers/room_batch.py           2
-rw-r--r--  synapse/handlers/room_member.py          4
-rw-r--r--  synapse/handlers/search.py              12
-rw-r--r--  synapse/handlers/typing.py               6
16 files changed, 262 insertions, 139 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index a53cd62d..be3203ac 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -90,6 +90,7 @@ class AdminHandler:
Membership.LEAVE,
Membership.BAN,
Membership.INVITE,
+ Membership.KNOCK,
),
)
@@ -122,6 +123,13 @@ class AdminHandler:
invited_state = invite.unsigned["invite_room_state"]
writer.write_invite(room_id, invite, invited_state)
+ if room.membership == Membership.KNOCK:
+ event_id = room.event_id
+ knock = await self.store.get_event(event_id, allow_none=True)
+ if knock:
+ knock_state = knock.unsigned["knock_room_state"]
+ writer.write_knock(room_id, knock, knock_state)
+
continue
# We only want to bother fetching events up to the last time they
@@ -239,6 +247,20 @@ class ExfiltrationWriter(metaclass=abc.ABCMeta):
raise NotImplementedError()
@abc.abstractmethod
+ def write_knock(
+ self, room_id: str, event: EventBase, state: StateMap[dict]
+ ) -> None:
+ """Write a knock for the room, with associated knock state.
+
+ Args:
+ room_id: The room ID the knock is for.
+ event: The knock event.
+ state: A subset of the state at the knock, with a subset of the
+ event keys (type, state_key, content and sender).
+ """
+ raise NotImplementedError()
+
+ @abc.abstractmethod
def finished(self) -> Any:
"""Called when all data has successfully been exported and written.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 36c206da..ddc9105e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -34,6 +34,7 @@ from synapse.metrics.background_process_metrics import (
)
from synapse.storage.databases.main.directory import RoomAliasMapping
from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.util.async_helpers import Linearizer
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -58,6 +59,10 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
+ self._ephemeral_events_linearizer = Linearizer(
+ name="appservice_ephemeral_events"
+ )
+
def notify_interested_services(self, max_token: RoomStreamToken) -> None:
"""Notifies (pushes) all application services interested in this event.
@@ -182,7 +187,7 @@ class ApplicationServicesHandler:
def notify_interested_services_ephemeral(
self,
stream_key: str,
- new_token: Optional[int],
+ new_token: Union[int, RoomStreamToken],
users: Optional[Collection[Union[str, UserID]]] = None,
) -> None:
"""
@@ -203,7 +208,7 @@ class ApplicationServicesHandler:
Appservices will only receive ephemeral events that fall within their
registered user and room namespaces.
- new_token: The latest stream token.
+ new_token: The stream token of the event.
users: The users that should be informed of the new event, if any.
"""
if not self.notify_appservices:
@@ -212,6 +217,19 @@ class ApplicationServicesHandler:
if stream_key not in ("typing_key", "receipt_key", "presence_key"):
return
+ # Assert that new_token is an integer (and not a RoomStreamToken).
+ # All of the supported streams that this function handles use an
+ # integer to track progress (rather than a RoomStreamToken - a
+ # vector clock implementation) as they don't support multiple
+ # stream writers.
+ #
+ # As a result, we simply assert that new_token is an integer.
+ # If we do end up needing to pass a RoomStreamToken down here
+ # in the future, using RoomStreamToken.stream (the minimum stream
+ # position) to convert to an ascending integer value should work.
+ # Additional context: https://github.com/matrix-org/synapse/pull/11137
+ assert isinstance(new_token, int)
+
services = [
service
for service in self.store.get_app_services()
@@ -231,14 +249,13 @@ class ApplicationServicesHandler:
self,
services: List[ApplicationService],
stream_key: str,
- new_token: Optional[int],
+ new_token: int,
users: Collection[Union[str, UserID]],
) -> None:
logger.debug("Checking interested services for %s" % (stream_key))
with Measure(self.clock, "notify_interested_services_ephemeral"):
for service in services:
- # Only handle typing if we have the latest token
- if stream_key == "typing_key" and new_token is not None:
+ if stream_key == "typing_key":
# Note that we don't persist the token (via set_type_stream_id_for_appservice)
# for typing_key due to performance reasons and due to their highly
# ephemeral nature.
@@ -248,26 +265,37 @@ class ApplicationServicesHandler:
events = await self._handle_typing(service, new_token)
if events:
self.scheduler.submit_ephemeral_events_for_as(service, events)
+ continue
- elif stream_key == "receipt_key":
- events = await self._handle_receipts(service)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(service, events)
-
- # Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
- service, "read_receipt", new_token
+ # Since we read/update the stream position for this AS/stream, take a
+ # lock so the read-compare-write below can't interleave across calls.
+ with (
+ await self._ephemeral_events_linearizer.queue(
+ (service.id, stream_key)
)
+ ):
+ if stream_key == "receipt_key":
+ events = await self._handle_receipts(service, new_token)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(
+ service, events
+ )
+
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_type_stream_id_for_appservice(
+ service, "read_receipt", new_token
+ )
- elif stream_key == "presence_key":
- events = await self._handle_presence(service, users)
- if events:
- self.scheduler.submit_ephemeral_events_for_as(service, events)
+ elif stream_key == "presence_key":
+ events = await self._handle_presence(service, users, new_token)
+ if events:
+ self.scheduler.submit_ephemeral_events_for_as(
+ service, events
+ )
- # Persist the latest handled stream token for this appservice
- await self.store.set_type_stream_id_for_appservice(
- service, "presence", new_token
- )
+ # Persist the latest handled stream token for this appservice
+ await self.store.set_type_stream_id_for_appservice(
+ service, "presence", new_token
+ )
async def _handle_typing(
self, service: ApplicationService, new_token: int
@@ -304,7 +332,9 @@ class ApplicationServicesHandler:
)
return typing
- async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
+ async def _handle_receipts(
+ self, service: ApplicationService, new_token: Optional[int]
+ ) -> List[JsonDict]:
"""
Return the latest read receipts that the given application service should receive.
@@ -323,6 +353,12 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "read_receipt"
)
+ if new_token is not None and new_token <= from_key:
+ logger.debug(
+ "Rejecting token lower than or equal to stored: %s" % (new_token,)
+ )
+ return []
+
receipts_source = self.event_sources.sources.receipt
receipts, _ = await receipts_source.get_new_events_as(
service=service, from_key=from_key
@@ -330,7 +366,10 @@ class ApplicationServicesHandler:
return receipts
async def _handle_presence(
- self, service: ApplicationService, users: Collection[Union[str, UserID]]
+ self,
+ service: ApplicationService,
+ users: Collection[Union[str, UserID]],
+ new_token: Optional[int],
) -> List[JsonDict]:
"""
Return the latest presence updates that the given application service should receive.
@@ -353,6 +392,12 @@ class ApplicationServicesHandler:
from_key = await self.store.get_type_stream_id_for_appservice(
service, "presence"
)
+ if new_token is not None and new_token <= from_key:
+ logger.debug(
+ "Rejecting token lower than or equal to stored: %s" % (new_token,)
+ )
+ return []
+
for user in users:
if isinstance(user, str):
user = UserID.from_string(user)
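
The new Linearizer exists because the handler now does a read-compare-write on each appservice's stored stream token: queueing on the (service.id, stream_key) pair serialises concurrent notifications for the same appservice/stream. A minimal sketch of the pattern, with hypothetical load_token/deliver_events/store_token helpers standing in for the real storage calls:

    from synapse.util.async_helpers import Linearizer

    _linearizer = Linearizer(name="example_ephemeral_events")

    async def handle_new_token(service_id: str, stream_key: str, new_token: int) -> None:
        # Two concurrent calls with the same (service_id, stream_key) run
        # one after the other, so the token check below cannot interleave
        # with another call's store_token().
        with (await _linearizer.queue((service_id, stream_key))):
            stored = await load_token(service_id, stream_key)  # hypothetical
            if new_token <= stored:
                return  # an equal or newer token was already handled
            await deliver_events(service_id, stream_key, new_token)  # hypothetical
            await store_token(service_id, stream_key, new_token)  # hypothetical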
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index d508d7d3..60e59d11 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1989,7 +1989,9 @@ class PasswordAuthProvider:
self,
check_3pid_auth: Optional[CHECK_3PID_AUTH_CALLBACK] = None,
on_logged_out: Optional[ON_LOGGED_OUT_CALLBACK] = None,
- auth_checkers: Optional[Dict[Tuple[str, Tuple], CHECK_AUTH_CALLBACK]] = None,
+ auth_checkers: Optional[
+ Dict[Tuple[str, Tuple[str, ...]], CHECK_AUTH_CALLBACK]
+ ] = None,
) -> None:
# Register check_3pid_auth callback
if check_3pid_auth is not None:
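
The tightened annotation documents that each checker is keyed by a (login type, tuple of expected field names) pair rather than an arbitrary tuple. An illustrative registration, with the callback signature simplified from the real CHECK_AUTH_CALLBACK:

    from typing import Any, Dict, Optional, Tuple

    async def check_password(
        username: str, login_type: str, login_dict: dict
    ) -> Optional[Tuple[str, Any]]:
        ...  # sketch: validate the password, return (user_id, None) on success

    # Key: (login type, field names the login body must carry).
    auth_checkers: Dict[Tuple[str, Tuple[str, ...]], Any] = {
        ("m.login.password", ("password",)): check_password,
    }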
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8567cb0e..8ca5f60b 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -245,7 +245,7 @@ class DirectoryHandler:
servers = result.servers
else:
try:
- fed_result = await self.federation.make_query(
+ fed_result: Optional[JsonDict] = await self.federation.make_query(
destination=room_alias.domain,
query_type="directory",
args={"room_alias": room_alias.to_string()},
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d0fb2fc7..60c11e3d 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -201,95 +201,19 @@ class E2eKeysHandler:
r[user_id] = remote_queries[user_id]
# Now fetch any devices that we don't have in our cache
- @trace
- async def do_remote_query(destination: str) -> None:
- """This is called when we are querying the device list of a user on
- a remote homeserver and their device list is not in the device list
- cache. If we share a room with this user and we're not querying for
- specific devices, we will update the cache with their device list.
- """
-
- destination_query = remote_queries_not_in_cache[destination]
-
- # We first consider whether we wish to update the device list cache with
- # the user's device list. We want to track a user's devices when the
- # authenticated user shares a room with the queried user and the query
- # has not specified a particular device.
- # If we update the cache for the queried user, we remove them from further
- # queries. We use the more efficient batched query_client_keys for all
- # remaining users.
- user_ids_updated = []
- for (user_id, device_list) in destination_query.items():
- if user_id in user_ids_updated:
- continue
-
- if device_list:
- continue
-
- room_ids = await self.store.get_rooms_for_user(user_id)
- if not room_ids:
- continue
-
- # We've decided we're sharing a room with this user and should
- # probably be tracking their device lists. However, we haven't
- # done an initial sync on the device list so we do it now.
- try:
- if self._is_master:
- user_devices = await self.device_handler.device_list_updater.user_device_resync(
- user_id
- )
- else:
- user_devices = await self._user_device_resync_client(
- user_id=user_id
- )
-
- user_devices = user_devices["devices"]
- user_results = results.setdefault(user_id, {})
- for device in user_devices:
- user_results[device["device_id"]] = device["keys"]
- user_ids_updated.append(user_id)
- except Exception as e:
- failures[destination] = _exception_to_failure(e)
-
- if len(destination_query) == len(user_ids_updated):
- # We've updated all the users in the query and we do not need to
- # make any further remote calls.
- return
-
- # Remove all the users from the query which we have updated
- for user_id in user_ids_updated:
- destination_query.pop(user_id)
-
- try:
- remote_result = await self.federation.query_client_keys(
- destination, {"device_keys": destination_query}, timeout=timeout
- )
-
- for user_id, keys in remote_result["device_keys"].items():
- if user_id in destination_query:
- results[user_id] = keys
-
- if "master_keys" in remote_result:
- for user_id, key in remote_result["master_keys"].items():
- if user_id in destination_query:
- cross_signing_keys["master_keys"][user_id] = key
-
- if "self_signing_keys" in remote_result:
- for user_id, key in remote_result["self_signing_keys"].items():
- if user_id in destination_query:
- cross_signing_keys["self_signing_keys"][user_id] = key
-
- except Exception as e:
- failure = _exception_to_failure(e)
- failures[destination] = failure
- set_tag("error", True)
- set_tag("reason", failure)
-
await make_deferred_yieldable(
defer.gatherResults(
[
- run_in_background(do_remote_query, destination)
- for destination in remote_queries_not_in_cache
+ run_in_background(
+ self._query_devices_for_destination,
+ results,
+ cross_signing_keys,
+ failures,
+ destination,
+ queries,
+ timeout,
+ )
+ for destination, queries in remote_queries_not_in_cache.items()
],
consumeErrors=True,
).addErrback(unwrapFirstError)
@@ -301,6 +225,121 @@ class E2eKeysHandler:
return ret
+ @trace
+ async def _query_devices_for_destination(
+ self,
+ results: JsonDict,
+ cross_signing_keys: JsonDict,
+ failures: Dict[str, JsonDict],
+ destination: str,
+ destination_query: Dict[str, Iterable[str]],
+ timeout: int,
+ ) -> None:
+ """This is called when we are querying the device list of a user on
+ a remote homeserver and their device list is not in the device list
+ cache. If we share a room with this user and we're not querying for
+ specific devices, we will update the cache with their device list.
+
+ Args:
+ results: A map from user ID to their device keys, which gets
+ updated with the newly fetched keys.
+ cross_signing_keys: Map from user ID to their cross signing keys,
+ which gets updated with the newly fetched keys.
+ failures: Map of destinations to failures that have occurred while
+ attempting to fetch keys.
+ destination: The remote server to query.
+ destination_query: The query dict of devices to query the remote
+ server for.
+ timeout: The timeout for remote HTTP requests.
+ """
+
+ # We first consider whether we wish to update the device list cache with
+ # the user's device list. We want to track a user's devices when the
+ # authenticated user shares a room with the queried user and the query
+ # has not specified a particular device.
+ # If we update the cache for the queried user, we remove them from further
+ # queries. We use the more efficient batched query_client_keys for all
+ # remaining users.
+ user_ids_updated = []
+ for (user_id, device_list) in destination_query.items():
+ if user_id in user_ids_updated:
+ continue
+
+ if device_list:
+ continue
+
+ room_ids = await self.store.get_rooms_for_user(user_id)
+ if not room_ids:
+ continue
+
+ # We've decided we're sharing a room with this user and should
+ # probably be tracking their device lists. However, we haven't
+ # done an initial sync on the device list so we do it now.
+ try:
+ if self._is_master:
+ resync_results = await self.device_handler.device_list_updater.user_device_resync(
+ user_id
+ )
+ else:
+ resync_results = await self._user_device_resync_client(
+ user_id=user_id
+ )
+
+ # Add the device keys to the results.
+ user_devices = resync_results["devices"]
+ user_results = results.setdefault(user_id, {})
+ for device in user_devices:
+ user_results[device["device_id"]] = device["keys"]
+ user_ids_updated.append(user_id)
+
+ # Add any cross signing keys to the results.
+ master_key = resync_results.get("master_key")
+ self_signing_key = resync_results.get("self_signing_key")
+
+ if master_key:
+ cross_signing_keys["master_keys"][user_id] = master_key
+
+ if self_signing_key:
+ cross_signing_keys["self_signing_keys"][user_id] = self_signing_key
+ except Exception as e:
+ failures[destination] = _exception_to_failure(e)
+
+ if len(destination_query) == len(user_ids_updated):
+ # We've updated all the users in the query and we do not need to
+ # make any further remote calls.
+ return
+
+ # Remove all the users from the query which we have updated
+ for user_id in user_ids_updated:
+ destination_query.pop(user_id)
+
+ try:
+ remote_result = await self.federation.query_client_keys(
+ destination, {"device_keys": destination_query}, timeout=timeout
+ )
+
+ for user_id, keys in remote_result["device_keys"].items():
+ if user_id in destination_query:
+ results[user_id] = keys
+
+ if "master_keys" in remote_result:
+ for user_id, key in remote_result["master_keys"].items():
+ if user_id in destination_query:
+ cross_signing_keys["master_keys"][user_id] = key
+
+ if "self_signing_keys" in remote_result:
+ for user_id, key in remote_result["self_signing_keys"].items():
+ if user_id in destination_query:
+ cross_signing_keys["self_signing_keys"][user_id] = key
+
+ except Exception as e:
+ failure = _exception_to_failure(e)
+ failures[destination] = failure
+ set_tag("error", True)
+ set_tag("reason", failure)
+
+ return
+
async def get_cross_signing_keys_from_cache(
self, query: Iterable[str], from_user_id: Optional[str]
) -> Dict[str, Dict[str, dict]]:
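
The refactor hoists the old do_remote_query closure into _query_devices_for_destination without changing the fan-out: one background task per destination, gathered with errors consumed so a single unreachable homeserver doesn't abort the others. The pattern in isolation, with a hypothetical per-destination worker, looks roughly like:

    from typing import Dict, Iterable

    from twisted.internet import defer

    from synapse.logging.context import make_deferred_yieldable, run_in_background
    from synapse.util import unwrapFirstError

    async def query_one(destination: str, queries: Dict[str, Iterable[str]]) -> None:
        ...  # hypothetical worker; records failures per destination instead of raising

    async def query_all(remote_queries: Dict[str, Dict[str, Iterable[str]]]) -> None:
        # Each destination gets its own logcontext-aware background task;
        # consumeErrors=True keeps one failure from cancelling the rest.
        await make_deferred_yieldable(
            defer.gatherResults(
                [
                    run_in_background(query_one, destination, queries)
                    for destination, queries in remote_queries.items()
                ],
                consumeErrors=True,
            ).addErrback(unwrapFirstError)
        )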
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 9584d5bd..1a1cd93b 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -477,7 +477,7 @@ class FederationEventHandler:
@log_function
async def backfill(
- self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
+ self, dest: str, room_id: str, limit: int, extremities: Collection[str]
) -> None:
"""Trigger a backfill request to `dest` for the given `room_id`
@@ -1643,7 +1643,7 @@ class FederationEventHandler:
event: the event whose auth_events we want
Returns:
- all of the events in `event.auth_events`, after deduplication
+ all of the events listed in `event.auth_event_ids()`, after deduplication
Raises:
AuthError if we were unable to fetch the auth_events for any reason.
@@ -1916,7 +1916,7 @@ class FederationEventHandler:
event_pos = PersistedEventPosition(
self._instance_name, event.internal_metadata.stream_ordering
)
- self._notifier.on_new_room_event(
+ await self._notifier.on_new_room_event(
event, event_pos, max_stream_token, extra_users=extra_users
)
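
Of the changes here, the Iterable to Collection widening is about caller guarantees: a Collection additionally promises len() and membership tests and can safely be iterated more than once, which a one-shot generator (a perfectly valid Iterable) cannot. A small illustration:

    from typing import Collection

    def summarise(extremities: Collection[str]) -> str:
        # len() and repeated iteration are fine on a Collection; a generator
        # would satisfy Iterable[str] but has no __len__ and is one-shot.
        return f"{len(extremities)} extremities: {', '.join(sorted(extremities))}"

    summarise(["$event2", "$event1"])        # a list is a Collection
    # summarise(e for e in ["$e1", "$e2"])   # flagged by type checkers: no __len__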
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 7ef8698a..3dbe611f 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -537,10 +537,6 @@ class IdentityHandler:
except RequestTimedOutError:
raise SynapseError(500, "Timed out contacting identity server")
- # It is already checked that public_baseurl is configured since this code
- # should only be used if account_threepid_delegate_msisdn is true.
- assert self.hs.config.server.public_baseurl
-
# we need to tell the client to send the token back to us, since it doesn't
# otherwise know where to send it, so add submit_url response parameter
# (see also MSC2078)
@@ -879,6 +875,8 @@ class IdentityHandler:
}
if room_type is not None:
+ invite_config["room_type"] = room_type
+ # TODO The unstable field is deprecated and should be removed in the future.
invite_config["org.matrix.msc3288.room_type"] = room_type
# If a custom web client location is available, include it in the request.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 2e024b55..d4c2a6ab 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1318,6 +1318,8 @@ class EventCreationHandler:
# user is actually admin or not).
is_admin_redaction = False
if event.type == EventTypes.Redaction:
+ assert event.redacts is not None
+
original_event = await self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1413,6 +1415,8 @@ class EventCreationHandler:
)
if event.type == EventTypes.Redaction:
+ assert event.redacts is not None
+
original_event = await self.store.get_event(
event.redacts,
redact_behaviour=EventRedactBehaviour.AS_IS,
@@ -1500,11 +1504,13 @@ class EventCreationHandler:
next_batch_id = event.content.get(
EventContentFields.MSC2716_NEXT_BATCH_ID
)
- conflicting_insertion_event_id = (
- await self.store.get_insertion_event_by_batch_id(
- event.room_id, next_batch_id
+ conflicting_insertion_event_id = None
+ if next_batch_id:
+ conflicting_insertion_event_id = (
+ await self.store.get_insertion_event_id_by_batch_id(
+ event.room_id, next_batch_id
+ )
)
- )
if conflicting_insertion_event_id is not None:
# The current insertion event that we're processing is invalid
# because an insertion event already exists in the room with the
@@ -1537,13 +1543,16 @@ class EventCreationHandler:
# If there's an expiry timestamp on the event, schedule its expiry.
self._message_handler.maybe_schedule_expiry(event)
- def _notify() -> None:
+ async def _notify() -> None:
try:
- self.notifier.on_new_room_event(
+ await self.notifier.on_new_room_event(
event, event_pos, max_stream_token, extra_users=extra_users
)
except Exception:
- logger.exception("Error notifying about new room event")
+ logger.exception(
+ "Error notifying about new room event %s",
+ event.event_id,
+ )
run_in_background(_notify)
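
The new `assert event.redacts is not None` lines serve the type checker as much as runtime safety: `redacts` is Optional[str], and the assert both documents the invariant (a redaction always names its target) and narrows the type before the get_event call. The generic narrowing pattern, in a self-contained sketch:

    from typing import Optional

    def fetch_event(event_id: str) -> None:
        ...  # stands in for store.get_event

    def handle_redaction(redacts: Optional[str]) -> None:
        # Narrow Optional[str] to str: the call below type-checks only
        # after the assert rules out None.
        assert redacts is not None
        fetch_event(redacts)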
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 60ff8963..abfe7be0 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -438,7 +438,7 @@ class PaginationHandler:
}
state = None
- if event_filter and event_filter.lazy_load_members() and len(events) > 0:
+ if event_filter and event_filter.lazy_load_members and len(events) > 0:
# TODO: remove redundant members
# FIXME: we also care about invite targets etc.
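
Dropping the parentheses follows lazy_load_members becoming a property on the filter class. The distinction matters for truthiness: were it still a method, the unparenthesised form would test the bound method object, which is always truthy. A small demonstration of the pitfall:

    class OldFilter:
        def lazy_load_members(self) -> bool:  # method, as before the change
            return False

    f = OldFilter()
    if f.lazy_load_members:       # BUG: a bound method object is always truthy
        print("taken even though the filter says False")
    if f.lazy_load_members():     # correct call syntax for the method form
        print("never taken")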
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fdab50da..3df872c5 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -52,6 +52,7 @@ import synapse.metrics
from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.errors import SynapseError
from synapse.api.presence import UserPresenceState
+from synapse.appservice import ApplicationService
from synapse.events.presence_router import PresenceRouter
from synapse.logging.context import run_in_background
from synapse.logging.utils import log_function
@@ -1551,6 +1552,7 @@ class PresenceEventSource(EventSource[int, UserPresenceState]):
is_guest: bool = False,
explicit_room_id: Optional[str] = None,
include_offline: bool = True,
+ service: Optional[ApplicationService] = None,
) -> Tuple[List[UserPresenceState], int]:
# The process for getting presence events are:
# 1. Get the rooms the user is in.
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index e6c3cf58..6b5a6ded 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -456,7 +456,11 @@ class ProfileHandler:
continue
new_name = profile.get("displayname")
+ if not isinstance(new_name, str):
+ new_name = None
new_avatar = profile.get("avatar_url")
+ if not isinstance(new_avatar, str):
+ new_avatar = None
# We always hit update to update the last_check timestamp
await self.store.update_remote_profile_cache(user_id, new_name, new_avatar)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index cf01d58e..969eb3b9 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -525,7 +525,7 @@ class RoomCreationHandler:
):
await self.room_member_handler.update_membership(
requester,
- UserID.from_string(old_event["state_key"]),
+ UserID.from_string(old_event.state_key),
new_room_id,
"ban",
ratelimit=False,
@@ -1173,7 +1173,7 @@ class RoomContextHandler:
else:
last_event_id = event_id
- if event_filter and event_filter.lazy_load_members():
+ if event_filter and event_filter.lazy_load_members:
state_filter = StateFilter.from_lazy_load_member_list(
ev.sender
for ev in itertools.chain(
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 2f5a3e4d..07232863 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -355,7 +355,7 @@ class RoomBatchHandler:
for (event, context) in reversed(events_to_persist):
await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
- event["sender"], app_service_requester.app_service
+ event.sender, app_service_requester.app_service
),
event=event,
context=context,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 74e6c7ec..08244b69 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1669,7 +1669,9 @@ class RoomMemberMasterHandler(RoomMemberHandler):
#
# the prev_events consist solely of the previous membership event.
prev_event_ids = [previous_membership_event.event_id]
- auth_event_ids = previous_membership_event.auth_event_ids() + prev_event_ids
+ auth_event_ids = (
+ list(previous_membership_event.auth_event_ids()) + prev_event_ids
+ )
event, context = await self.event_creation_handler.create_event(
requester,
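
Wrapping auth_event_ids() in list(...) is needed because its return value is no longer guaranteed to be a list, and + does not concatenate across sequence types. A minimal reproduction, assuming a tuple return for illustration:

    auth_ids: tuple = ("$auth1", "$auth2")   # e.g. auth_event_ids() yielding a tuple
    prev_ids: list = ["$prev1"]

    # auth_ids + prev_ids        # TypeError: can only concatenate tuple (not "list") to tuple
    combined = list(auth_ids) + prev_ids     # normalise first, then concatenate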
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index a3ffa26b..6e4dff80 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -249,7 +249,7 @@ class SearchHandler:
)
events.sort(key=lambda e: -rank_map[e.event_id])
- allowed_events = events[: search_filter.limit()]
+ allowed_events = events[: search_filter.limit]
for e in allowed_events:
rm = room_groups.setdefault(
@@ -271,13 +271,13 @@ class SearchHandler:
# We keep looping and we keep filtering until we reach the limit
# or we run out of things.
# But only go around 5 times since otherwise synapse will be sad.
- while len(room_events) < search_filter.limit() and i < 5:
+ while len(room_events) < search_filter.limit and i < 5:
i += 1
search_result = await self.store.search_rooms(
room_ids,
search_term,
keys,
- search_filter.limit() * 2,
+ search_filter.limit * 2,
pagination_token=pagination_token,
)
@@ -299,9 +299,9 @@ class SearchHandler:
)
room_events.extend(events)
- room_events = room_events[: search_filter.limit()]
+ room_events = room_events[: search_filter.limit]
- if len(results) < search_filter.limit() * 2:
+ if len(results) < search_filter.limit * 2:
pagination_token = None
break
else:
@@ -311,7 +311,7 @@ class SearchHandler:
group = room_groups.setdefault(event.room_id, {"results": []})
group["results"].append(event.event_id)
- if room_events and len(room_events) >= search_filter.limit():
+ if room_events and len(room_events) >= search_filter.limit:
last_event_id = room_events[-1].event_id
pagination_token = results_map[last_event_id]["pagination_token"]
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c411d699..22c61748 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -62,8 +62,8 @@ class FollowerTypingHandler:
if hs.should_send_federation():
self.federation = hs.get_federation_sender()
- if hs.config.worker.writers.typing != hs.get_instance_name():
- hs.get_federation_registry().register_instance_for_edu(
+ if hs.get_instance_name() not in hs.config.worker.writers.typing:
+ hs.get_federation_registry().register_instances_for_edu(
"m.typing",
hs.config.worker.writers.typing,
)
@@ -205,7 +205,7 @@ class TypingWriterHandler(FollowerTypingHandler):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- assert hs.config.worker.writers.typing == hs.get_instance_name()
+ assert hs.get_instance_name() in hs.config.worker.writers.typing
self.auth = hs.get_auth()
self.notifier = hs.get_notifier()
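
Both typing.py hunks follow hs.config.worker.writers.typing changing from a single instance name to a list, making typing a potentially multi-writer stream; equality checks against the configured writer therefore become membership checks. The semantics in miniature, with illustrative instance names:

    typing_writers = ["worker1", "worker2"]   # illustrative instance names

    def is_typing_writer(instance_name: str) -> bool:
        # Any instance in the configured list may write to the typing stream.
        return instance_name in typing_writers

    assert is_typing_writer("worker1")
    assert not is_typing_writer("master")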