summaryrefslogtreecommitdiff
path: root/synapse/handlers/presence.py
diff options
context:
space:
mode:
authorAndrej Shadura <andrewsh@debian.org>2019-07-18 20:29:37 -0300
committerAndrej Shadura <andrewsh@debian.org>2019-07-18 20:29:37 -0300
commit2c2556601d5da4ffb4205200d95e77439dc5f560 (patch)
tree80ce469d12fb01d9bae03a0d7ce282a6444d2ecc /synapse/handlers/presence.py
parent5aefda23f94b2bb2edadc20b6f596cced667b5de (diff)
New upstream version 1.1.0
Diffstat (limited to 'synapse/handlers/presence.py')
-rw-r--r--synapse/handlers/presence.py332
1 files changed, 177 insertions, 155 deletions
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 557fb5f8..c80dc2eb 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -50,16 +50,20 @@ logger = logging.getLogger(__name__)
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
federation_presence_out_counter = Counter(
- "synapse_handler_presence_federation_presence_out", "")
+ "synapse_handler_presence_federation_presence_out", ""
+)
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
-federation_presence_counter = Counter("synapse_handler_presence_federation_presence", "")
+federation_presence_counter = Counter(
+ "synapse_handler_presence_federation_presence", ""
+)
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
notify_reason_counter = Counter(
- "synapse_handler_presence_notify_reason", "", ["reason"])
+ "synapse_handler_presence_notify_reason", "", ["reason"]
+)
state_transition_counter = Counter(
"synapse_handler_presence_state_transition", "", ["from", "to"]
)
@@ -90,7 +94,6 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
class PresenceHandler(object):
-
def __init__(self, hs):
"""
@@ -110,31 +113,26 @@ class PresenceHandler(object):
federation_registry = hs.get_federation_registry()
- federation_registry.register_edu_handler(
- "m.presence", self.incoming_presence
- )
+ federation_registry.register_edu_handler("m.presence", self.incoming_presence)
active_presence = self.store.take_presence_startup_info()
# A dictionary of the current state of users. This is prefilled with
# non-offline presence from the DB. We should fetch from the DB if
# we can't find a users presence in here.
- self.user_to_current_state = {
- state.user_id: state
- for state in active_presence
- }
+ self.user_to_current_state = {state.user_id: state for state in active_presence}
LaterGauge(
- "synapse_handlers_presence_user_to_current_state_size", "", [],
- lambda: len(self.user_to_current_state)
+ "synapse_handlers_presence_user_to_current_state_size",
+ "",
+ [],
+ lambda: len(self.user_to_current_state),
)
now = self.clock.time_msec()
for state in active_presence:
self.wheel_timer.insert(
- now=now,
- obj=state.user_id,
- then=state.last_active_ts + IDLE_TIMER,
+ now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
)
self.wheel_timer.insert(
now=now,
@@ -193,27 +191,21 @@ class PresenceHandler(object):
"handle_presence_timeouts", self._handle_timeouts
)
- self.clock.call_later(
- 30,
- self.clock.looping_call,
- run_timeout_handler,
- 5000,
- )
+ self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
def run_persister():
return run_as_background_process(
"persist_presence_changes", self._persist_unpersisted_changes
)
- self.clock.call_later(
- 60,
- self.clock.looping_call,
- run_persister,
- 60 * 1000,
- )
+ self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
- LaterGauge("synapse_handlers_presence_wheel_timer_size", "", [],
- lambda: len(self.wheel_timer))
+ LaterGauge(
+ "synapse_handlers_presence_wheel_timer_size",
+ "",
+ [],
+ lambda: len(self.wheel_timer),
+ )
# Used to handle sending of presence to newly joined users/servers
if hs.config.use_presence:
@@ -241,15 +233,17 @@ class PresenceHandler(object):
logger.info(
"Performing _on_shutdown. Persisting %d unpersisted changes",
- len(self.user_to_current_state)
+ len(self.user_to_current_state),
)
if self.unpersisted_users_changes:
- yield self.store.update_presence([
- self.user_to_current_state[user_id]
- for user_id in self.unpersisted_users_changes
- ])
+ yield self.store.update_presence(
+ [
+ self.user_to_current_state[user_id]
+ for user_id in self.unpersisted_users_changes
+ ]
+ )
logger.info("Finished _on_shutdown")
@defer.inlineCallbacks
@@ -261,13 +255,10 @@ class PresenceHandler(object):
self.unpersisted_users_changes = set()
if unpersisted:
- logger.info(
- "Persisting %d upersisted presence updates", len(unpersisted)
+ logger.info("Persisting %d upersisted presence updates", len(unpersisted))
+ yield self.store.update_presence(
+ [self.user_to_current_state[user_id] for user_id in unpersisted]
)
- yield self.store.update_presence([
- self.user_to_current_state[user_id]
- for user_id in unpersisted
- ])
@defer.inlineCallbacks
def _update_states(self, new_states):
@@ -303,10 +294,11 @@ class PresenceHandler(object):
)
new_state, should_notify, should_ping = handle_update(
- prev_state, new_state,
+ prev_state,
+ new_state,
is_mine=self.is_mine_id(user_id),
wheel_timer=self.wheel_timer,
- now=now
+ now=now,
)
self.user_to_current_state[user_id] = new_state
@@ -328,7 +320,8 @@ class PresenceHandler(object):
self.unpersisted_users_changes -= set(to_notify.keys())
to_federation_ping = {
- user_id: state for user_id, state in to_federation_ping.items()
+ user_id: state
+ for user_id, state in to_federation_ping.items()
if user_id not in to_notify
}
if to_federation_ping:
@@ -351,8 +344,8 @@ class PresenceHandler(object):
# Check whether the lists of syncing processes from an external
# process have expired.
expired_process_ids = [
- process_id for process_id, last_update
- in self.external_process_last_updated_ms.items()
+ process_id
+ for process_id, last_update in self.external_process_last_updated_ms.items()
if now - last_update > EXTERNAL_PROCESS_EXPIRY
]
for process_id in expired_process_ids:
@@ -362,9 +355,7 @@ class PresenceHandler(object):
self.external_process_last_update.pop(process_id)
states = [
- self.user_to_current_state.get(
- user_id, UserPresenceState.default(user_id)
- )
+ self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
for user_id in users_to_check
]
@@ -394,9 +385,7 @@ class PresenceHandler(object):
prev_state = yield self.current_state_for_user(user_id)
- new_fields = {
- "last_active_ts": self.clock.time_msec(),
- }
+ new_fields = {"last_active_ts": self.clock.time_msec()}
if prev_state.state == PresenceState.UNAVAILABLE:
new_fields["state"] = PresenceState.ONLINE
@@ -430,15 +419,23 @@ class PresenceHandler(object):
if prev_state.state == PresenceState.OFFLINE:
# If they're currently offline then bring them online, otherwise
# just update the last sync times.
- yield self._update_states([prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=self.clock.time_msec(),
- last_user_sync_ts=self.clock.time_msec(),
- )])
+ yield self._update_states(
+ [
+ prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=self.clock.time_msec(),
+ last_user_sync_ts=self.clock.time_msec(),
+ )
+ ]
+ )
else:
- yield self._update_states([prev_state.copy_and_replace(
- last_user_sync_ts=self.clock.time_msec(),
- )])
+ yield self._update_states(
+ [
+ prev_state.copy_and_replace(
+ last_user_sync_ts=self.clock.time_msec()
+ )
+ ]
+ )
@defer.inlineCallbacks
def _end():
@@ -446,9 +443,13 @@ class PresenceHandler(object):
self.user_to_num_current_syncs[user_id] -= 1
prev_state = yield self.current_state_for_user(user_id)
- yield self._update_states([prev_state.copy_and_replace(
- last_user_sync_ts=self.clock.time_msec(),
- )])
+ yield self._update_states(
+ [
+ prev_state.copy_and_replace(
+ last_user_sync_ts=self.clock.time_msec()
+ )
+ ]
+ )
except Exception:
logger.exception("Error updating presence after sync")
@@ -469,7 +470,8 @@ class PresenceHandler(object):
"""
if self.hs.config.use_presence:
syncing_user_ids = {
- user_id for user_id, count in self.user_to_num_current_syncs.items()
+ user_id
+ for user_id, count in self.user_to_num_current_syncs.items()
if count
}
for user_ids in self.external_process_to_current_syncs.values():
@@ -479,7 +481,9 @@ class PresenceHandler(object):
return set()
@defer.inlineCallbacks
- def update_external_syncs_row(self, process_id, user_id, is_syncing, sync_time_msec):
+ def update_external_syncs_row(
+ self, process_id, user_id, is_syncing, sync_time_msec
+ ):
"""Update the syncing users for an external process as a delta.
Args:
@@ -500,20 +504,22 @@ class PresenceHandler(object):
updates = []
if is_syncing and user_id not in process_presence:
if prev_state.state == PresenceState.OFFLINE:
- updates.append(prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=sync_time_msec,
- last_user_sync_ts=sync_time_msec,
- ))
+ updates.append(
+ prev_state.copy_and_replace(
+ state=PresenceState.ONLINE,
+ last_active_ts=sync_time_msec,
+ last_user_sync_ts=sync_time_msec,
+ )
+ )
else:
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=sync_time_msec,
- ))
+ updates.append(
+ prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+ )
process_presence.add(user_id)
elif user_id in process_presence:
- updates.append(prev_state.copy_and_replace(
- last_user_sync_ts=sync_time_msec,
- ))
+ updates.append(
+ prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
+ )
if not is_syncing:
process_presence.discard(user_id)
@@ -537,12 +543,12 @@ class PresenceHandler(object):
prev_states = yield self.current_state_for_users(process_presence)
time_now_ms = self.clock.time_msec()
- yield self._update_states([
- prev_state.copy_and_replace(
- last_user_sync_ts=time_now_ms,
- )
- for prev_state in itervalues(prev_states)
- ])
+ yield self._update_states(
+ [
+ prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
+ for prev_state in itervalues(prev_states)
+ ]
+ )
self.external_process_last_updated_ms.pop(process_id, None)
@defer.inlineCallbacks
@@ -574,8 +580,7 @@ class PresenceHandler(object):
missing = [user_id for user_id, state in iteritems(states) if not state]
if missing:
new = {
- user_id: UserPresenceState.default(user_id)
- for user_id in missing
+ user_id: UserPresenceState.default(user_id) for user_id in missing
}
states.update(new)
self.user_to_current_state.update(new)
@@ -593,8 +598,10 @@ class PresenceHandler(object):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
- "presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states]
+ "presence_key",
+ stream_id,
+ rooms=room_ids_to_states.keys(),
+ users=[UserID.from_string(u) for u in users_to_states],
)
self._push_to_remotes(states)
@@ -605,8 +612,10 @@ class PresenceHandler(object):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
- "presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=[UserID.from_string(u) for u in users_to_states]
+ "presence_key",
+ stream_id,
+ rooms=room_ids_to_states.keys(),
+ users=[UserID.from_string(u) for u in users_to_states],
)
def _push_to_remotes(self, states):
@@ -631,15 +640,15 @@ class PresenceHandler(object):
user_id = push.get("user_id", None)
if not user_id:
logger.info(
- "Got presence update from %r with no 'user_id': %r",
- origin, push,
+ "Got presence update from %r with no 'user_id': %r", origin, push
)
continue
if get_domain_from_id(user_id) != origin:
logger.info(
"Got presence update from %r with bad 'user_id': %r",
- origin, user_id,
+ origin,
+ user_id,
)
continue
@@ -647,14 +656,12 @@ class PresenceHandler(object):
if not presence_state:
logger.info(
"Got presence update from %r with no 'presence_state': %r",
- origin, push,
+ origin,
+ push,
)
continue
- new_fields = {
- "state": presence_state,
- "last_federation_update_ts": now,
- }
+ new_fields = {"state": presence_state, "last_federation_update_ts": now}
last_active_ago = push.get("last_active_ago", None)
if last_active_ago is not None:
@@ -672,10 +679,7 @@ class PresenceHandler(object):
@defer.inlineCallbacks
def get_state(self, target_user, as_event=False):
- results = yield self.get_states(
- [target_user.to_string()],
- as_event=as_event,
- )
+ results = yield self.get_states([target_user.to_string()], as_event=as_event)
defer.returnValue(results[0])
@@ -699,13 +703,15 @@ class PresenceHandler(object):
now = self.clock.time_msec()
if as_event:
- defer.returnValue([
- {
- "type": "m.presence",
- "content": format_user_presence_state(state, now),
- }
- for state in updates
- ])
+ defer.returnValue(
+ [
+ {
+ "type": "m.presence",
+ "content": format_user_presence_state(state, now),
+ }
+ for state in updates
+ ]
+ )
else:
defer.returnValue(updates)
@@ -717,7 +723,9 @@ class PresenceHandler(object):
presence = state["presence"]
valid_presence = (
- PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE
+ PresenceState.ONLINE,
+ PresenceState.UNAVAILABLE,
+ PresenceState.OFFLINE,
)
if presence not in valid_presence:
raise SynapseError(400, "Invalid presence state")
@@ -726,9 +734,7 @@ class PresenceHandler(object):
prev_state = yield self.current_state_for_user(user_id)
- new_fields = {
- "state": presence
- }
+ new_fields = {"state": presence}
if not ignore_status_msg:
msg = status_msg if presence != PresenceState.OFFLINE else None
@@ -877,8 +883,7 @@ class PresenceHandler(object):
hosts = set(host for host in hosts if host != self.server_name)
self.federation.send_presence_to_destinations(
- states=[state],
- destinations=hosts,
+ states=[state], destinations=hosts
)
else:
# A remote user has joined the room, so we need to:
@@ -904,7 +909,8 @@ class PresenceHandler(object):
# default state.
now = self.clock.time_msec()
states = [
- state for state in states.values()
+ state
+ for state in states.values()
if state.state != PresenceState.OFFLINE
or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
or state.status_msg is not None
@@ -912,8 +918,7 @@ class PresenceHandler(object):
if states:
self.federation.send_presence_to_destinations(
- states=states,
- destinations=[get_domain_from_id(user_id)],
+ states=states, destinations=[get_domain_from_id(user_id)]
)
@@ -937,7 +942,10 @@ def should_notify(old_state, new_state):
notify_reason_counter.labels("current_active_change").inc()
return True
- if new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
+ if (
+ new_state.last_active_ts - old_state.last_active_ts
+ > LAST_ACTIVE_GRANULARITY
+ ):
# Only notify about last active bumps if we're not currently acive
if not new_state.currently_active:
notify_reason_counter.labels("last_active_change_online").inc()
@@ -958,9 +966,7 @@ def format_user_presence_state(state, now, include_user_id=True):
The "user_id" is optional so that this function can be used to format presence
updates for client /sync responses and for federation /send requests.
"""
- content = {
- "presence": state.state,
- }
+ content = {"presence": state.state}
if include_user_id:
content["user_id"] = state.user_id
if state.last_active_ts:
@@ -986,8 +992,15 @@ class PresenceEventSource(object):
@defer.inlineCallbacks
@log_function
- def get_new_events(self, user, from_key, room_ids=None, include_offline=True,
- explicit_room_id=None, **kwargs):
+ def get_new_events(
+ self,
+ user,
+ from_key,
+ room_ids=None,
+ include_offline=True,
+ explicit_room_id=None,
+ **kwargs
+ ):
# The process for getting presence events are:
# 1. Get the rooms the user is in.
# 2. Get the list of user in the rooms.
@@ -1004,11 +1017,28 @@ class PresenceEventSource(object):
if from_key is not None:
from_key = int(from_key)
+ max_token = self.store.get_current_presence_token()
+ if from_key == max_token:
+ # This is necessary as due to the way stream ID generators work
+ # we may get updates that have a stream ID greater than the max
+ # token (e.g. max_token is N but stream generator may return
+ # results for N+2, due to N+1 not having finished being
+ # persisted yet).
+ #
+ # This is usually fine, as it just means that we may send down
+ # some presence updates multiple times. However, we need to be
+ # careful that the sync stream either actually does make some
+ # progress or doesn't return, otherwise clients will end up
+ # tight looping calling /sync due to it immediately returning
+ # the same token repeatedly.
+ #
+ # Hence this guard where we just return nothing so that the sync
+ # doesn't return. C.f. #5503.
+ defer.returnValue(([], max_token))
+
presence = self.get_presence_handler()
stream_change_cache = self.store.presence_stream_cache
- max_token = self.store.get_current_presence_token()
-
users_interested_in = yield self._get_interested_in(user, explicit_room_id)
user_ids_changed = set()
@@ -1030,7 +1060,7 @@ class PresenceEventSource(object):
if from_key:
user_ids_changed = stream_change_cache.get_entities_changed(
- users_interested_in, from_key,
+ users_interested_in, from_key
)
else:
user_ids_changed = users_interested_in
@@ -1040,10 +1070,16 @@ class PresenceEventSource(object):
if include_offline:
defer.returnValue((list(updates.values()), max_token))
else:
- defer.returnValue(([
- s for s in itervalues(updates)
- if s.state != PresenceState.OFFLINE
- ], max_token))
+ defer.returnValue(
+ (
+ [
+ s
+ for s in itervalues(updates)
+ if s.state != PresenceState.OFFLINE
+ ],
+ max_token,
+ )
+ )
def get_current_key(self):
return self.store.get_current_presence_token()
@@ -1061,13 +1097,13 @@ class PresenceEventSource(object):
users_interested_in.add(user_id) # So that we receive our own presence
users_who_share_room = yield self.store.get_users_who_share_room_with_user(
- user_id, on_invalidate=cache_context.invalidate,
+ user_id, on_invalidate=cache_context.invalidate
)
users_interested_in.update(users_who_share_room)
if explicit_room_id:
user_ids = yield self.store.get_users_in_room(
- explicit_room_id, on_invalidate=cache_context.invalidate,
+ explicit_room_id, on_invalidate=cache_context.invalidate
)
users_interested_in.update(user_ids)
@@ -1123,9 +1159,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
if now - state.last_active_ts > IDLE_TIMER:
# Currently online, but last activity ages ago so auto
# idle
- state = state.copy_and_replace(
- state=PresenceState.UNAVAILABLE,
- )
+ state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
changed = True
elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
# So that we send down a notification that we've
@@ -1145,8 +1179,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
state = state.copy_and_replace(
- state=PresenceState.OFFLINE,
- status_msg=None,
+ state=PresenceState.OFFLINE, status_msg=None
)
changed = True
else:
@@ -1155,10 +1188,7 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
# no one gets stuck online forever.
if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
# The other side seems to have disappeared.
- state = state.copy_and_replace(
- state=PresenceState.OFFLINE,
- status_msg=None,
- )
+ state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None)
changed = True
return state if changed else None
@@ -1193,21 +1223,17 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
if new_state.state == PresenceState.ONLINE:
# Idle timer
wheel_timer.insert(
- now=now,
- obj=user_id,
- then=new_state.last_active_ts + IDLE_TIMER
+ now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
)
active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
- new_state = new_state.copy_and_replace(
- currently_active=active,
- )
+ new_state = new_state.copy_and_replace(currently_active=active)
if active:
wheel_timer.insert(
now=now,
obj=user_id,
- then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY
+ then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
)
if new_state.state != PresenceState.OFFLINE:
@@ -1215,29 +1241,25 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
wheel_timer.insert(
now=now,
obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT
+ then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
)
last_federate = new_state.last_federation_update_ts
if now - last_federate > FEDERATION_PING_INTERVAL:
# Been a while since we've poked remote servers
- new_state = new_state.copy_and_replace(
- last_federation_update_ts=now,
- )
+ new_state = new_state.copy_and_replace(last_federation_update_ts=now)
federation_ping = True
else:
wheel_timer.insert(
now=now,
obj=user_id,
- then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT
+ then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
)
# Check whether the change was something worth notifying about
if should_notify(prev_state, new_state):
- new_state = new_state.copy_and_replace(
- last_federation_update_ts=now,
- )
+ new_state = new_state.copy_and_replace(last_federation_update_ts=now)
persist_and_notify = True
return new_state, persist_and_notify, federation_ping