summaryrefslogtreecommitdiff
path: root/synapse/appservice/scheduler.py
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2019-09-27 17:36:45 +0200
committerAndrej Shadura <andrewsh@debian.org>2019-09-27 17:36:45 +0200
commitf11ce76d3a6a25882c0cb243fb3303b526e39210 (patch)
tree8a917f45af6469325fde5a747ae7fb1757655a07 /synapse/appservice/scheduler.py
parentdf31d372bad705a18c7c05fc174c760289731a8b (diff)
New upstream version 1.4.0~rc1
Diffstat (limited to 'synapse/appservice/scheduler.py')
-rw-r--r--synapse/appservice/scheduler.py153
1 files changed, 90 insertions, 63 deletions
diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 42a350bf..9998f822 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
self.store = hs.get_datastore()
self.as_api = hs.get_application_service_api()
- def create_recoverer(service, callback):
- return _Recoverer(self.clock, self.store, self.as_api, service, callback)
-
- self.txn_ctrl = _TransactionController(
- self.clock, self.store, self.as_api, create_recoverer
- )
+ self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
@defer.inlineCallbacks
def start(self):
logger.info("Starting appservice scheduler")
+
# check for any DOWN ASes and start recoverers for them.
- recoverers = yield _Recoverer.start(
- self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
+ services = yield self.store.get_appservices_by_state(
+ ApplicationServiceState.DOWN
)
- self.txn_ctrl.add_recoverers(recoverers)
+
+ for service in services:
+ self.txn_ctrl.start_recoverer(service)
def submit_event_for_as(self, service, event):
self.queuer.enqueue(service, event)
class _ServiceQueuer(object):
- """Queues events for the same application service together, sending
- transactions as soon as possible. Once a transaction is sent successfully,
- this schedules any other events in the queue to run.
+ """Queue of events waiting to be sent to appservices.
+
+ Groups events into transactions per-appservice, and sends them on to the
+ TransactionController. Makes sure that we only have one transaction in flight per
+ appservice at a given time.
"""
def __init__(self, txn_ctrl, clock):
self.queued_events = {} # dict of {service_id: [events]}
+
+ # the appservices which currently have a transaction in flight
self.requests_in_flight = set()
self.txn_ctrl = txn_ctrl
self.clock = clock
@@ -136,13 +138,29 @@ class _ServiceQueuer(object):
class _TransactionController(object):
- def __init__(self, clock, store, as_api, recoverer_fn):
+ """Transaction manager.
+
+ Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
+ if a transaction fails.
+
+ (Note we have only have one of these in the homeserver.)
+
+ Args:
+ clock (synapse.util.Clock):
+ store (synapse.storage.DataStore):
+ as_api (synapse.appservice.api.ApplicationServiceApi):
+ """
+
+ def __init__(self, clock, store, as_api):
self.clock = clock
self.store = store
self.as_api = as_api
- self.recoverer_fn = recoverer_fn
- # keep track of how many recoverers there are
- self.recoverers = []
+
+ # map from service id to recoverer instance
+ self.recoverers = {}
+
+ # for UTs
+ self.RECOVERER_CLASS = _Recoverer
@defer.inlineCallbacks
def send(self, service, events):
@@ -154,42 +172,45 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
- run_in_background(self._start_recoverer, service)
+ run_in_background(self._on_txn_fail, service)
except Exception:
logger.exception("Error creating appservice transaction")
- run_in_background(self._start_recoverer, service)
+ run_in_background(self._on_txn_fail, service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
- self.recoverers.remove(recoverer)
logger.info(
"Successfully recovered application service AS ID %s", recoverer.service.id
)
+ self.recoverers.pop(recoverer.service.id)
logger.info("Remaining active recoverers: %s", len(self.recoverers))
yield self.store.set_appservice_state(
recoverer.service, ApplicationServiceState.UP
)
- def add_recoverers(self, recoverers):
- for r in recoverers:
- self.recoverers.append(r)
- if len(recoverers) > 0:
- logger.info("New active recoverers: %s", len(self.recoverers))
-
@defer.inlineCallbacks
- def _start_recoverer(self, service):
+ def _on_txn_fail(self, service):
try:
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
- logger.info(
- "Application service falling behind. Starting recoverer. AS ID %s",
- service.id,
- )
- recoverer = self.recoverer_fn(service, self.on_recovered)
- self.add_recoverers([recoverer])
- recoverer.recover()
+ self.start_recoverer(service)
except Exception:
logger.exception("Error starting AS recoverer")
+ def start_recoverer(self, service):
+ """Start a Recoverer for the given service
+
+ Args:
+ service (synapse.appservice.ApplicationService):
+ """
+ logger.info("Starting recoverer for AS ID %s", service.id)
+ assert service.id not in self.recoverers
+ recoverer = self.RECOVERER_CLASS(
+ self.clock, self.store, self.as_api, service, self.on_recovered
+ )
+ self.recoverers[service.id] = recoverer
+ recoverer.recover()
+ logger.info("Now %i active recoverers", len(self.recoverers))
+
@defer.inlineCallbacks
def _is_service_up(self, service):
state = yield self.store.get_appservice_state(service)
@@ -197,18 +218,17 @@ class _TransactionController(object):
class _Recoverer(object):
- @staticmethod
- @defer.inlineCallbacks
- def start(clock, store, as_api, callback):
- services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
- recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
- for r in recoverers:
- logger.info(
- "Starting recoverer for AS ID %s which was marked as " "DOWN",
- r.service.id,
- )
- r.recover()
- return recoverers
+ """Manages retries and backoff for a DOWN appservice.
+
+ We have one of these for each appservice which is currently considered DOWN.
+
+ Args:
+ clock (synapse.util.Clock):
+ store (synapse.storage.DataStore):
+ as_api (synapse.appservice.api.ApplicationServiceApi):
+ service (synapse.appservice.ApplicationService): the service we are managing
+ callback (callable[_Recoverer]): called once the service recovers.
+ """
def __init__(self, clock, store, as_api, service, callback):
self.clock = clock
@@ -224,7 +244,9 @@ class _Recoverer(object):
"as-recoverer-%s" % (self.service.id,), self.retry
)
- self.clock.call_later((2 ** self.backoff_counter), _retry)
+ delay = 2 ** self.backoff_counter
+ logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
+ self.clock.call_later(delay, _retry)
def _backoff(self):
# cap the backoff to be around 8.5min => (2^9) = 512 secs
@@ -234,25 +256,30 @@ class _Recoverer(object):
@defer.inlineCallbacks
def retry(self):
+ logger.info("Starting retries on %s", self.service.id)
try:
- txn = yield self.store.get_oldest_unsent_txn(self.service)
- if txn:
+ while True:
+ txn = yield self.store.get_oldest_unsent_txn(self.service)
+ if not txn:
+ # nothing left: we're done!
+ self.callback(self)
+ return
+
logger.info(
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
)
sent = yield txn.send(self.as_api)
- if sent:
- yield txn.complete(self.store)
- # reset the backoff counter and retry immediately
- self.backoff_counter = 1
- yield self.retry()
- else:
- self._backoff()
- else:
- self._set_service_recovered()
- except Exception as e:
- logger.exception(e)
- self._backoff()
-
- def _set_service_recovered(self):
- self.callback(self)
+ if not sent:
+ break
+
+ yield txn.complete(self.store)
+
+ # reset the backoff counter and then process the next transaction
+ self.backoff_counter = 1
+
+ except Exception:
+ logger.exception("Unexpected error running retries")
+
+ # we didn't manage to send all of the transactions before we got an error of
+ # some flavour: reschedule the next retry.
+ self._backoff()