summaryrefslogtreecommitdiff
path: root/synapse/app/synchrotron.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r--synapse/app/synchrotron.py29
1 files changed, 21 insertions, 8 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 66623407..0354e82b 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -33,7 +33,7 @@ from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
-from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._base import BaseSlavedStore, __func__
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
@@ -147,7 +147,7 @@ class SynchrotronPresence(object):
and haven't come back yet. If there are poke the master about them.
"""
now = self.clock.time_msec()
- for user_id, last_sync_ms in self.users_going_offline.items():
+ for user_id, last_sync_ms in list(self.users_going_offline.items()):
if now - last_sync_ms > 10 * 1000:
self.users_going_offline.pop(user_id, None)
self.send_user_sync(user_id, False, last_sync_ms)
@@ -156,9 +156,9 @@ class SynchrotronPresence(object):
# TODO Hows this supposed to work?
pass
- get_states = PresenceHandler.get_states.__func__
- get_state = PresenceHandler.get_state.__func__
- current_state_for_users = PresenceHandler.current_state_for_users.__func__
+ get_states = __func__(PresenceHandler.get_states)
+ get_state = __func__(PresenceHandler.get_state)
+ current_state_for_users = __func__(PresenceHandler.current_state_for_users)
def user_syncing(self, user_id, affect_presence):
if affect_presence:
@@ -208,7 +208,7 @@ class SynchrotronPresence(object):
) for row in rows]
for state in states:
- self.user_to_current_state[row.user_id] = state
+ self.user_to_current_state[state.user_id] = state
stream_id = token
yield self.notify_from_replication(states, stream_id)
@@ -226,7 +226,15 @@ class SynchrotronPresence(object):
class SynchrotronTyping(object):
def __init__(self, hs):
self._latest_room_serial = 0
+ self._reset()
+
+ def _reset(self):
+ """
+ Reset the typing handler's data caches.
+ """
+ # map room IDs to serial numbers
self._room_serials = {}
+ # map room IDs to sets of users currently typing
self._room_typing = {}
def stream_positions(self):
@@ -236,6 +244,12 @@ class SynchrotronTyping(object):
return {"typing": self._latest_room_serial}
def process_replication_rows(self, token, rows):
+ if self._latest_room_serial > token:
+ # The master has gone backwards. To prevent inconsistent data, just
+ # clear everything.
+ self._reset()
+
+ # Set the latest serial token to whatever the server gave us.
self._latest_room_serial = token
for row in rows:
@@ -410,7 +424,7 @@ def start(config_options):
"Synapse synchrotron", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.synchrotron"
@@ -435,7 +449,6 @@ def start(config_options):
def start():
ss.get_datastore().start_profiling()
- ss.get_state_handler().start_caching()
reactor.callWhenRunning(start)