summaryrefslogtreecommitdiff
path: root/synapse/federation/federation_server.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r--synapse/federation/federation_server.py43
1 files changed, 18 insertions, 25 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ac0f2ccf..29619aee 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -122,12 +122,12 @@ class FederationServer(FederationBase):
# origins that we are currently processing a transaction from.
# a dict from origin to txn id.
- self._active_transactions = {} # type: Dict[str, str]
+ self._active_transactions: Dict[str, str] = {}
# We cache results for transaction with the same ID
- self._transaction_resp_cache = ResponseCache(
+ self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "fed_txn_handler", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, str]]
+ )
self.transaction_actions = TransactionActions(self.store)
@@ -135,12 +135,12 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
- self._state_resp_cache = ResponseCache(
- hs.get_clock(), "state_resp", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, Optional[str]]]
- self._state_ids_resp_cache = ResponseCache(
+ self._state_resp_cache: ResponseCache[
+ Tuple[str, Optional[str]]
+ ] = ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000)
+ self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "state_ids_resp", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, str]]
+ )
self._federation_metrics_domains = (
hs.config.federation.federation_metrics_domains
@@ -337,7 +337,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
- pdus_by_room = {} # type: Dict[str, List[EventBase]]
+ pdus_by_room: Dict[str, List[EventBase]] = {}
newest_pdu_ts = 0
@@ -516,9 +516,9 @@ class FederationServer(FederationBase):
self, room_id: str, event_id: Optional[str]
) -> Dict[str, list]:
if event_id:
- pdus = await self.handler.get_state_for_pdu(
+ pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu(
room_id, event_id
- ) # type: Iterable[EventBase]
+ )
else:
pdus = (await self.state.get_current_state(room_id)).values()
@@ -562,8 +562,7 @@ class FederationServer(FederationBase):
raise IncompatibleRoomVersionError(room_version=room_version)
pdu = await self.handler.on_make_join_request(origin, room_id, user_id)
- time_now = self._clock.time_msec()
- return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
+ return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
async def on_invite_request(
self, origin: str, content: JsonDict, room_version_id: str
@@ -611,8 +610,7 @@ class FederationServer(FederationBase):
room_version = await self.store.get_room_version_id(room_id)
- time_now = self._clock.time_msec()
- return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
+ return {"event": pdu.get_templated_pdu_json(), "room_version": room_version}
async def on_send_leave_request(
self, origin: str, content: JsonDict, room_id: str
@@ -659,9 +657,8 @@ class FederationServer(FederationBase):
)
pdu = await self.handler.on_make_knock_request(origin, room_id, user_id)
- time_now = self._clock.time_msec()
return {
- "event": pdu.get_pdu_json(time_now),
+ "event": pdu.get_templated_pdu_json(),
"room_version": room_version.identifier,
}
@@ -791,7 +788,7 @@ class FederationServer(FederationBase):
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = await self.store.claim_e2e_one_time_keys(query)
- json_result = {} # type: Dict[str, Dict[str, dict]]
+ json_result: Dict[str, Dict[str, dict]] = {}
for user_id, device_keys in results.items():
for device_id, keys in device_keys.items():
for key_id, json_str in keys.items():
@@ -1119,17 +1116,13 @@ class FederationHandlerRegistry:
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
- self.edu_handlers = (
- {}
- ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
- self.query_handlers = (
- {}
- ) # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]]
+ self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
+ self.query_handlers: Dict[str, Callable[[dict], Awaitable[JsonDict]]] = {}
# Map from type to instance names that we should route EDU handling to.
# We randomly choose one instance from the list to route to for each new
# EDU received.
- self._edu_type_to_instance = {} # type: Dict[str, List[str]]
+ self._edu_type_to_instance: Dict[str, List[str]] = {}
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]