summaryrefslogtreecommitdiff
path: root/synapse/federation/sender/per_destination_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/per_destination_queue.py')
-rw-r--r--synapse/federation/sender/per_destination_queue.py106
1 file changed, 91 insertions, 15 deletions
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index cc0d765e..89df9a61 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple
import attr
from prometheus_client import Counter
@@ -77,6 +77,7 @@ class PerDestinationQueue:
self._transaction_manager = transaction_manager
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
+ self._state = hs.get_state_handler()
self._should_send_on_this_instance = True
if not self._federation_shard_config.should_handle(
@@ -415,22 +416,97 @@ class PerDestinationQueue:
"This should not happen." % event_ids
)
- if logger.isEnabledFor(logging.INFO):
- rooms = [p.room_id for p in catchup_pdus]
- logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+ # We send transactions with events from one room only, as it's likely
+ # that the remote will have to do additional processing, which may
+ # take some time. It's better to give it small amounts of work
+ # rather than risk the request timing out and repeatedly being
+ # retried, and not making any progress.
+ #
+ # Note: `catchup_pdus` will have exactly one PDU per room.
+ for pdu in catchup_pdus:
+ # The PDU from the DB will be the last PDU in the room from
+ # *this server* that wasn't sent to the remote. However, other
+ # servers may have sent lots of events since then, and we want
+ # to try and tell the remote only about the *latest* events in
+ # the room. This is so that it doesn't get inundated by events
+ # from various parts of the DAG, which all need to be processed.
+ #
+ # Note: this does mean that in large rooms a server coming back
+ # online will get sent the same events from all the different
+ # servers, but the remote will correctly deduplicate them and
+ # handle it only once.
+
+ # Step 1, fetch the current extremities
+ extrems = await self._store.get_prev_events_for_room(pdu.room_id)
+
+ if pdu.event_id in extrems:
+ # If the event is in the extremities, then great! We can just
+ # use that without having to do further checks.
+ room_catchup_pdus = [pdu]
+ else:
+ # If not, fetch the extremities and figure out which we can
+ # send.
+ extrem_events = await self._store.get_events_as_list(extrems)
+
+ new_pdus = []
+ for p in extrem_events:
+ # We pulled this from the DB, so it'll be non-null
+ assert p.internal_metadata.stream_ordering
+
+ # Filter out events that happened before the remote went
+ # offline
+ if (
+ p.internal_metadata.stream_ordering
+ < self._last_successful_stream_ordering
+ ):
+ continue
- await self._transaction_manager.send_new_transaction(
- self._destination, catchup_pdus, []
- )
+ # Filter out events where the server is not in the room,
+ # e.g. it may have left/been kicked. *Ideally* we'd pull
+ # out the kick and send that, but it's a rare edge case
+ # so we don't bother for now (the server that sent the
+ # kick should send it out if it's online).
+ hosts = await self._state.get_hosts_in_room_at_events(
+ p.room_id, [p.event_id]
+ )
+ if self._destination not in hosts:
+ continue
- sent_transactions_counter.inc()
- final_pdu = catchup_pdus[-1]
- self._last_successful_stream_ordering = cast(
- int, final_pdu.internal_metadata.stream_ordering
- )
- await self._store.set_destination_last_successful_stream_ordering(
- self._destination, self._last_successful_stream_ordering
- )
+ new_pdus.append(p)
+
+ # If we've filtered out all the extremities, fall back to
+ # sending the original event. This should ensure that the
+ # server gets at least some of the missed events (especially if
+ # the other sending servers are up).
+ if new_pdus:
+ room_catchup_pdus = new_pdus
+ else:
+ room_catchup_pdus = [pdu]
+
+ logger.info(
+ "Catching up rooms to %s: %r", self._destination, pdu.room_id
+ )
+
+ await self._transaction_manager.send_new_transaction(
+ self._destination, room_catchup_pdus, []
+ )
+
+ sent_transactions_counter.inc()
+
+ # We pulled this from the DB, so it'll be non-null
+ assert pdu.internal_metadata.stream_ordering
+
+ # Note that we mark the last successful stream ordering as that
+ # from the *original* PDU, rather than the PDU(s) we actually
+ # send. This is because we use it to mark our position in the
+ # queue of missed PDUs to process.
+ self._last_successful_stream_ordering = (
+ pdu.internal_metadata.stream_ordering
+ )
+
+ await self._store.set_destination_last_successful_stream_ordering(
+ self._destination, self._last_successful_stream_ordering
+ )
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs: