summaryrefslogtreecommitdiff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2016-12-16 10:43:39 +0000
committerErik Johnston <erikj@matrix.org>2016-12-16 10:43:39 +0000
commite25685dbda5a91922a7ecee0dce89db1ea1662a9 (patch)
treefce7147d9b4422f76b5cec8b53312bb34932d84f /synapse/notifier.py
parent4217a384316b04d4c2eed7814ca4defb62af8703 (diff)
Imported Upstream version 0.18.5
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py49
1 files changed, 32 insertions, 17 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 48653ae8..acbd4bb5 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
+from synapse.util import DeferredTimedOutError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
@@ -143,6 +144,12 @@ class Notifier(object):
self.clock = hs.get_clock()
self.appservice_handler = hs.get_application_service_handler()
+
+ if hs.should_send_federation():
+ self.federation_sender = hs.get_federation_sender()
+ else:
+ self.federation_sender = None
+
self.state_handler = hs.get_state_handler()
self.clock.looping_call(
@@ -220,6 +227,9 @@ class Notifier(object):
# poke any interested application service.
self.appservice_handler.notify_interested_services(room_stream_id)
+ if self.federation_sender:
+ self.federation_sender.notify_new_events(room_stream_id)
+
if event.type == EventTypes.Member and event.membership == Membership.JOIN:
self._user_joined_room(event.state_key, event.room_id)
@@ -285,14 +295,7 @@ class Notifier(object):
result = None
if timeout:
- # Will be set to a _NotificationListener that we'll be waiting on.
- # Allows us to cancel it.
- listener = None
-
- def timed_out():
- if listener:
- listener.deferred.cancel()
- timer = self.clock.call_later(timeout / 1000., timed_out)
+ end_time = self.clock.time_msec() + timeout
prev_token = from_token
while not result:
@@ -303,6 +306,10 @@ class Notifier(object):
if result:
break
+ now = self.clock.time_msec()
+ if end_time <= now:
+ break
+
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
# We need to supply the token we supplied to callback so
@@ -310,11 +317,14 @@ class Notifier(object):
prev_token = current_token
listener = user_stream.new_listener(prev_token)
with PreserveLoggingContext():
- yield listener.deferred
+ yield self.clock.time_bound_deferred(
+ listener.deferred,
+ time_out=(end_time - now) / 1000.
+ )
+ except DeferredTimedOutError:
+ break
except defer.CancelledError:
break
-
- self.clock.cancel_call_later(timer, ignore_errs=True)
else:
current_token = user_stream.current_token
result = yield callback(from_token, current_token)
@@ -483,22 +493,27 @@ class Notifier(object):
"""
listener = _NotificationListener(None)
- def timed_out():
- listener.deferred.cancel()
+ end_time = self.clock.time_msec() + timeout
- timer = self.clock.call_later(timeout / 1000., timed_out)
while True:
listener.deferred = self.replication_deferred.observe()
result = yield callback()
if result:
break
+ now = self.clock.time_msec()
+ if end_time <= now:
+ break
+
try:
with PreserveLoggingContext():
- yield listener.deferred
+ yield self.clock.time_bound_deferred(
+ listener.deferred,
+ time_out=(end_time - now) / 1000.
+ )
+ except DeferredTimedOutError:
+ break
except defer.CancelledError:
break
- self.clock.cancel_call_later(timer, ignore_errs=True)
-
defer.returnValue(result)