Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Combine logic about not overriding BUSY presence. (#16170)
Browse files Browse the repository at this point in the history
Simplify some of the presence code by reducing duplicated code between
worker & non-worker modes.

The main change is to push some of the logic from `user_syncing` into
`set_state`. This is done by passing whether the user is setting the presence
via a `/sync` with a new `is_sync` flag to `set_state`. If this is `true` some
additional logic is performed:

* Don't override `busy` presence.
* Update the `last_user_sync_ts`.
* Never update the status message.
  • Loading branch information
clokep authored Aug 28, 2023
1 parent 501da8e commit 1bf1436
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 104 deletions.
1 change: 1 addition & 0 deletions changelog.d/16170.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Simplify presence code when using workers.
155 changes: 63 additions & 92 deletions synapse/handlers/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,13 @@ def __init__(self, hs: "HomeServer"):

self._federation_queue = PresenceFederationQueue(hs, self)

self._busy_presence_enabled = hs.config.experimental.msc3026_enabled

self.VALID_PRESENCE: Tuple[str, ...] = (
PresenceState.ONLINE,
PresenceState.UNAVAILABLE,
PresenceState.OFFLINE,
)

if self._busy_presence_enabled:
if hs.config.experimental.msc3026_enabled:
self.VALID_PRESENCE += (PresenceState.BUSY,)

active_presence = self.store.take_presence_startup_info()
Expand Down Expand Up @@ -255,17 +253,19 @@ async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
*not* overriding a previously set BUSY status, updating the
user's last_user_sync_ts, and ignoring the "status_msg" field of
the `state` dict.
"""

@abc.abstractmethod
Expand Down Expand Up @@ -491,23 +491,18 @@ async def user_syncing(
if not affect_presence or not self._presence_enabled:
return _NullContextManager()

prev_state = await self.current_state_for_user(user_id)
if prev_state.state != PresenceState.BUSY:
# We set state here but pass ignore_status_msg = True as we don't want to
# cause the status message to be cleared.
# Note that this causes last_active_ts to be incremented which is not
# what the spec wants: see comment in the BasePresenceHandler version
# of this function.
await self.set_state(
UserID.from_string(user_id),
{"presence": presence_state},
ignore_status_msg=True,
)
# Note that this causes last_active_ts to be incremented which is not
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
state={"presence": presence_state},
is_sync=True,
)

curr_sync = self._user_to_num_current_syncs.get(user_id, 0)
self._user_to_num_current_syncs[user_id] = curr_sync + 1

# If we went from no in flight sync to some, notify replication
# If this is the first in-flight sync, notify replication
if self._user_to_num_current_syncs[user_id] == 1:
self.mark_as_coming_online(user_id)

Expand All @@ -518,7 +513,7 @@ def _end() -> None:
if user_id in self._user_to_num_current_syncs:
self._user_to_num_current_syncs[user_id] -= 1

# If we went from one in flight sync to non, notify replication
# If there are no more in-flight syncs, notify replication
if self._user_to_num_current_syncs[user_id] == 0:
self.mark_as_going_offline(user_id)

Expand Down Expand Up @@ -598,17 +593,19 @@ async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
*not* overriding a previously set BUSY status, updating the
user's last_user_sync_ts, and ignoring the "status_msg" field of
the `state` dict.
"""
presence = state["presence"]

Expand All @@ -626,8 +623,8 @@ async def set_state(
instance_name=self._presence_writer_instance,
user_id=user_id,
state=state,
ignore_status_msg=ignore_status_msg,
force_notify=force_notify,
is_sync=is_sync,
)

async def bump_presence_active_time(self, user: UserID) -> None:
Expand Down Expand Up @@ -992,45 +989,13 @@ async def user_syncing(
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1

prev_state = await self.current_state_for_user(user_id)

# If they're busy then they don't stop being busy just by syncing,
# so just update the last sync time.
if prev_state.state != PresenceState.BUSY:
# XXX: We set_state separately here and just update the last_active_ts above
# This keeps the logic as similar as possible between the worker and single
# process modes. Using set_state will actually cause last_active_ts to be
# updated always, which is not what the spec calls for, but synapse has done
# this for... forever, I think.
await self.set_state(
UserID.from_string(user_id),
{"presence": presence_state},
ignore_status_msg=True,
)
# Retrieve the new state for the logic below. This should come from the
# in-memory cache.
prev_state = await self.current_state_for_user(user_id)

# To keep the single process behaviour consistent with worker mode, run the
# same logic as `update_external_syncs_row`, even though it looks weird.
if prev_state.state == PresenceState.OFFLINE:
await self._update_states(
[
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=self.clock.time_msec(),
last_user_sync_ts=self.clock.time_msec(),
)
]
)
# otherwise, set the new presence state & update the last sync time,
# but don't update last_active_ts as this isn't an indication that
# they've been active (even though it's probably been updated by
# set_state above)
else:
await self._update_states(
[prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())]
)
# Note that this causes last_active_ts to be incremented which is not
# what the spec wants.
await self.set_state(
UserID.from_string(user_id),
state={"presence": presence_state},
is_sync=True,
)

