Diffstat (limited to 'synapse/federation/federation_client.py')
 synapse/federation/federation_client.py | 732 +++++++++++++++++++++++++++++++
 1 file changed, 732 insertions(+), 0 deletions(-)
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
new file mode 100644
index 00000000..c6a8c124
--- /dev/null
+++ b/synapse/federation/federation_client.py
@@ -0,0 +1,732 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from twisted.internet import defer
+
+from .federation_base import FederationBase
+from synapse.api.constants import Membership
+from .units import Edu
+
+from synapse.api.errors import (
+ CodeMessageException, HttpResponseException, SynapseError,
+)
+from synapse.util import unwrapFirstError
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.logutils import log_function
+from synapse.events import FrozenEvent
+import synapse.metrics
+
+from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
+
+import copy
+import itertools
+import logging
+import random
+
+
+logger = logging.getLogger(__name__)
+
+
+# synapse.federation.federation_client is a silly name
+metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+
+sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
+
+sent_edus_counter = metrics.register_counter("sent_edus")
+
+sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
+
+
+class FederationClient(FederationBase):
+
+ def start_get_pdu_cache(self):
+ self._get_pdu_cache = ExpiringCache(
+ cache_name="get_pdu_cache",
+ clock=self._clock,
+ max_len=1000,
+ expiry_ms=120*1000,
+ reset_expiry_on_get=False,
+ )
+
+ self._get_pdu_cache.start()
+
+ @log_function
+ def send_pdu(self, pdu, destinations):
+ """Informs the replication layer about a new PDU generated within the
+ home server that should be transmitted to others.
+
+ TODO: Figure out when we should actually resolve the deferred.
+
+ Args:
+            pdu (Pdu): The new Pdu.
+            destinations (list): The remote homeservers to send it to.
+
+ Returns:
+ Deferred: Completes when we have successfully processed the PDU
+ and replicated it to any interested remote home servers.
+ """
+ order = self._order
+ self._order += 1
+
+ sent_pdus_destination_dist.inc_by(len(destinations))
+
+ logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
+
+ # TODO, add errback, etc.
+ self._transaction_queue.enqueue_pdu(pdu, destinations, order)
+
+ logger.debug(
+ "[%s] transaction_layer.enqueue_pdu... done",
+ pdu.event_id
+ )
+
+ @log_function
+ def send_edu(self, destination, edu_type, content):
+ edu = Edu(
+ origin=self.server_name,
+ destination=destination,
+ edu_type=edu_type,
+ content=content,
+ )
+
+ sent_edus_counter.inc()
+
+ # TODO, add errback, etc.
+ self._transaction_queue.enqueue_edu(edu)
+ return defer.succeed(None)
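+
+    # Illustrative usage (editor's sketch, not part of the original change):
+    # assuming `client` is the FederationClient wired up by the homeserver, a
+    # typing notification could be queued as an EDU like so:
+    #
+    #   client.send_edu(
+    #       destination="remote.example.org",
+    #       edu_type="m.typing",
+    #       content={"room_id": room_id, "user_id": user_id, "typing": True},
+    #   )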
+
+ @log_function
+ def send_failure(self, failure, destination):
+ self._transaction_queue.enqueue_failure(failure, destination)
+ return defer.succeed(None)
+
+ @log_function
+ def make_query(self, destination, query_type, args,
+ retry_on_dns_fail=True):
+ """Sends a federation Query to a remote homeserver of the given type
+ and arguments.
+
+ Args:
+ destination (str): Domain name of the remote homeserver
+ query_type (str): Category of the query type; should match the
+ handler name used in register_query_handler().
+ args (dict): Mapping of strings to strings containing the details
+ of the query request.
+
+ Returns:
+ a Deferred which will eventually yield a JSON object from the
+ response
+ """
+ sent_queries_counter.inc(query_type)
+
+ return self.transport_layer.make_query(
+ destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
+ )
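+
+    # Illustrative usage (editor's sketch): a profile lookup, assuming the
+    # remote side has registered a "profile" query handler. The deferred
+    # fires with the parsed JSON response body.
+    #
+    #   d = client.make_query(
+    #       destination="remote.example.org",
+    #       query_type="profile",
+    #       args={"user_id": "@alice:remote.example.org",
+    #             "field": "displayname"},
+    #   )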
+
+ @log_function
+ def query_client_keys(self, destination, content):
+ """Query device keys for a device hosted on a remote server.
+
+ Args:
+ destination (str): Domain name of the remote homeserver
+ content (dict): The query content.
+
+ Returns:
+ a Deferred which will eventually yield a JSON object from the
+ response
+ """
+ sent_queries_counter.inc("client_device_keys")
+ return self.transport_layer.query_client_keys(destination, content)
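+
+    # Illustrative content shape (editor's sketch): the key query maps user
+    # IDs to lists of device IDs; an empty list asks for all of that user's
+    # devices.
+    #
+    #   client.query_client_keys(
+    #       "remote.example.org",
+    #       {"device_keys": {"@alice:remote.example.org": []}},
+    #   )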
+
+ @log_function
+ def claim_client_keys(self, destination, content):
+ """Claims one-time keys for a device hosted on a remote server.
+
+ Args:
+ destination (str): Domain name of the remote homeserver
+ content (dict): The query content.
+
+ Returns:
+ a Deferred which will eventually yield a JSON object from the
+ response
+ """
+ sent_queries_counter.inc("client_one_time_keys")
+ return self.transport_layer.claim_client_keys(destination, content)
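+
+    # Illustrative content shape (editor's sketch; the algorithm name here is
+    # an assumption): a one-time key claim maps user IDs to a mapping of
+    # device ID to key algorithm.
+    #
+    #   client.claim_client_keys(
+    #       "remote.example.org",
+    #       {"one_time_keys": {
+    #           "@alice:remote.example.org": {"ADEVICEID": "curve25519"},
+    #       }},
+    #   )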
+
+ @defer.inlineCallbacks
+ @log_function
+ def backfill(self, dest, context, limit, extremities):
+ """Requests some more historic PDUs for the given context from the
+ given destination server.
+
+ Args:
+ dest (str): The remote home server to ask.
+ context (str): The context to backfill.
+ limit (int): The maximum number of PDUs to return.
+            extremities (list): List of PDU ids and origins of the first PDUs
+                we have seen from the context.
+
+ Returns:
+ Deferred: Results in the received PDUs.
+ """
+ logger.debug("backfill extrem=%s", extremities)
+
+        # If there are no extremities then we've (probably) reached the start.
+ if not extremities:
+ return
+
+ transaction_data = yield self.transport_layer.backfill(
+ dest, context, extremities, limit)
+
+ logger.debug("backfill transaction_data=%s", repr(transaction_data))
+
+ pdus = [
+ self.event_from_pdu_json(p, outlier=False)
+ for p in transaction_data["pdus"]
+ ]
+
+ # FIXME: We should handle signature failures more gracefully.
+ pdus[:] = yield defer.gatherResults(
+ self._check_sigs_and_hashes(pdus),
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
+ defer.returnValue(pdus)
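+
+    # Illustrative usage (editor's sketch), from within another
+    # @defer.inlineCallbacks function; the extremities are the event IDs at
+    # the backwards edge of our local copy of the room graph:
+    #
+    #   pdus = yield client.backfill(
+    #       dest="remote.example.org",
+    #       context=room_id,
+    #       limit=20,
+    #       extremities=["$12345abcde:example.org"],
+    #   )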
+
+ @defer.inlineCallbacks
+ @log_function
+ def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
+        """Requests the PDU with the given event ID from the remote home
+        servers.
+
+ Will attempt to get the PDU from each destination in the list until
+ one succeeds.
+
+ This will persist the PDU locally upon receipt.
+
+ Args:
+ destinations (list): Which home servers to query
+            event_id (str): The id of the event to fetch.
+            outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
+                it's from an arbitrary point in the context as opposed to part
+                of the current block of PDUs. Defaults to `False`.
+ timeout (int): How long to try (in ms) each destination for before
+ moving to the next destination. None indicates no timeout.
+
+ Returns:
+ Deferred: Results in the requested PDU.
+ """
+
+ # TODO: Rate limit the number of times we try and get the same event.
+
+ if self._get_pdu_cache:
+ e = self._get_pdu_cache.get(event_id)
+ if e:
+ defer.returnValue(e)
+
+ pdu = None
+ for destination in destinations:
+ try:
+ limiter = yield get_retry_limiter(
+ destination,
+ self._clock,
+ self.store,
+ )
+
+ with limiter:
+ transaction_data = yield self.transport_layer.get_event(
+ destination, event_id, timeout=timeout,
+ )
+
+ logger.debug("transaction_data %r", transaction_data)
+
+ pdu_list = [
+ self.event_from_pdu_json(p, outlier=outlier)
+ for p in transaction_data["pdus"]
+ ]
+
+ if pdu_list and pdu_list[0]:
+ pdu = pdu_list[0]
+
+ # Check signatures are correct.
+ pdu = yield self._check_sigs_and_hashes([pdu])[0]
+
+ break
+
+            except SynapseError as e:
+ logger.info(
+ "Failed to get PDU %s from %s because %s",
+ event_id, destination, e,
+ )
+ continue
+ except CodeMessageException as e:
+ if 400 <= e.code < 500:
+ raise
+
+ logger.info(
+ "Failed to get PDU %s from %s because %s",
+ event_id, destination, e,
+ )
+ continue
+ except NotRetryingDestination as e:
+ logger.info(e.message)
+ continue
+ except Exception as e:
+ logger.info(
+ "Failed to get PDU %s from %s because %s",
+ event_id, destination, e,
+ )
+ continue
+
+ if self._get_pdu_cache is not None and pdu:
+ self._get_pdu_cache[event_id] = pdu
+
+ defer.returnValue(pdu)
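+
+    # Illustrative usage (editor's sketch): destinations are tried in order
+    # until one returns a valid, correctly signed copy of the event, so
+    # callers usually pass several candidate servers:
+    #
+    #   pdu = yield client.get_pdu(
+    #       destinations=["one.example.org", "two.example.org"],
+    #       event_id="$12345abcde:example.org",
+    #       timeout=10000,
+    #   )
+    #   if pdu is None:
+    #       ...  # every destination failed or timed out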
+
+ @defer.inlineCallbacks
+ @log_function
+ def get_state_for_room(self, destination, room_id, event_id):
+ """Requests all of the `current` state PDUs for a given room from
+ a remote home server.
+
+ Args:
+ destination (str): The remote homeserver to query for the state.
+ room_id (str): The id of the room we're interested in.
+ event_id (str): The id of the event we want the state at.
+
+ Returns:
+ Deferred: Results in a list of PDUs.
+ """
+
+ result = yield self.transport_layer.get_room_state(
+ destination, room_id, event_id=event_id,
+ )
+
+ pdus = [
+ self.event_from_pdu_json(p, outlier=True) for p in result["pdus"]
+ ]
+
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in result.get("auth_chain", [])
+ ]
+
+ signed_pdus = yield self._check_sigs_and_hash_and_fetch(
+ destination, pdus, outlier=True
+ )
+
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
+
+ signed_auth.sort(key=lambda e: e.depth)
+
+ defer.returnValue((signed_pdus, signed_auth))
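+
+    # Illustrative usage (editor's sketch): both lists come back already
+    # signature-checked, with the auth chain sorted by depth:
+    #
+    #   state, auth_chain = yield client.get_state_for_room(
+    #       destination="remote.example.org",
+    #       room_id=room_id,
+    #       event_id="$12345abcde:example.org",
+    #   )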
+
+ @defer.inlineCallbacks
+ @log_function
+ def get_event_auth(self, destination, room_id, event_id):
+ res = yield self.transport_layer.get_event_auth(
+ destination, room_id, event_id,
+ )
+
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in res["auth_chain"]
+ ]
+
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
+
+ signed_auth.sort(key=lambda e: e.depth)
+
+ defer.returnValue(signed_auth)
+
+ @defer.inlineCallbacks
+ def make_membership_event(self, destinations, room_id, user_id, membership,
+ content={},):
+ """
+ Creates an m.room.member event, with context, without participating in the room.
+
+ Does so by asking one of the already participating servers to create an
+ event with proper context.
+
+ Note that this does not append any events to any graphs.
+
+ Args:
+            destinations (list): Candidate homeservers which are probably
+ participating in the room.
+ room_id (str): The room in which the event will happen.
+ user_id (str): The user whose membership is being evented.
+ membership (str): The "membership" property of the event. Must be
+ one of "join" or "leave".
+ content (object): Any additional data to put into the content field
+ of the event.
+        Returns:
+            A tuple of (origin (str), event (object)) where origin is the
+            remote homeserver which generated the event.
+ """
+ valid_memberships = {Membership.JOIN, Membership.LEAVE}
+ if membership not in valid_memberships:
+ raise RuntimeError(
+ "make_membership_event called with membership='%s', must be one of %s" %
+ (membership, ",".join(valid_memberships))
+ )
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ ret = yield self.transport_layer.make_membership_event(
+ destination, room_id, user_id, membership
+ )
+
+ pdu_dict = ret["event"]
+
+ logger.debug("Got response to make_%s: %s", membership, pdu_dict)
+
+ pdu_dict["content"].update(content)
+
+                # The protoevent received over the JSON wire may not have all
+                # the required fields. Let's just gloss over that because
+                # there are some fields we never care about.
+ if "prev_state" not in pdu_dict:
+ pdu_dict["prev_state"] = []
+
+ defer.returnValue(
+ (destination, self.event_from_pdu_json(pdu_dict))
+ )
+ break
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.warn(
+ "Failed to make_%s via %s: %s",
+ membership, destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")
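+
+    # Illustrative usage (editor's sketch): the first leg of the join dance.
+    # The returned protoevent must still be filled out and signed locally
+    # before being passed to send_join():
+    #
+    #   origin, event = yield client.make_membership_event(
+    #       destinations=["remote.example.org"],
+    #       room_id=room_id,
+    #       user_id="@alice:example.org",
+    #       membership=Membership.JOIN,
+    #   )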
+
+ @defer.inlineCallbacks
+ def send_join(self, destinations, pdu):
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_join(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+
+ logger.debug("Got content: %s", content)
+
+ state = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("state", [])
+ ]
+
+ auth_chain = [
+ self.event_from_pdu_json(p, outlier=True)
+ for p in content.get("auth_chain", [])
+ ]
+
+ pdus = {
+ p.event_id: p
+ for p in itertools.chain(state, auth_chain)
+ }
+
+ valid_pdus = yield self._check_sigs_and_hash_and_fetch(
+ destination, pdus.values(),
+ outlier=True,
+ )
+
+ valid_pdus_map = {
+ p.event_id: p
+ for p in valid_pdus
+ }
+
+ # NB: We *need* to copy to ensure that we don't have multiple
+ # references being passed on, as that causes... issues.
+ signed_state = [
+ copy.copy(valid_pdus_map[p.event_id])
+ for p in state
+ if p.event_id in valid_pdus_map
+ ]
+
+ signed_auth = [
+ valid_pdus_map[p.event_id]
+ for p in auth_chain
+ if p.event_id in valid_pdus_map
+ ]
+
+                # Similarly, deep-copy the internal metadata so that changes
+                # made later don't leak between the references.
+ for s in signed_state:
+ s.internal_metadata = copy.deepcopy(s.internal_metadata)
+
+ auth_chain.sort(key=lambda e: e.depth)
+
+ defer.returnValue({
+ "state": signed_state,
+ "auth_chain": signed_auth,
+ "origin": destination,
+ })
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.exception(
+ "Failed to send_join via %s: %s",
+ destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")
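+
+    # Illustrative usage (editor's sketch): the second leg of the join dance,
+    # sending the event built from make_membership_event() once it has been
+    # signed locally. The result carries the resident server's view of the
+    # room state and auth chain:
+    #
+    #   ret = yield client.send_join(destinations, signed_join_event)
+    #   state, auth_chain = ret["state"], ret["auth_chain"]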
+
+ @defer.inlineCallbacks
+ def send_invite(self, destination, room_id, event_id, pdu):
+ time_now = self._clock.time_msec()
+ code, content = yield self.transport_layer.send_invite(
+ destination=destination,
+ room_id=room_id,
+ event_id=event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+
+ pdu_dict = content["event"]
+
+ logger.debug("Got response to send_invite: %s", pdu_dict)
+
+ pdu = self.event_from_pdu_json(pdu_dict)
+
+ # Check signatures are correct.
+ pdu = yield self._check_sigs_and_hash(pdu)
+
+ # FIXME: We should handle signature failures more gracefully.
+
+ defer.returnValue(pdu)
+
+ @defer.inlineCallbacks
+ def send_leave(self, destinations, pdu):
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ time_now = self._clock.time_msec()
+ _, content = yield self.transport_layer.send_leave(
+ destination=destination,
+ room_id=pdu.room_id,
+ event_id=pdu.event_id,
+ content=pdu.get_pdu_json(time_now),
+ )
+
+ logger.debug("Got content: %s", content)
+ defer.returnValue(None)
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.exception(
+ "Failed to send_leave via %s: %s",
+ destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")
+
+ @defer.inlineCallbacks
+ def query_auth(self, destination, room_id, event_id, local_auth):
+        """
+        Args:
+            destination (str)
+            room_id (str)
+            event_id (str)
+            local_auth (list)
+        """
+ time_now = self._clock.time_msec()
+
+ send_content = {
+ "auth_chain": [e.get_pdu_json(time_now) for e in local_auth],
+ }
+
+ code, content = yield self.transport_layer.send_query_auth(
+ destination=destination,
+ room_id=room_id,
+ event_id=event_id,
+ content=send_content,
+ )
+
+ auth_chain = [
+ self.event_from_pdu_json(e)
+ for e in content["auth_chain"]
+ ]
+
+ signed_auth = yield self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
+
+ signed_auth.sort(key=lambda e: e.depth)
+
+ ret = {
+ "auth_chain": signed_auth,
+ "rejects": content.get("rejects", []),
+ "missing": content.get("missing", []),
+ }
+
+ defer.returnValue(ret)
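+
+    # Illustrative usage (editor's sketch): compare our auth chain for an
+    # event against a remote server's view and collect the differences:
+    #
+    #   ret = yield client.query_auth(
+    #       destination="remote.example.org",
+    #       room_id=room_id,
+    #       event_id="$12345abcde:example.org",
+    #       local_auth=local_auth_events,
+    #   )
+    #   remote_chain = ret["auth_chain"]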
+
+ @defer.inlineCallbacks
+ def get_missing_events(self, destination, room_id, earliest_events_ids,
+ latest_events, limit, min_depth):
+ """Tries to fetch events we are missing. This is called when we receive
+ an event without having received all of its ancestors.
+
+ Args:
+ destination (str)
+ room_id (str)
+ earliest_events_ids (list): List of event ids. Effectively the
+ events we expected to receive, but haven't. `get_missing_events`
+ should only return events that didn't happen before these.
+ latest_events (list): List of events we have received that we don't
+ have all previous events for.
+ limit (int): Maximum number of events to return.
+            min_depth (int): Minimum depth of events to return.
+ """
+ try:
+ content = yield self.transport_layer.get_missing_events(
+ destination=destination,
+ room_id=room_id,
+ earliest_events=earliest_events_ids,
+ latest_events=[e.event_id for e in latest_events],
+ limit=limit,
+ min_depth=min_depth,
+ )
+
+ events = [
+ self.event_from_pdu_json(e)
+ for e in content.get("events", [])
+ ]
+
+ signed_events = yield self._check_sigs_and_hash_and_fetch(
+ destination, events, outlier=False
+ )
+
+ have_gotten_all_from_destination = True
+ except HttpResponseException as e:
+            if e.code != 400:
+ raise
+
+ # We are probably hitting an old server that doesn't support
+ # get_missing_events
+ signed_events = []
+ have_gotten_all_from_destination = False
+
+ if len(signed_events) >= limit:
+ defer.returnValue(signed_events)
+
+ servers = yield self.store.get_joined_hosts_for_room(room_id)
+
+ servers = set(servers)
+ servers.discard(self.server_name)
+
+ failed_to_fetch = set()
+
+ while len(signed_events) < limit:
+ # Are we missing any?
+
+ seen_events = set(earliest_events_ids)
+ seen_events.update(e.event_id for e in signed_events if e)
+
+ missing_events = {}
+ for e in itertools.chain(latest_events, signed_events):
+ if e.depth > min_depth:
+ missing_events.update({
+ e_id: e.depth for e_id, _ in e.prev_events
+ if e_id not in seen_events
+ and e_id not in failed_to_fetch
+ })
+
+ if not missing_events:
+ break
+
+ have_seen = yield self.store.have_events(missing_events)
+
+ for k in have_seen:
+ missing_events.pop(k, None)
+
+ if not missing_events:
+ break
+
+            # Okay, we haven't gotten everything yet. Let's get them.
+ ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
+
+ if have_gotten_all_from_destination:
+ servers.discard(destination)
+
+ def random_server_list():
+ srvs = list(servers)
+ random.shuffle(srvs)
+ return srvs
+
+ deferreds = [
+ self.get_pdu(
+ destinations=random_server_list(),
+ event_id=e_id,
+ )
+ for e_id, depth in ordered_missing[:limit - len(signed_events)]
+ ]
+
+ res = yield defer.DeferredList(deferreds, consumeErrors=True)
+ for (result, val), (e_id, _) in zip(res, ordered_missing):
+ if result and val:
+ signed_events.append(val)
+ else:
+ failed_to_fetch.add(e_id)
+
+ defer.returnValue(signed_events)
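+
+    # Illustrative usage (editor's sketch; the variable names are
+    # assumptions): typically called when an incoming event references
+    # prev_events we have never seen:
+    #
+    #   missing = yield client.get_missing_events(
+    #       destination=origin,
+    #       room_id=room_id,
+    #       earliest_events_ids=list(our_extremities),
+    #       latest_events=[incoming_event],
+    #       limit=10,
+    #       min_depth=0,
+    #   )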
+
+ def event_from_pdu_json(self, pdu_json, outlier=False):
+ event = FrozenEvent(
+ pdu_json
+ )
+
+ event.internal_metadata.outlier = outlier
+
+ return event
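+
+    # Illustrative usage (editor's sketch): wire-format JSON in, immutable
+    # FrozenEvent out, with the outlier flag stashed on internal_metadata:
+    #
+    #   event = client.event_from_pdu_json(pdu_dict, outlier=True)
+    #   assert event.internal_metadata.is_outlier()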
+
+ @defer.inlineCallbacks
+ def forward_third_party_invite(self, destinations, room_id, event_dict):
+ for destination in destinations:
+ if destination == self.server_name:
+ continue
+
+ try:
+ yield self.transport_layer.exchange_third_party_invite(
+ destination=destination,
+ room_id=room_id,
+ event_dict=event_dict,
+ )
+ defer.returnValue(None)
+ except CodeMessageException:
+ raise
+ except Exception as e:
+ logger.exception(
+ "Failed to send_third_party_invite via %s: %s",
+ destination, e.message
+ )
+
+ raise RuntimeError("Failed to send to any server.")