async def _end() -> None:
try:
Expand Down Expand Up @@ -1080,32 +1045,27 @@ async def update_external_syncs_row(
process_id, set()
)

updates = []
# USER_SYNC is sent when a user starts or stops syncing on a remote
# process. (But only for the initial and last device.)
#
# When a user *starts* syncing it also calls set_state(...) which
# will update the state, last_active_ts, and last_user_sync_ts.
# Simply ensure the user is tracked as syncing in this case.
#
# When a user *stops* syncing, update the last_user_sync_ts and mark
# them as no longer syncing. Note this doesn't quite match the
# monolith behaviour, which updates last_user_sync_ts at the end of
# every sync, not just the last in-flight sync.
if is_syncing and user_id not in process_presence:
if prev_state.state == PresenceState.OFFLINE:
updates.append(
prev_state.copy_and_replace(
state=PresenceState.ONLINE,
last_active_ts=sync_time_msec,
last_user_sync_ts=sync_time_msec,
)
)
else:
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
)
process_presence.add(user_id)
elif user_id in process_presence:
updates.append(
prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
elif not is_syncing and user_id in process_presence:
new_state = prev_state.copy_and_replace(
last_user_sync_ts=sync_time_msec
)
await self._update_states([new_state])

if not is_syncing:
process_presence.discard(user_id)

if updates:
await self._update_states(updates)

self.external_process_last_updated_ms[process_id] = self.clock.time_msec()

async def update_external_syncs_clear(self, process_id: str) -> None:
Expand Down Expand Up @@ -1204,17 +1164,19 @@ async def set_state(
self,
target_user: UserID,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> None:
"""Set the presence state of the user.
Args:
target_user: The ID of the user to set the presence state of.
state: The presence state as a JSON dictionary.
ignore_status_msg: True to ignore the "status_msg" field of the `state` dict.
If False, the user's current status will be updated.
force_notify: Whether to force notification of the update to clients.
is_sync: True if this update was from a sync, which results in
*not* overriding a previously set BUSY status, updating the
user's last_user_sync_ts, and ignoring the "status_msg" field of
the `state` dict.
"""
status_msg = state.get("status_msg", None)
presence = state["presence"]
Expand All @@ -1227,18 +1189,27 @@ async def set_state(
return

user_id = target_user.to_string()
now = self.clock.time_msec()

prev_state = await self.current_state_for_user(user_id)

# Syncs do not override a previous presence of busy.
#
# TODO: This is a hack for lack of multi-device support. Unfortunately
# removing this requires coordination with clients.
if prev_state.state == PresenceState.BUSY and is_sync:
presence = PresenceState.BUSY

new_fields = {"state": presence}

if not ignore_status_msg:
new_fields["status_msg"] = status_msg
if presence == PresenceState.ONLINE or presence == PresenceState.BUSY:
new_fields["last_active_ts"] = now

if presence == PresenceState.ONLINE or (
presence == PresenceState.BUSY and self._busy_presence_enabled
):
new_fields["last_active_ts"] = self.clock.time_msec()
if is_sync:
new_fields["last_user_sync_ts"] = now
else:
# Syncs do not override the status message.
new_fields["status_msg"] = status_msg

await self._update_states(
[prev_state.copy_and_replace(**new_fields)], force_notify=force_notify
Expand Down
10 changes: 5 additions & 5 deletions synapse/replication/http/presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ class ReplicationPresenceSetState(ReplicationEndpoint):
{
"state": { ... },
"ignore_status_msg": false,
"force_notify": false
"force_notify": false,
"is_sync": false
}
200 OK
Expand All @@ -96,13 +96,13 @@ def __init__(self, hs: "HomeServer"):
async def _serialize_payload( # type: ignore[override]
user_id: str,
state: JsonDict,
ignore_status_msg: bool = False,
force_notify: bool = False,
is_sync: bool = False,
) -> JsonDict:
return {
"state": state,
"ignore_status_msg": ignore_status_msg,
"force_notify": force_notify,
"is_sync": is_sync,
}

async def _handle_request( # type: ignore[override]
Expand All @@ -111,8 +111,8 @@ async def _handle_request( # type: ignore[override]
await self._presence_handler.set_state(
UserID.from_string(user_id),
content["state"],
content["ignore_status_msg"],
content["force_notify"],
content.get("is_sync", False),
)

return (200, {})
Expand Down
37 changes: 30 additions & 7 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,13 +641,20 @@ def test_external_process_timeout(self) -> None:
"""Test that if an external process doesn't update the records for a while
we time out their syncing users presence.
"""
process_id = "1"

# Notify handler that a user is now syncing.
# Create a worker and use it to handle /sync traffic instead.
# This is used to test that presence changes get replicated from workers
# to the main process correctly.
worker_to_sync_against = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
)
worker_presence_handler = worker_to_sync_against.get_presence_handler()

self.get_success(
self.presence_handler.update_external_syncs_row(
process_id, self.user_id, True, self.clock.time_msec()
)
worker_presence_handler.user_syncing(
self.user_id, True, PresenceState.ONLINE
),
by=0.1,
)

# Check that if we wait a while without telling the handler the user has
Expand Down Expand Up @@ -820,7 +827,7 @@ def test_set_presence_from_syncing_keeps_busy(
# This is used to test that presence changes get replicated from workers
# to the main process correctly.
worker_to_sync_against = self.make_worker_hs(
"synapse.app.generic_worker", {"worker_name": "presence_writer"}
"synapse.app.generic_worker", {"worker_name": "synchrotron"}
)

# Set presence to BUSY
Expand All @@ -832,14 +839,30 @@ def test_set_presence_from_syncing_keeps_busy(
self.get_success(
worker_to_sync_against.get_presence_handler().user_syncing(
self.user_id, True, PresenceState.ONLINE
)
),
by=0.1,
)

# Check against the main process that the user's presence did not change.
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
# we should still be busy
self.assertEqual(state.state, PresenceState.BUSY)

# Advance such that the device would be discarded if it was not busy,
# then pump so _handle_timeouts function to called.
self.reactor.advance(IDLE_TIMER / 1000)
self.reactor.pump([5])

# The account should still be busy.
state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.BUSY)

# Ensure that a /presence call can set the user *off* busy.
self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
self.assertEqual(state.state, PresenceState.ONLINE)

def _set_presencestate_with_status_msg(
self, state: str, status_msg: Optional[str]
) -> None:
Expand Down

0 comments on commit 1bf1436

Please sign in to comment.