From cce22cde8a8e3e57b92c241aaf4850cca74c4bd7 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 15 Apr 2021 16:36:36 +0100 Subject: [PATCH 01/30] Add a util file for non API-specific constants, and move ONE_HOUR to it (This constant is used in the next commit.) So there's not much here right now, and perhaps this could fit better in another file. Though I didn't see anything quite appropriate. --- synapse/appservice/api.py | 3 +-- synapse/util/constants.py | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 synapse/util/constants.py diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index fe04d7a67293..57a073ac77b7 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -24,6 +24,7 @@ from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, ThirdPartyInstanceID from synapse.util.caches.response_cache import ResponseCache +from synapse.util.constants import HOUR_IN_MS if TYPE_CHECKING: from synapse.appservice import ApplicationService @@ -46,8 +47,6 @@ "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"] ) -HOUR_IN_MS = 60 * 60 * 1000 - APP_SERVICE_PREFIX = "/_matrix/app/unstable" diff --git a/synapse/util/constants.py b/synapse/util/constants.py new file mode 100644 index 000000000000..ece1941b06a2 --- /dev/null +++ b/synapse/util/constants.py @@ -0,0 +1,15 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +HOUR_IN_MS = 60 * 60 * 1000 From 84fb5f06d30cac2fb2d6dd3ff06c3d029d97b21c Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 15 Apr 2021 16:35:12 +0100 Subject: [PATCH 02/30] Add migration and storage methods for users_to_send_full_presence_to table --- synapse/storage/databases/main/presence.py | 75 ++++++++++++++++++- .../59/13users_to_send_full_presence_to.sql | 34 +++++++++ 2 files changed, 108 insertions(+), 1 deletion(-) create mode 100644 synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index db22fab23ea0..e34db0e42a9f 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import TYPE_CHECKING, Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple from synapse.api.presence import PresenceState, UserPresenceState from synapse.replication.tcp.streams import PresenceStream @@ -210,6 +210,79 @@ async def get_presence_for_users(self, user_ids): return {row["user_id"]: UserPresenceState(**row) for row in rows} + async def is_user_in_users_to_send_full_presence_to(self, user_id: str) -> bool: + """Check whether the given user is one we need to send full presence to. + + Args: + user_id: The ID of the user to check. + + Returns: + True if the user should have full presence sent to them, False otherwise. + """ + + def _is_user_in_users_to_send_full_presence_to_txn(txn): + sql = """ + SELECT 1 FROM users_to_send_full_presence_to + WHERE user_id = ? + """ + txn.execute(sql, (user_id,)) + return bool(txn.fetchone()) + + return await self.db_pool.runInteraction( + "is_user_in_users_to_send_full_presence_to", + _is_user_in_users_to_send_full_presence_to_txn, + ) + + async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): + """Adds to the list of users who should receive a full snapshot of presence + upon their next sync. + + Entries are kept for at least USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS, + and are culled whenever this method is called. + + Args: + user_ids: An iterable of user IDs. + """ + time_now = self.hs.get_clock().time_msec() + + def _add_users_to_send_full_presence_to_txn(txn): + # Add user entries to the table (or update their added_ms if they already exist) + self.db_pool.simple_upsert_many_txn( + txn, + table="users_to_send_full_presence_to", + key_names=("user_id",), + key_values=((user_id,) for user_id in user_ids), + value_names=("added_ms",), + value_values=((time_now,) for _ in user_ids), + ) + + # Delete entries in the table that have expired + sql = """ + DELETE FROM users_to_send_full_presence_to + WHERE added_ms < ? + """ + txn.execute( + sql, (time_now - USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS) + ) + + await self.db_pool.runInteraction( + "add_users_to_send_full_presence_to", + _add_users_to_send_full_presence_to_txn, + ) + + async def remove_user_from_users_to_send_full_presence_to(self, user_id: str): + """Removes a user from those to send full presence to. This should only be done + once we have sent full presence to them following a successful sync. + + Args: + user_id: The ID of the user to remove from the table. + """ + await self.db_pool.simple_delete( + table="users_to_send_full_presence_to", + keyvalues={"user_id": user_id}, + desc="remove_user_from_users_to_send_full_presence_to", + ) + async def get_presence_for_all_users( self, include_offline: bool = True, diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql new file mode 100644 index 000000000000..07b0f53ecfa1 --- /dev/null +++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql @@ -0,0 +1,34 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a table that keeps track of a list of users who should, upon their next +-- sync request, receive presence for all currently online users that they are +-- "interested" in. + +-- The motivation for a DB table over an in-memory list is so that this list +-- can be added to and retrieved from by any worker. Specifically, we don't +-- want to duplicate work across multiple sync workers. + +CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to( + -- The user ID to send full presence to. + user_id TEXT PRIMARY KEY, + -- A presence stream ID token - the current presence stream token when the row was last upserted. + -- If a user calls /sync and this token is part of the update they're to receive, we also include + -- full user presence in the response. + -- This allows multiple devices for a user to receive full presence whenever they next call /sync. + presence_stream_id BIGINT, + FOREIGN KEY (user_id) + REFERENCES users (name) +); \ No newline at end of file From f7e68876f9cb4397bc33469234206a500e64cea2 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 15 Apr 2021 16:38:39 +0100 Subject: [PATCH 03/30] Modify ModuleApi to upsert entries into our new table --- synapse/module_api/__init__.py | 53 ++++++++++------------ synapse/storage/databases/main/presence.py | 2 +- 2 files changed, 25 insertions(+), 30 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index a1a2b9aeccd3..71ebbe457e25 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -56,14 +56,6 @@ def __init__(self, hs, auth_handler): self._http_client = hs.get_simple_http_client() # type: SimpleHttpClient self._public_room_list_manager = PublicRoomListManager(hs) - # The next time these users sync, they will receive the current presence - # state of all local users. Users are added by send_local_online_presence_to, - # and removed after a successful sync. - # - # We make this a private variable to deter modules from accessing it directly, - # though other classes in Synapse will still do so. - self._send_full_presence_to_local_users = set() - @property def http_client(self): """Allows making outbound HTTP requests to remote resources. @@ -414,30 +406,33 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: "on processes that send federation", ) + local_users = set() + remote_users = set() for user in users: if self._hs.is_mine_id(user): - # Modify SyncHandler._generate_sync_entry_for_presence to call - # presence_source.get_new_events with an empty `from_key` if - # that user's ID were in a list modified by ModuleApi somewhere. - # That user would then get all presence state on next incremental sync. - - # Force a presence initial_sync for this user next time - self._send_full_presence_to_local_users.add(user) + local_users.add(user) else: - # Retrieve presence state for currently online users that this user - # is considered interested in - presence_events, _ = await self._presence_stream.get_new_events( - UserID.from_string(user), from_key=None, include_offline=False - ) - - # Send to remote destinations. - - # We pull out the presence handler here to break a cyclic - # dependency between the presence router and module API. - presence_handler = self._hs.get_presence_handler() - await presence_handler.maybe_send_presence_to_interested_destinations( - presence_events - ) + remote_users.add(user) + + if local_users: + # Force a presence initial_sync for these users next time they sync. + await self._store.add_users_to_send_full_presence_to(local_users) + + for user in remote_users: + # Retrieve presence state for currently online users that this user + # is considered interested in + presence_events, _ = await self._presence_stream.get_new_events( + UserID.from_string(user), from_key=None, include_offline=False + ) + + # Send to remote destinations. + + # We pull out the presence handler here to break a cyclic + # dependency between the presence router and module API. + presence_handler = self._hs.get_presence_handler() + await presence_handler.maybe_send_presence_to_interested_destinations( + presence_events + ) class PublicRoomListManager: diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index e34db0e42a9f..d568139be628 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -262,7 +262,7 @@ def _add_users_to_send_full_presence_to_txn(txn): WHERE added_ms < ? """ txn.execute( - sql, (time_now - USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS) + sql, (time_now - USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS,) ) await self.db_pool.runInteraction( From 9d9f46db181688dd70fb5e6c9047be37fe17d554 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 15 Apr 2021 16:38:51 +0100 Subject: [PATCH 04/30] Modify SyncHandler to pull from the new table --- synapse/handlers/presence.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 6fd1f34289f8..f3a4bd8a1df2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1389,11 +1389,10 @@ def __init__(self, hs: "HomeServer"): # # Presence -> Notifier -> PresenceEventSource -> Presence # - # Same with get_module_api, get_presence_router + # Same with get_presence_router: # # AuthHandler -> Notifier -> PresenceEventSource -> ModuleApi -> AuthHandler self.get_presence_handler = hs.get_presence_handler - self.get_module_api = hs.get_module_api self.get_presence_router = hs.get_presence_router self.clock = hs.get_clock() self.store = hs.get_datastore() @@ -1423,8 +1422,13 @@ async def get_new_events( user_id = user.to_string() stream_change_cache = self.store.presence_stream_cache + # Check if this user should receive all current, online user presence + user_in_users_to_send_full_presence_to = ( + await self.store.is_user_in_users_to_send_full_presence_to(user_id) + ) + with Measure(self.clock, "presence.get_new_events"): - if user_id in self.get_module_api()._send_full_presence_to_local_users: + if user_in_users_to_send_full_presence_to: # This user has been specified by a module to receive all current, online # user presence. Removing from_key and setting include_offline to false # will do effectively this. @@ -1468,8 +1472,8 @@ async def get_new_events( ) # Remove the user from the list of users to receive all presence - if user_id in self.get_module_api()._send_full_presence_to_local_users: - self.get_module_api()._send_full_presence_to_local_users.remove( + if user_in_users_to_send_full_presence_to: + await self.store.remove_user_from_users_to_send_full_presence_to( user_id ) @@ -1523,8 +1527,8 @@ async def get_new_events( presence_updates = list(users_to_state.values()) # Remove the user from the list of users to receive all presence - if user_id in self.get_module_api()._send_full_presence_to_local_users: - self.get_module_api()._send_full_presence_to_local_users.remove(user_id) + if user_in_users_to_send_full_presence_to: + await self.store.remove_user_from_users_to_send_full_presence_to(user_id) if not include_offline: # Filter out offline presence states From a71fd4003c67a90e7bd17545e7580e81ffcbc5f9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 15 Apr 2021 17:16:34 +0100 Subject: [PATCH 05/30] Changelog --- changelog.d/9823.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/9823.misc diff --git a/changelog.d/9823.misc b/changelog.d/9823.misc new file mode 100644 index 000000000000..bf924ab68cc4 --- /dev/null +++ b/changelog.d/9823.misc @@ -0,0 +1 @@ +Allow sending full presence to users via workers other than the one that called `ModuleApi.send_local_online_presence_to`. \ No newline at end of file From 0951d36304f08c3fba1aaa1b779479844749bc52 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 5 May 2021 12:04:21 +0100 Subject: [PATCH 06/30] Remove users_to_send_full_presence_to table culling; add fk for user_id --- synapse/storage/databases/main/presence.py | 33 +++++-------------- .../59/13users_to_send_full_presence_to.sql | 6 +++- 2 files changed, 13 insertions(+), 26 deletions(-) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index d568139be628..bd7d60be9ce9 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -243,31 +243,14 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): Args: user_ids: An iterable of user IDs. """ - time_now = self.hs.get_clock().time_msec() - - def _add_users_to_send_full_presence_to_txn(txn): - # Add user entries to the table (or update their added_ms if they already exist) - self.db_pool.simple_upsert_many_txn( - txn, - table="users_to_send_full_presence_to", - key_names=("user_id",), - key_values=((user_id,) for user_id in user_ids), - value_names=("added_ms",), - value_values=((time_now,) for _ in user_ids), - ) - - # Delete entries in the table that have expired - sql = """ - DELETE FROM users_to_send_full_presence_to - WHERE added_ms < ? - """ - txn.execute( - sql, (time_now - USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS,) - ) - - await self.db_pool.runInteraction( - "add_users_to_send_full_presence_to", - _add_users_to_send_full_presence_to_txn, + # Add user entries to the table + await self.db_pool.simple_upsert_many( + table="users_to_send_full_presence_to", + key_names=("user_id",), + key_values=[(user_id,) for user_id in user_ids], + value_names=(), + value_values=(), + desc="add_users_to_send_full_presence_to", ) async def remove_user_from_users_to_send_full_presence_to(self, user_id: str): diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql index 07b0f53ecfa1..ad44b3290e08 100644 --- a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql +++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql @@ -31,4 +31,8 @@ CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to( presence_stream_id BIGINT, FOREIGN KEY (user_id) REFERENCES users (name) -); \ No newline at end of file +); + +-- For checking expired entries +CREATE INDEX IF NOT EXISTS users_to_send_full_presence_to_added_ms + ON users_to_send_full_presence_to(added_ms); \ No newline at end of file From 3ebbc96b8a77e8d61fa0624c8b43615e912b30f4 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 5 May 2021 16:02:36 +0100 Subject: [PATCH 07/30] Add presence_stream_id column to users_to_send_full_presence_to table We now return full presence in a /sync response based on the provided sync token, rather than if the user has received the response already. This is necessary in circumstances where users have more than one device - which they often do! This comment also modifies the spot in PresenceHandler where we check if the user should receive all presence. We only need to make the check if from_key is not None. If from_key is None, the user will be receiving all presence states anyways. --- synapse/handlers/presence.py | 34 +++++-------- synapse/storage/databases/main/presence.py | 50 +++++++++---------- .../59/13users_to_send_full_presence_to.sql | 6 +-- 3 files changed, 38 insertions(+), 52 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f3a4bd8a1df2..051449fee6ba 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1422,22 +1422,22 @@ async def get_new_events( user_id = user.to_string() stream_change_cache = self.store.presence_stream_cache - # Check if this user should receive all current, online user presence - user_in_users_to_send_full_presence_to = ( - await self.store.is_user_in_users_to_send_full_presence_to(user_id) - ) - with Measure(self.clock, "presence.get_new_events"): - if user_in_users_to_send_full_presence_to: - # This user has been specified by a module to receive all current, online - # user presence. Removing from_key and setting include_offline to false - # will do effectively this. - from_key = None - include_offline = False - if from_key is not None: from_key = int(from_key) + # Check if this user should receive all current, online user presence. We only + # bother to do this if from_key is set, as otherwise the user will receive all + # user presence anyways. + if await self.store.should_user_receive_full_presence_with_token( + user_id, from_key + ): + # This user has been specified by a module to receive all current, online + # user presence. Removing from_key and setting include_offline to false + # will do effectively this. + from_key = None + include_offline = False + max_token = self.store.get_current_presence_token() if from_key == max_token: # This is necessary as due to the way stream ID generators work @@ -1471,12 +1471,6 @@ async def get_new_events( user_id, include_offline, from_key ) - # Remove the user from the list of users to receive all presence - if user_in_users_to_send_full_presence_to: - await self.store.remove_user_from_users_to_send_full_presence_to( - user_id - ) - return presence_updates, max_token # Make mypy happy. users_interested_in should now be a set @@ -1526,10 +1520,6 @@ async def get_new_events( ) presence_updates = list(users_to_state.values()) - # Remove the user from the list of users to receive all presence - if user_in_users_to_send_full_presence_to: - await self.store.remove_user_from_users_to_send_full_presence_to(user_id) - if not include_offline: # Filter out offline presence states presence_updates = self._filter_offline_presence_state(presence_updates) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index bd7d60be9ce9..3e72d1006246 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -210,62 +210,62 @@ async def get_presence_for_users(self, user_ids): return {row["user_id"]: UserPresenceState(**row) for row in rows} - async def is_user_in_users_to_send_full_presence_to(self, user_id: str) -> bool: - """Check whether the given user is one we need to send full presence to. + async def should_user_receive_full_presence_with_token( + self, + user_id: str, + from_token: int, + ) -> bool: + """Check whether the given user should receive full presence using the stream token + they're updating from. Args: user_id: The ID of the user to check. + from_token: The stream token included in their /sync token. Returns: True if the user should have full presence sent to them, False otherwise. """ - def _is_user_in_users_to_send_full_presence_to_txn(txn): + def _should_user_receive_full_presence_with_token_txn(txn): sql = """ SELECT 1 FROM users_to_send_full_presence_to WHERE user_id = ? + AND presence_stream_id >= ? """ - txn.execute(sql, (user_id,)) + txn.execute(sql, (user_id, from_token)) return bool(txn.fetchone()) return await self.db_pool.runInteraction( - "is_user_in_users_to_send_full_presence_to", - _is_user_in_users_to_send_full_presence_to_txn, + "should_user_receive_full_presence_with_token", + _should_user_receive_full_presence_with_token_txn, ) async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): """Adds to the list of users who should receive a full snapshot of presence upon their next sync. - Entries are kept for at least USERS_TO_SEND_FULL_PRESENCE_TO_ENTRY_LIFETIME_MS, - and are culled whenever this method is called. - Args: user_ids: An iterable of user IDs. """ - # Add user entries to the table + + # Add user entries to the table, updating the presence_stream_id column if the user already + # exists in the table. await self.db_pool.simple_upsert_many( table="users_to_send_full_presence_to", key_names=("user_id",), key_values=[(user_id,) for user_id in user_ids], - value_names=(), - value_values=(), + value_names=("presence_stream_id",), + # We save the current presence stream ID token along with the user ID entry so + # that when a user /sync's, even if they syncing multiple times across separate + # devices at different times, each device will receive full presence once - when + # the presence stream ID in their sync token is less than the one in the table + # for their user ID. + value_values=( + (self._presence_id_gen.get_current_token(),) for _ in user_ids + ), desc="add_users_to_send_full_presence_to", ) - async def remove_user_from_users_to_send_full_presence_to(self, user_id: str): - """Removes a user from those to send full presence to. This should only be done - once we have sent full presence to them following a successful sync. - - Args: - user_id: The ID of the user to remove from the table. - """ - await self.db_pool.simple_delete( - table="users_to_send_full_presence_to", - keyvalues={"user_id": user_id}, - desc="remove_user_from_users_to_send_full_presence_to", - ) - async def get_presence_for_all_users( self, include_offline: bool = True, diff --git a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql index ad44b3290e08..07b0f53ecfa1 100644 --- a/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql +++ b/synapse/storage/schema/main/delta/59/13users_to_send_full_presence_to.sql @@ -31,8 +31,4 @@ CREATE TABLE IF NOT EXISTS users_to_send_full_presence_to( presence_stream_id BIGINT, FOREIGN KEY (user_id) REFERENCES users (name) -); - --- For checking expired entries -CREATE INDEX IF NOT EXISTS users_to_send_full_presence_to_added_ms - ON users_to_send_full_presence_to(added_ms); \ No newline at end of file +); \ No newline at end of file From 245e672cba9c4d69152be499d4865f76d7e8fa60 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 10 May 2021 14:46:38 +0100 Subject: [PATCH 08/30] Updating tests to account for new stream-token based sync tracking --- synapse/rest/admin/server_notice_servlet.py | 5 +- tests/module_api/test_api.py | 276 ++++++++++++------ .../test_sharded_event_persister.py | 2 +- tests/utils.py | 2 +- 4 files changed, 194 insertions(+), 91 deletions(-) diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index cc3ab5854b0b..3def03515fa6 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -54,7 +54,6 @@ def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() self.txns = HttpTransactionCache(hs) - self.snm = hs.get_server_notices_manager() def register(self, json_resource: HttpServer): PATTERN = "/send_server_notice" @@ -77,7 +76,7 @@ async def on_POST( event_type = body.get("type", EventTypes.Message) state_key = body.get("state_key") - if not self.snm.is_enabled(): + if not self.hs.get_server_notices_manager().is_enabled(): raise SynapseError(400, "Server notices are not enabled on this server") user_id = body["user_id"] @@ -85,7 +84,7 @@ async def on_POST( if not self.hs.is_mine_id(user_id): raise SynapseError(400, "Server notices can only be sent to local users") - event = await self.snm.send_notice( + event = await self.hs.get_server_notices_manager().send_notice( user_id=body["user_id"], type=event_type, state_key=state_key, diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 742ad14b8c3a..78f666f95c7e 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -19,14 +19,16 @@ from synapse.handlers.presence import UserPresenceState from synapse.rest import admin from synapse.rest.client.v1 import login, presence, room -from synapse.types import create_requester +from synapse.types import create_requester, StreamToken from tests.events.test_presence_router import send_presence_update, sync_presence from tests.test_utils.event_injection import inject_member_event -from tests.unittest import FederatingHomeserverTestCase, override_config +from tests.unittest import HomeserverTestCase, override_config +from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.utils import USE_POSTGRES_FOR_TESTS -class ModuleApiTestCase(FederatingHomeserverTestCase): +class ModuleApiTestCase(HomeserverTestCase): servlets = [ admin.register_servlets, login.register_servlets, @@ -217,90 +219,9 @@ def test_public_rooms(self): ) self.assertFalse(is_in_public_rooms) - # The ability to send federation is required by send_local_online_presence_to. - @override_config({"send_federation": True}) def test_send_local_online_presence_to(self): - """Tests that send_local_presence_to_users sends local online presence to local users.""" - # Create a user who will send presence updates - self.presence_receiver_id = self.register_user("presence_receiver", "monkey") - self.presence_receiver_tok = self.login("presence_receiver", "monkey") - - # And another user that will send presence updates out - self.presence_sender_id = self.register_user("presence_sender", "monkey") - self.presence_sender_tok = self.login("presence_sender", "monkey") - - # Put them in a room together so they will receive each other's presence updates - room_id = self.helper.create_room_as( - self.presence_receiver_id, - tok=self.presence_receiver_tok, - ) - self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok) - - # Presence sender comes online - send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, - "online", - "I'm online!", - ) - - # Presence receiver should have received it - presence_updates, sync_token = sync_presence(self, self.presence_receiver_id) - self.assertEqual(len(presence_updates), 1) - - presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") - - # Syncing again should result in no presence updates - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 0) - - # Trigger sending local online presence - self.get_success( - self.module_api.send_local_online_presence_to( - [ - self.presence_receiver_id, - ] - ) - ) - - # Presence receiver should have received online presence again - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 1) - - presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") - - # Presence sender goes offline - send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, - "offline", - "I slink back into the darkness.", - ) - - # Trigger sending local online presence - self.get_success( - self.module_api.send_local_online_presence_to( - [ - self.presence_receiver_id, - ] - ) - ) - - # Presence receiver should *not* have received offline state - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 0) + # Test sending local online presence to users from the main process + _test_sending_local_online_presence_to_local_user(self, test_with_workers=False) @override_config({"send_federation": True}) def test_send_local_online_presence_to_federation(self): @@ -374,3 +295,186 @@ def test_send_local_online_presence_to_federation(self): found_update = True self.assertTrue(found_update) + + +class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase): + """For testing ModuleApi functionality in a multi-worker setup""" + + # Testing stream ID replication from the main to worker processes requires postgres + # (due to needing `MultiWriterIdGenerator`). + if not USE_POSTGRES_FOR_TESTS: + skip = "Requires Postgres" + + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + presence.register_servlets, + ] + + def prepare(self, reactor, clock, homeserver): + self.store = homeserver.get_datastore() + self.module_api = homeserver.get_module_api() + self.event_creation_handler = homeserver.get_event_creation_handler() + self.sync_handler = homeserver.get_sync_handler() + + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver() + + def _get_worker_hs_config(self) -> dict: + config = self.default_config() + config["worker_app"] = "synapse.app.generic_worker" + config["worker_replication_host"] = "testserv" + config["worker_replication_http_port"] = "8765" + + return config + + def test_send_local_online_presence_to_workers(self): + # Test sending local online presence to users from a worker process + _test_sending_local_online_presence_to_local_user(self, test_with_workers=True) + + +def _test_sending_local_online_presence_to_local_user(self: HomeserverTestCase, test_with_workers: bool = False): + """Tests that send_local_presence_to_users sends local online presence to local users. + + Args: + test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a + worker process. The test users will still sync with the main process. The purpose of testing + with a worker is to check whether a Synapse module running on a worker can inform other workers/ + the main process that they should include additional presence when a user next syncs. + """ + if test_with_workers: + # Create a worker process to make module_api calls against + worker_hs = self.make_worker_hs("synapse.app.client_reader") + + # Create a user who will send presence updates + self.presence_receiver_id = self.register_user("presence_receiver", "monkey") + self.presence_receiver_tok = self.login("presence_receiver", "monkey") + + # And another user that will send presence updates out + self.presence_sender_id = self.register_user("presence_sender", "monkey") + self.presence_sender_tok = self.login("presence_sender", "monkey") + + # Put them in a room together so they will receive each other's presence updates + room_id = self.helper.create_room_as( + self.presence_receiver_id, + tok=self.presence_receiver_tok, + ) + self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok) + + # Presence sender comes online + send_presence_update( + self, + self.presence_sender_id, + self.presence_sender_tok, + "online", + "I'm online!", + ) + + # Presence receiver should have received it + presence_updates, sync_token = sync_presence(self, self.presence_receiver_id) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + if test_with_workers: + # Replicate the current sync presence token from the main process to the worker process. + # We need to do this so that the worker process knows the current presence stream ID to + # insert into the database when we call ModuleApi.send_local_online_presence_to. + self.replicate() + + # Syncing again should result in no presence updates + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 0) + + # We do an (initial) sync with a second "device" now, getting a new sync token. + # We'll use this in a moment. + _, sync_token_second_device = sync_presence(self, self.presence_receiver_id) + + # Determine on which worker to call ModuleApi.send_local_online_presence_to on + if test_with_workers: + module_api_to_use = worker_hs.get_module_api() + else: + module_api_to_use = self.module_api + + # Trigger sending local online presence on the worker process. We expect this information + # to be saved to the database where all other workers can access it. + self.get_success( + module_api_to_use.send_local_online_presence_to( + [ + self.presence_receiver_id, + ] + ) + ) + + if test_with_workers: + self.replicate() + self.pump(0.1) + + # The presence receiver should have received online presence again. + print("Sync token initially:", sync_token) + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 1) + print("Sync token after a sync:", sync_token) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + # We attempt to sync with the second sync token we received above - just to check that + # multiple syncing devices will each receive the necessary online presence. + presence_updates, sync_token_second_device = sync_presence( + self, self.presence_receiver_id, sync_token_second_device + ) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + # However, if we now sync with either "device", we won't receive another burst of online presence + # until the API is called again sometime in the future + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + print("Sync token after the second sync:", sync_token) + print(presence_updates) + + # Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to. + + # Presence sender goes offline + send_presence_update( + self, + self.presence_sender_id, + self.presence_sender_tok, + "offline", + "I slink back into the darkness.", + ) + + # Presence receiver should have received the updated, offline state + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + print("Sync token after the third sync:", sync_token) + self.assertEqual(len(presence_updates), 1) + + # Now trigger sending local online presence. + self.get_success( + self.module_api.send_local_online_presence_to( + [ + self.presence_receiver_id, + ] + ) + ) + + # Presence receiver should *not* have received offline state + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 0) diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index d739eb6b17f0..5eca5c165d0a 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -30,7 +30,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase): """Checks event persisting sharding works""" # Event persister sharding requires postgres (due to needing - # `MutliWriterIdGenerator`). + # `MultiWriterIdGenerator`). if not USE_POSTGRES_FOR_TESTS: skip = "Requires Postgres" diff --git a/tests/utils.py b/tests/utils.py index 6bd008dcfe21..f695a4d9639b 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -236,7 +236,7 @@ def setup_test_homeserver( else: database_config = { "name": "sqlite3", - "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1}, + "args": {"database": "test.db", "cp_min": 1, "cp_max": 1}, } database = DatabaseConnectionConfig("master", database_config) From b4e7e9d0f358dd3ab34b0e7d0228da30e0d6bcb6 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 11 May 2021 14:16:47 +0100 Subject: [PATCH 09/30] Fix stream token multiple devices --- synapse/handlers/presence.py | 24 ++++++++++++++++++++++ synapse/storage/databases/main/presence.py | 9 ++++++++ 2 files changed, 33 insertions(+) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 051449fee6ba..2272c69f8ea7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -296,6 +296,30 @@ async def maybe_send_presence_to_interested_destinations( for destinations, states in hosts_and_states: self._federation.send_presence_to_destinations(states, destinations) + async def resend_current_presence_for_users(self, user_ids: Iterable[str]): + """ + Grabs the current presence state for a given set of users and adds it + to the top of the presence stream. + + Args: + user_ids: The IDs of the users to use. + """ + # Get the current presence state for each user (defaults to offline if not found) + current_presence_for_users = await self.current_state_for_users(user_ids) + + for user_id, current_presence_state in current_presence_for_users.items(): + # Convert the UserPresenceState object into a serializable dict + state = { + "presence": current_presence_state.state, + "status_message": current_presence_state.status_msg, + } + + # Copy the presence state to the tip of the presence stream + print(f"Adding a presence update for {user_id}: {state}") + await self.set_state(UserID.from_string(user_id), state) + + print("bla") + class _NullContextManager(ContextManager[None]): """A context manager which does nothing.""" diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 3e72d1006246..9079b5f59203 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -57,6 +57,7 @@ def __init__( db_conn, "presence_stream", "stream_id" ) + self.hs = hs self._presence_on_startup = self._get_active_presence(db_conn) presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( @@ -266,6 +267,14 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): desc="add_users_to_send_full_presence_to", ) + # Add a new entry to the presence stream. Since we use stream tokens to determine whether a + # local user should receive a full snapshot presence when they sync, we need to bump the + # presence stream so that subsequent syncs with no presence activity in between won't result + # in the client receiving multiple full snapshots of presence. + # If we bump the stream ID, then the user will get a higher stream token next sync, and thus + # won't receive another snapshot. + await self.hs.get_presence_handler().resend_current_presence_for_users(user_ids) + async def get_presence_for_all_users( self, include_offline: bool = True, From 337197d90045b799d445196611e8f5c85b78379a Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Tue, 11 May 2021 18:26:25 +0100 Subject: [PATCH 10/30] Add a new key to presence_set_state replication request, force_notify If force_notify is True, we will create a new stream ID for this presence update, even if it matches the user's previous presence update state. Creating a new stream ID will notify clients, hence 'force_notify'. We're resending the current state of the user who's getting full presence in order to force the presence stream ID to advance 1 - as we compare stream IDs when checking if a device (who's presencing a presence stream ID as part of their sync token) has already received full online presence as part of a previous sync. If we didn't do so, it's possible for a syncing device to receive full online presence multiple times if the presence stream doesn't advance otherwise. --- synapse/handlers/presence.py | 15 ++++++++++----- synapse/replication/http/presence.py | 3 ++- tests/module_api/test_api.py | 1 - 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2272c69f8ea7..ba76312baaf3 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -222,7 +222,7 @@ async def current_state_for_users( @abc.abstractmethod async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False ) -> None: """Set the presence state of the user. """ @@ -316,7 +316,7 @@ async def resend_current_presence_for_users(self, user_ids: Iterable[str]): # Copy the presence state to the tip of the presence stream print(f"Adding a presence update for {user_id}: {state}") - await self.set_state(UserID.from_string(user_id), state) + await self.set_state(UserID.from_string(user_id), state, force_notify=True) print("bla") @@ -504,6 +504,7 @@ async def set_state( target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: """Set the presence state of the user.""" presence = state["presence"] @@ -532,6 +533,7 @@ async def set_state( user_id=user_id, state=state, ignore_status_msg=ignore_status_msg, + force_notify=force_notify, ) async def bump_presence_active_time(self, user: UserID) -> None: @@ -701,7 +703,7 @@ async def _persist_unpersisted_changes(self) -> None: [self.user_to_current_state[user_id] for user_id in unpersisted] ) - async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: + async def _update_states(self, new_states: Iterable[UserPresenceState], force_notify: bool = False) -> None: """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state should be sent to clients/servers. @@ -744,6 +746,9 @@ async def _update_states(self, new_states: Iterable[UserPresenceState]) -> None: now=now, ) + if force_notify: + should_notify = True + self.user_to_current_state[user_id] = new_state if should_notify: @@ -1082,7 +1087,7 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None: await self._update_states(updates) async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False + self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False ) -> None: """Set the presence state of the user.""" status_msg = state.get("status_msg", None) @@ -1115,7 +1120,7 @@ async def set_state( ): new_fields["last_active_ts"] = self.clock.time_msec() - await self._update_states([prev_state.copy_and_replace(**new_fields)]) + await self._update_states([prev_state.copy_and_replace(**new_fields)], force_notify=force_notify) async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: """Returns whether a user can see another user's presence.""" diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index f25307620d55..730b8c765f3c 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -91,10 +91,11 @@ def __init__(self, hs: "HomeServer"): self._presence_handler = hs.get_presence_handler() @staticmethod - async def _serialize_payload(user_id, state, ignore_status_msg=False): + async def _serialize_payload(user_id, state, ignore_status_msg=False, force_notify=False): return { "state": state, "ignore_status_msg": ignore_status_msg, + "force_notify": force_notify, } async def _handle_request(self, request, user_id): diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 78f666f95c7e..0b14418e40c7 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -413,7 +413,6 @@ def _test_sending_local_online_presence_to_local_user(self: HomeserverTestCase, if test_with_workers: self.replicate() - self.pump(0.1) # The presence receiver should have received online presence again. print("Sync token initially:", sync_token) From 0b1523010505edadc7d9702a152372a81b3bd6a4 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 13:41:16 +0100 Subject: [PATCH 11/30] self -> test_case. Fix worker-based test It turned out we needed to advance the reactor in order to successfully write the body of the replication request from the worker to the main process. We do this so that we can test calling send_local_online_presence_to on a worker, while a client is syncing via the main process. --- synapse/handlers/presence.py | 3 - synapse/replication/http/presence.py | 3 +- tests/module_api/test_api.py | 140 +++++++++++++-------------- 3 files changed, 70 insertions(+), 76 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index ba76312baaf3..fc8626294507 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -315,11 +315,8 @@ async def resend_current_presence_for_users(self, user_ids: Iterable[str]): } # Copy the presence state to the tip of the presence stream - print(f"Adding a presence update for {user_id}: {state}") await self.set_state(UserID.from_string(user_id), state, force_notify=True) - print("bla") - class _NullContextManager(ContextManager[None]): """A context manager which does nothing.""" diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index 730b8c765f3c..b98f39876aa8 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -73,6 +73,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint): { "state": { ... }, "ignore_status_msg": false, + "force_notify": false } 200 OK @@ -102,7 +103,7 @@ async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) await self._presence_handler.set_state( - UserID.from_string(user_id), content["state"], content["ignore_status_msg"] + UserID.from_string(user_id), content["state"], content["ignore_status_msg"], content["force_notify"], ) return ( diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 0b14418e40c7..6549ace2c4e2 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from twisted.internet import defer from unittest.mock import Mock from synapse.api.constants import EduTypes @@ -227,8 +228,8 @@ def test_send_local_online_presence_to(self): def test_send_local_online_presence_to_federation(self): """Tests that send_local_presence_to_users sends local online presence to remote users.""" # Create a user who will send presence updates - self.presence_sender_id = self.register_user("presence_sender", "monkey") - self.presence_sender_tok = self.login("presence_sender", "monkey") + self.presence_sender_id = self.register_user("presence_sender1", "monkey") + self.presence_sender_tok = self.login("presence_sender1", "monkey") # And a room they're a part of room_id = self.helper.create_room_as( @@ -313,30 +314,24 @@ class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase): ] def prepare(self, reactor, clock, homeserver): - self.store = homeserver.get_datastore() self.module_api = homeserver.get_module_api() - self.event_creation_handler = homeserver.get_event_creation_handler() self.sync_handler = homeserver.get_sync_handler() - def make_homeserver(self, reactor, clock): - return self.setup_test_homeserver() - - def _get_worker_hs_config(self) -> dict: - config = self.default_config() - config["worker_app"] = "synapse.app.generic_worker" - config["worker_replication_host"] = "testserv" - config["worker_replication_http_port"] = "8765" - - return config - def test_send_local_online_presence_to_workers(self): # Test sending local online presence to users from a worker process _test_sending_local_online_presence_to_local_user(self, test_with_workers=True) -def _test_sending_local_online_presence_to_local_user(self: HomeserverTestCase, test_with_workers: bool = False): +def _test_sending_local_online_presence_to_local_user(test_case: HomeserverTestCase, test_with_workers: bool = False): """Tests that send_local_presence_to_users sends local online presence to local users. + This simultaneously tests two different usecases: + * Testing that this method works when either called from a worker or the main process. + - We test this by calling this method from both a TestCase that runs in monolith mode, and one that + runs with a main and generic_worker. + * Testing that multiple devices syncing simultaneously will all receive a snapshot of local, + online presence - but only once per device. + Args: test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a worker process. The test users will still sync with the main process. The purpose of testing @@ -345,135 +340,136 @@ def _test_sending_local_online_presence_to_local_user(self: HomeserverTestCase, """ if test_with_workers: # Create a worker process to make module_api calls against - worker_hs = self.make_worker_hs("synapse.app.client_reader") + worker_hs = test_case.make_worker_hs("synapse.app.generic_worker") # Create a user who will send presence updates - self.presence_receiver_id = self.register_user("presence_receiver", "monkey") - self.presence_receiver_tok = self.login("presence_receiver", "monkey") + test_case.presence_receiver_id = test_case.register_user("presence_receiver1", "monkey") + test_case.presence_receiver_tok = test_case.login("presence_receiver1", "monkey") # And another user that will send presence updates out - self.presence_sender_id = self.register_user("presence_sender", "monkey") - self.presence_sender_tok = self.login("presence_sender", "monkey") + test_case.presence_sender_id = test_case.register_user("presence_sender2", "monkey") + test_case.presence_sender_tok = test_case.login("presence_sender2", "monkey") # Put them in a room together so they will receive each other's presence updates - room_id = self.helper.create_room_as( - self.presence_receiver_id, - tok=self.presence_receiver_tok, + room_id = test_case.helper.create_room_as( + test_case.presence_receiver_id, + tok=test_case.presence_receiver_tok, ) - self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok) + test_case.helper.join(room_id, test_case.presence_sender_id, tok=test_case.presence_sender_tok) # Presence sender comes online send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, + test_case, + test_case.presence_sender_id, + test_case.presence_sender_tok, "online", "I'm online!", ) # Presence receiver should have received it - presence_updates, sync_token = sync_presence(self, self.presence_receiver_id) - self.assertEqual(len(presence_updates), 1) + presence_updates, sync_token = sync_presence(test_case, test_case.presence_receiver_id) + test_case.assertEqual(len(presence_updates), 1) presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") + test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id) + test_case.assertEqual(presence_update.state, "online") if test_with_workers: # Replicate the current sync presence token from the main process to the worker process. # We need to do this so that the worker process knows the current presence stream ID to # insert into the database when we call ModuleApi.send_local_online_presence_to. - self.replicate() + test_case.replicate() # Syncing again should result in no presence updates presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token + test_case, test_case.presence_receiver_id, sync_token ) - self.assertEqual(len(presence_updates), 0) + test_case.assertEqual(len(presence_updates), 0) # We do an (initial) sync with a second "device" now, getting a new sync token. # We'll use this in a moment. - _, sync_token_second_device = sync_presence(self, self.presence_receiver_id) + _, sync_token_second_device = sync_presence(test_case, test_case.presence_receiver_id) - # Determine on which worker to call ModuleApi.send_local_online_presence_to on + # Determine on which process (main or worker) to call ModuleApi.send_local_online_presence_to on if test_with_workers: module_api_to_use = worker_hs.get_module_api() else: - module_api_to_use = self.module_api - - # Trigger sending local online presence on the worker process. We expect this information - # to be saved to the database where all other workers can access it. - self.get_success( - module_api_to_use.send_local_online_presence_to( - [ - self.presence_receiver_id, - ] - ) + module_api_to_use = test_case.module_api + + # Trigger sending local online presence. We expect this information + # to be saved to the database where all processes can access it. + # Note that we're syncing via the master. + d = module_api_to_use.send_local_online_presence_to( + [ + test_case.presence_receiver_id, + ] ) + d = defer.ensureDeferred(d) if test_with_workers: - self.replicate() + # In order for the required presence_set_state replication request to occur between the + # worker and main process, we need to pump the reactor. Otherwise, the coordinator that + # reads the request on the main process won't do so, and the request will time out. + while not d.called: + test_case.reactor.advance(0.1) + + test_case.get_success(d) # The presence receiver should have received online presence again. - print("Sync token initially:", sync_token) presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token + test_case, test_case.presence_receiver_id, sync_token ) - self.assertEqual(len(presence_updates), 1) - print("Sync token after a sync:", sync_token) + test_case.assertEqual(len(presence_updates), 1) presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") + test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id) + test_case.assertEqual(presence_update.state, "online") # We attempt to sync with the second sync token we received above - just to check that # multiple syncing devices will each receive the necessary online presence. presence_updates, sync_token_second_device = sync_presence( - self, self.presence_receiver_id, sync_token_second_device + test_case, test_case.presence_receiver_id, sync_token_second_device ) - self.assertEqual(len(presence_updates), 1) + test_case.assertEqual(len(presence_updates), 1) presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") + test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id) + test_case.assertEqual(presence_update.state, "online") # However, if we now sync with either "device", we won't receive another burst of online presence # until the API is called again sometime in the future presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token + test_case, test_case.presence_receiver_id, sync_token ) - print("Sync token after the second sync:", sync_token) - print(presence_updates) # Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to. # Presence sender goes offline send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, + test_case, + test_case.presence_sender_id, + test_case.presence_sender_tok, "offline", "I slink back into the darkness.", ) # Presence receiver should have received the updated, offline state presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token + test_case, test_case.presence_receiver_id, sync_token ) - print("Sync token after the third sync:", sync_token) - self.assertEqual(len(presence_updates), 1) + test_case.assertEqual(len(presence_updates), 1) # Now trigger sending local online presence. - self.get_success( - self.module_api.send_local_online_presence_to( + test_case.get_success( + test_case.module_api.send_local_online_presence_to( [ - self.presence_receiver_id, + test_case.presence_receiver_id, ] ) ) # Presence receiver should *not* have received offline state presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token + test_case, test_case.presence_receiver_id, sync_token ) - self.assertEqual(len(presence_updates), 0) + test_case.assertEqual(len(presence_updates), 0) From 822f87a8746b022599b32ef97e66c70f6aec8375 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 15:46:22 +0100 Subject: [PATCH 12/30] Modify send_local_online_presence_to to send to local users on non-fed workers --- synapse/module_api/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 71ebbe457e25..71422be8b48f 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -400,12 +400,6 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: Note that this method can only be run on the main or federation_sender worker processes. """ - if not self._hs.should_send_federation(): - raise Exception( - "send_local_online_presence_to can only be run " - "on processes that send federation", - ) - local_users = set() remote_users = set() for user in users: @@ -414,6 +408,12 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: else: remote_users.add(user) + if remote_users and not self._hs.should_send_federation(): + raise Exception( + "send_local_online_presence_to can only be called " + "with remote users on processes that send federation", + ) + if local_users: # Force a presence initial_sync for these users next time they sync. await self._store.add_users_to_send_full_presence_to(local_users) From 2e408ac42b4ada16f7949f8973fbab7aabcf9d9c Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 15:48:09 +0100 Subject: [PATCH 13/30] lint --- synapse/handlers/presence.py | 20 ++++++++++++++++---- synapse/replication/http/presence.py | 9 +++++++-- tests/module_api/test_api.py | 27 +++++++++++++++++++-------- 3 files changed, 42 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index fc8626294507..088d26003323 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -222,7 +222,11 @@ async def current_state_for_users( @abc.abstractmethod async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: """Set the presence state of the user. """ @@ -700,7 +704,9 @@ async def _persist_unpersisted_changes(self) -> None: [self.user_to_current_state[user_id] for user_id in unpersisted] ) - async def _update_states(self, new_states: Iterable[UserPresenceState], force_notify: bool = False) -> None: + async def _update_states( + self, new_states: Iterable[UserPresenceState], force_notify: bool = False + ) -> None: """Updates presence of users. Sets the appropriate timeouts. Pokes the notifier and federation if and only if the changed presence state should be sent to clients/servers. @@ -1084,7 +1090,11 @@ async def incoming_presence(self, origin: str, content: JsonDict) -> None: await self._update_states(updates) async def set_state( - self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False, force_notify: bool = False + self, + target_user: UserID, + state: JsonDict, + ignore_status_msg: bool = False, + force_notify: bool = False, ) -> None: """Set the presence state of the user.""" status_msg = state.get("status_msg", None) @@ -1117,7 +1127,9 @@ async def set_state( ): new_fields["last_active_ts"] = self.clock.time_msec() - await self._update_states([prev_state.copy_and_replace(**new_fields)], force_notify=force_notify) + await self._update_states( + [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify + ) async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: """Returns whether a user can see another user's presence.""" diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index b98f39876aa8..bb0024795349 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -92,7 +92,9 @@ def __init__(self, hs: "HomeServer"): self._presence_handler = hs.get_presence_handler() @staticmethod - async def _serialize_payload(user_id, state, ignore_status_msg=False, force_notify=False): + async def _serialize_payload( + user_id, state, ignore_status_msg=False, force_notify=False + ): return { "state": state, "ignore_status_msg": ignore_status_msg, @@ -103,7 +105,10 @@ async def _handle_request(self, request, user_id): content = parse_json_object_from_request(request) await self._presence_handler.set_state( - UserID.from_string(user_id), content["state"], content["ignore_status_msg"], content["force_notify"], + UserID.from_string(user_id), + content["state"], + content["ignore_status_msg"], + content["force_notify"], ) return ( diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 6549ace2c4e2..63e06096a754 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -11,21 +11,22 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer from unittest.mock import Mock +from twisted.internet import defer + from synapse.api.constants import EduTypes from synapse.events import EventBase from synapse.federation.units import Transaction from synapse.handlers.presence import UserPresenceState from synapse.rest import admin from synapse.rest.client.v1 import login, presence, room -from synapse.types import create_requester, StreamToken +from synapse.types import create_requester from tests.events.test_presence_router import send_presence_update, sync_presence +from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.test_utils.event_injection import inject_member_event from tests.unittest import HomeserverTestCase, override_config -from tests.replication._base import BaseMultiWorkerStreamTestCase from tests.utils import USE_POSTGRES_FOR_TESTS @@ -322,7 +323,9 @@ def test_send_local_online_presence_to_workers(self): _test_sending_local_online_presence_to_local_user(self, test_with_workers=True) -def _test_sending_local_online_presence_to_local_user(test_case: HomeserverTestCase, test_with_workers: bool = False): +def _test_sending_local_online_presence_to_local_user( + test_case: HomeserverTestCase, test_with_workers: bool = False +): """Tests that send_local_presence_to_users sends local online presence to local users. This simultaneously tests two different usecases: @@ -343,7 +346,9 @@ def _test_sending_local_online_presence_to_local_user(test_case: HomeserverTestC worker_hs = test_case.make_worker_hs("synapse.app.generic_worker") # Create a user who will send presence updates - test_case.presence_receiver_id = test_case.register_user("presence_receiver1", "monkey") + test_case.presence_receiver_id = test_case.register_user( + "presence_receiver1", "monkey" + ) test_case.presence_receiver_tok = test_case.login("presence_receiver1", "monkey") # And another user that will send presence updates out @@ -355,7 +360,9 @@ def _test_sending_local_online_presence_to_local_user(test_case: HomeserverTestC test_case.presence_receiver_id, tok=test_case.presence_receiver_tok, ) - test_case.helper.join(room_id, test_case.presence_sender_id, tok=test_case.presence_sender_tok) + test_case.helper.join( + room_id, test_case.presence_sender_id, tok=test_case.presence_sender_tok + ) # Presence sender comes online send_presence_update( @@ -367,7 +374,9 @@ def _test_sending_local_online_presence_to_local_user(test_case: HomeserverTestC ) # Presence receiver should have received it - presence_updates, sync_token = sync_presence(test_case, test_case.presence_receiver_id) + presence_updates, sync_token = sync_presence( + test_case, test_case.presence_receiver_id + ) test_case.assertEqual(len(presence_updates), 1) presence_update = presence_updates[0] # type: UserPresenceState @@ -388,7 +397,9 @@ def _test_sending_local_online_presence_to_local_user(test_case: HomeserverTestC # We do an (initial) sync with a second "device" now, getting a new sync token. # We'll use this in a moment. - _, sync_token_second_device = sync_presence(test_case, test_case.presence_receiver_id) + _, sync_token_second_device = sync_presence( + test_case, test_case.presence_receiver_id + ) # Determine on which process (main or worker) to call ModuleApi.send_local_online_presence_to on if test_with_workers: From 7ca661853fc70c11fcf81d464d3c98dd3f6571a9 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 15:52:44 +0100 Subject: [PATCH 14/30] Revert server_notices_manager change --- synapse/rest/admin/server_notice_servlet.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index 3def03515fa6..cc3ab5854b0b 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -54,6 +54,7 @@ def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() self.txns = HttpTransactionCache(hs) + self.snm = hs.get_server_notices_manager() def register(self, json_resource: HttpServer): PATTERN = "/send_server_notice" @@ -76,7 +77,7 @@ async def on_POST( event_type = body.get("type", EventTypes.Message) state_key = body.get("state_key") - if not self.hs.get_server_notices_manager().is_enabled(): + if not self.snm.is_enabled(): raise SynapseError(400, "Server notices are not enabled on this server") user_id = body["user_id"] @@ -84,7 +85,7 @@ async def on_POST( if not self.hs.is_mine_id(user_id): raise SynapseError(400, "Server notices can only be sent to local users") - event = await self.hs.get_server_notices_manager().send_notice( + event = await self.snm.send_notice( user_id=body["user_id"], type=event_type, state_key=state_key, From 6b5401056e187ff581825685590deab6fd16c0df Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 15:53:01 +0100 Subject: [PATCH 15/30] Don't commit :memory: -> test.db :) --- tests/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/utils.py b/tests/utils.py index f695a4d9639b..6bd008dcfe21 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -236,7 +236,7 @@ def setup_test_homeserver( else: database_config = { "name": "sqlite3", - "args": {"database": "test.db", "cp_min": 1, "cp_max": 1}, + "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1}, } database = DatabaseConnectionConfig("master", database_config) From ce73710f9927951f3a645d7257977581de9b38d2 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 16:14:03 +0100 Subject: [PATCH 16/30] Revert "Modify send_local_online_presence_to to send to local users on non-fed workers" This reverts commit 822f87a8746b022599b32ef97e66c70f6aec8375. --- synapse/module_api/__init__.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 71422be8b48f..71ebbe457e25 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -400,6 +400,12 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: Note that this method can only be run on the main or federation_sender worker processes. """ + if not self._hs.should_send_federation(): + raise Exception( + "send_local_online_presence_to can only be run " + "on processes that send federation", + ) + local_users = set() remote_users = set() for user in users: @@ -408,12 +414,6 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: else: remote_users.add(user) - if remote_users and not self._hs.should_send_federation(): - raise Exception( - "send_local_online_presence_to can only be called " - "with remote users on processes that send federation", - ) - if local_users: # Force a presence initial_sync for these users next time they sync. await self._store.add_users_to_send_full_presence_to(local_users) From 839f671b564f4fb93c5fbbb9f1f91adb315f8f72 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 16:24:31 +0100 Subject: [PATCH 17/30] Send Server Notices change was required after all --- synapse/rest/admin/server_notice_servlet.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index cc3ab5854b0b..b5e4c474efc8 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -54,7 +54,6 @@ def __init__(self, hs: "HomeServer"): self.hs = hs self.auth = hs.get_auth() self.txns = HttpTransactionCache(hs) - self.snm = hs.get_server_notices_manager() def register(self, json_resource: HttpServer): PATTERN = "/send_server_notice" @@ -77,7 +76,10 @@ async def on_POST( event_type = body.get("type", EventTypes.Message) state_key = body.get("state_key") - if not self.snm.is_enabled(): + # We grab the server notices manager here as its initialisation has a check for worker processes, + # but worker processes still need to initialise SendServerNoticeServlet (as it is part of the + # admin api). + if not self.hs.get_server_notices_manager().is_enabled(): raise SynapseError(400, "Server notices are not enabled on this server") user_id = body["user_id"] @@ -85,7 +87,7 @@ async def on_POST( if not self.hs.is_mine_id(user_id): raise SynapseError(400, "Server notices can only be sent to local users") - event = await self.snm.send_notice( + event = await self.hs.get_server_notices_manager().send_notice( user_id=body["user_id"], type=event_type, state_key=state_key, From a907e0a76cb8bd661fddb1a455a099bb57da7ec4 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 17:21:10 +0100 Subject: [PATCH 18/30] Switch to using PresenceFederationQueue.send_presence_to_destinations We also switch the worker we're testing with from a generic_worker to one that can write to the presence stream. --- docs/presence_router_module.md | 4 +++- synapse/module_api/__init__.py | 14 +++++++++----- tests/module_api/test_api.py | 33 ++++++++++++++++++++++++++------- 3 files changed, 38 insertions(+), 13 deletions(-) diff --git a/docs/presence_router_module.md b/docs/presence_router_module.md index d6566d978d06..1a6260fd41ca 100644 --- a/docs/presence_router_module.md +++ b/docs/presence_router_module.md @@ -28,7 +28,9 @@ async def ModuleApi.send_local_online_presence_to(users: Iterable[str]) -> None which can be given a list of local or remote MXIDs to broadcast known, online user presence to (for those users that the receiving user is considered interested in). It does not include state for users who are currently offline, and it can only be -called on workers that support sending federation. +called on workers that support sending federation. Additionally, this method must +only be called from the main process, or from a worker that supports writing to +the [presence stream](https://github.com/matrix-org/synapse/blob/master/docs/workers.md#stream-writers). ### Module structure diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 71ebbe457e25..6df59e9e08e5 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -397,13 +397,17 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: Updates to remote users will be sent immediately, whereas local users will receive them on their next sync attempt. - Note that this method can only be run on the main or federation_sender worker - processes. + Note that this method can only be run on the main or worker processes that have the + ability to write to the presence stream. """ - if not self._hs.should_send_federation(): + if ( + self._hs.config.worker_app + and self._hs._instance_name not in self._hs.config.worker.writers.presence + ): raise Exception( "send_local_online_presence_to can only be run " - "on processes that send federation", + "on processes that have the ability to write to the" + "presence stream (this includes the main process)", ) local_users = set() @@ -430,7 +434,7 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: # We pull out the presence handler here to break a cyclic # dependency between the presence router and module API. presence_handler = self._hs.get_presence_handler() - await presence_handler.maybe_send_presence_to_interested_destinations( + await presence_handler.get_federation_queue().send_presence_to_destinations( presence_events ) diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 63e06096a754..2c68b9a13c85 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -314,6 +314,15 @@ class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase): presence.register_servlets, ] + def default_config(self): + conf = super().default_config() + conf["redis"] = {"enabled": "true"} + conf["stream_writers"] = {"presence": ["presence_writer"]} + conf["instance_map"] = { + "presence_writer": {"host": "testserv", "port": 1001}, + } + return conf + def prepare(self, reactor, clock, homeserver): self.module_api = homeserver.get_module_api() self.sync_handler = homeserver.get_sync_handler() @@ -343,7 +352,9 @@ def _test_sending_local_online_presence_to_local_user( """ if test_with_workers: # Create a worker process to make module_api calls against - worker_hs = test_case.make_worker_hs("synapse.app.generic_worker") + worker_hs = test_case.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "presence_writer"} + ) # Create a user who will send presence updates test_case.presence_receiver_id = test_case.register_user( @@ -471,13 +482,21 @@ def _test_sending_local_online_presence_to_local_user( test_case.assertEqual(len(presence_updates), 1) # Now trigger sending local online presence. - test_case.get_success( - test_case.module_api.send_local_online_presence_to( - [ - test_case.presence_receiver_id, - ] - ) + d = module_api_to_use.send_local_online_presence_to( + [ + test_case.presence_receiver_id, + ] ) + d = defer.ensureDeferred(d) + + if test_with_workers: + # In order for the required presence_set_state replication request to occur between the + # worker and main process, we need to pump the reactor. Otherwise, the coordinator that + # reads the request on the main process won't do so, and the request will time out. + while not d.called: + test_case.reactor.advance(0.1) + + test_case.get_success(d) # Presence receiver should *not* have received offline state presence_updates, sync_token = sync_presence( From d2f07a7e4f1617a3406b534301e2a4bb4ab89971 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 18:48:06 +0100 Subject: [PATCH 19/30] No need to split a single-use constant out to a separate file --- synapse/appservice/api.py | 3 ++- synapse/util/constants.py | 15 --------------- 2 files changed, 2 insertions(+), 16 deletions(-) delete mode 100644 synapse/util/constants.py diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 57a073ac77b7..fe04d7a67293 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -24,7 +24,6 @@ from synapse.http.client import SimpleHttpClient from synapse.types import JsonDict, ThirdPartyInstanceID from synapse.util.caches.response_cache import ResponseCache -from synapse.util.constants import HOUR_IN_MS if TYPE_CHECKING: from synapse.appservice import ApplicationService @@ -47,6 +46,8 @@ "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"] ) +HOUR_IN_MS = 60 * 60 * 1000 + APP_SERVICE_PREFIX = "/_matrix/app/unstable" diff --git a/synapse/util/constants.py b/synapse/util/constants.py deleted file mode 100644 index ece1941b06a2..000000000000 --- a/synapse/util/constants.py +++ /dev/null @@ -1,15 +0,0 @@ -# Copyright 2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -HOUR_IN_MS = 60 * 60 * 1000 From 682e12573f73d5cd99501607587bc60ed4eb6fb1 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 19:05:46 +0100 Subject: [PATCH 20/30] Describe force_notify in docstrings --- synapse/handlers/presence.py | 43 ++++++++++++++++++++++++++++++++---- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 088d26003323..4d21f0b95fd4 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -228,7 +228,15 @@ async def set_state( ignore_status_msg: bool = False, force_notify: bool = False, ) -> None: - """Set the presence state of the user. """ + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ @abc.abstractmethod async def bump_presence_active_time(self, user: UserID): @@ -305,6 +313,9 @@ async def resend_current_presence_for_users(self, user_ids: Iterable[str]): Grabs the current presence state for a given set of users and adds it to the top of the presence stream. + This is used to bump the current presence stream ID without actually changing + any user's presence state. + Args: user_ids: The IDs of the users to use. """ @@ -318,7 +329,11 @@ async def resend_current_presence_for_users(self, user_ids: Iterable[str]): "status_message": current_presence_state.status_msg, } - # Copy the presence state to the tip of the presence stream + # Copy the presence state to the tip of the presence stream. + # + # We set force_notify=True here so that this presence update is guaranteed to + # increment the presence stream ID (which resending the current user's presence + # otherwise would not do). await self.set_state(UserID.from_string(user_id), state, force_notify=True) @@ -507,7 +522,15 @@ async def set_state( ignore_status_msg: bool = False, force_notify: bool = False, ) -> None: - """Set the presence state of the user.""" + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ presence = state["presence"] valid_presence = ( @@ -713,6 +736,10 @@ async def _update_states( Args: new_states: The new user presence state updates to process. + force_notify: Whether to force notifying clients of this presence state update, + even if it doesn't change the state of a user's presence (e.g online -> online). + This is currently used to bump the max presence stream ID without changing any + user's presence (see PresenceHandler.resend_current_presence_for_users). """ now = self.clock.time_msec() @@ -1096,7 +1123,15 @@ async def set_state( ignore_status_msg: bool = False, force_notify: bool = False, ) -> None: - """Set the presence state of the user.""" + """Set the presence state of the user. + + Args: + target_user: The ID of the user to set the presence state of. + state: The presence state as a JSON dictionary. + ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. + If False, the user's current status will be updated. + force_notify: Whether to force notification of the update to clients. + """ status_msg = state.get("status_msg", None) presence = state["presence"] From 6839bcfcabd94829be03ab7e0609de2390d20059 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 12 May 2021 19:16:27 +0100 Subject: [PATCH 21/30] Have the handler call the storage method, not the other way around --- synapse/handlers/presence.py | 27 +++++++++++++++------- synapse/module_api/__init__.py | 12 +++++----- synapse/storage/databases/main/presence.py | 9 -------- 3 files changed, 25 insertions(+), 23 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 4d21f0b95fd4..985fc56a310f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -308,17 +308,28 @@ async def maybe_send_presence_to_interested_destinations( for destinations, states in hosts_and_states: self._federation.send_presence_to_destinations(states, destinations) - async def resend_current_presence_for_users(self, user_ids: Iterable[str]): + async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): """ - Grabs the current presence state for a given set of users and adds it - to the top of the presence stream. + Adds to the list of users who should receive a full snapshot of presence + upon their next sync. Note that this only works for local users. - This is used to bump the current presence stream ID without actually changing - any user's presence state. + Then, grabs the current presence state for a given set of users and adds it + to the top of the presence stream. Args: - user_ids: The IDs of the users to use. + user_ids: The IDs of the local users to send full presence to. """ + # Mark the user as receiving full presence on their next sync + await self.store.add_users_to_send_full_presence_to(user_ids) + + # Add a new entry to the presence stream. Since we use stream tokens to determine whether a + # local user should receive a full snapshot of presence when they sync, we need to bump the + # presence stream so that subsequent syncs with no presence activity in between won't result + # in the client receiving multiple full snapshots of presence. + # + # If we bump the stream ID, then the user will get a higher stream token next sync, and thus + # correctly won't receive a second snapshot. + # Get the current presence state for each user (defaults to offline if not found) current_presence_for_users = await self.current_state_for_users(user_ids) @@ -330,7 +341,7 @@ async def resend_current_presence_for_users(self, user_ids: Iterable[str]): } # Copy the presence state to the tip of the presence stream. - # + # We set force_notify=True here so that this presence update is guaranteed to # increment the presence stream ID (which resending the current user's presence # otherwise would not do). @@ -739,7 +750,7 @@ async def _update_states( force_notify: Whether to force notifying clients of this presence state update, even if it doesn't change the state of a user's presence (e.g online -> online). This is currently used to bump the max presence stream ID without changing any - user's presence (see PresenceHandler.resend_current_presence_for_users). + user's presence (see PresenceHandler.add_users_to_send_full_presence_to). """ now = self.clock.time_msec() diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 6df59e9e08e5..f0d64e2f2f30 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -418,22 +418,22 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: else: remote_users.add(user) + # We pull out the presence handler here to break a cyclic + # dependency between the presence router and module API. + presence_handler = self._hs.get_presence_handler() + if local_users: # Force a presence initial_sync for these users next time they sync. - await self._store.add_users_to_send_full_presence_to(local_users) + await presence_handler.add_users_to_send_full_presence_to(local_users) for user in remote_users: # Retrieve presence state for currently online users that this user - # is considered interested in + # is considered interested in. presence_events, _ = await self._presence_stream.get_new_events( UserID.from_string(user), from_key=None, include_offline=False ) # Send to remote destinations. - - # We pull out the presence handler here to break a cyclic - # dependency between the presence router and module API. - presence_handler = self._hs.get_presence_handler() await presence_handler.get_federation_queue().send_presence_to_destinations( presence_events ) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 9079b5f59203..669a2af8845a 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -248,7 +248,6 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): Args: user_ids: An iterable of user IDs. """ - # Add user entries to the table, updating the presence_stream_id column if the user already # exists in the table. await self.db_pool.simple_upsert_many( @@ -267,14 +266,6 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): desc="add_users_to_send_full_presence_to", ) - # Add a new entry to the presence stream. Since we use stream tokens to determine whether a - # local user should receive a full snapshot presence when they sync, we need to bump the - # presence stream so that subsequent syncs with no presence activity in between won't result - # in the client receiving multiple full snapshots of presence. - # If we bump the stream ID, then the user will get a higher stream token next sync, and thus - # won't receive another snapshot. - await self.hs.get_presence_handler().resend_current_presence_for_users(user_ids) - async def get_presence_for_all_users( self, include_offline: bool = True, From 726d7058a8f41563c8da620ac32fec61a7565f7a Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 13 May 2021 10:21:58 +0100 Subject: [PATCH 22/30] Call send_presence_to_destinations properly --- synapse/module_api/__init__.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index f0d64e2f2f30..128ef4d5dcf3 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -434,8 +434,9 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: ) # Send to remote destinations. - await presence_handler.get_federation_queue().send_presence_to_destinations( - presence_events + destination = UserID.from_string(user).domain + presence_handler.get_federation_queue().send_presence_to_destinations( + presence_events, destination ) From e603a77e5ff4e380fa1c6bdd9a499d54219fbdf4 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 13 May 2021 14:58:26 +0100 Subject: [PATCH 23/30] Fix presence router test --- tests/events/test_presence_router.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py index 01d257307c6b..875b0d0a114b 100644 --- a/tests/events/test_presence_router.py +++ b/tests/events/test_presence_router.py @@ -302,11 +302,18 @@ def test_send_local_online_presence_to_with_module(self): ) # Check that the expected presence updates were sent - expected_users = [ + # We explicitly compare using sets as we expect that calling + # module_api.send_local_online_presence_to will create a presence + # update that is a duplicate of the specified user's current presence. + # These are sent to clients and will be picked up below, thus we use a + # set to deduplicate. We're just interested that non-offline updates were + # sent out for each user ID. + expected_users = { self.other_user_id, self.presence_receiving_user_one_id, self.presence_receiving_user_two_id, - ] + } + found_users = set() calls = ( self.hs.get_federation_transport_client().send_transaction.call_args_list @@ -326,12 +333,12 @@ def test_send_local_online_presence_to_with_module(self): # EDUs can contain multiple presence updates for presence_update in edu["content"]["push"]: # Check for presence updates that contain the user IDs we're after - expected_users.remove(presence_update["user_id"]) + found_users.add(presence_update["user_id"]) # Ensure that no offline states are being sent out self.assertNotEqual(presence_update["presence"], "offline") - self.assertEqual(len(expected_users), 0) + self.assertEqual(found_users, expected_users) def send_presence_update( From 0bbca77a9871a9a0c302b2996ac1b13336ec68d2 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 13 May 2021 18:15:16 +0100 Subject: [PATCH 24/30] Only bump the max presence stream ID once --- synapse/handlers/presence.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 985fc56a310f..02b2165b965e 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -330,22 +330,22 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): # If we bump the stream ID, then the user will get a higher stream token next sync, and thus # correctly won't receive a second snapshot. - # Get the current presence state for each user (defaults to offline if not found) - current_presence_for_users = await self.current_state_for_users(user_ids) - - for user_id, current_presence_state in current_presence_for_users.items(): - # Convert the UserPresenceState object into a serializable dict - state = { - "presence": current_presence_state.state, - "status_message": current_presence_state.status_msg, - } + # Get the current presence state for one of the users (defaults to offline if not found) + user_id = yield user_ids + current_presence_state = await self.get_state(user_id) + + # Convert the UserPresenceState object into a serializable dict + state = { + "presence": current_presence_state.state, + "status_message": current_presence_state.status_msg, + } - # Copy the presence state to the tip of the presence stream. + # Copy the presence state to the tip of the presence stream. - # We set force_notify=True here so that this presence update is guaranteed to - # increment the presence stream ID (which resending the current user's presence - # otherwise would not do). - await self.set_state(UserID.from_string(user_id), state, force_notify=True) + # We set force_notify=True here so that this presence update is guaranteed to + # increment the presence stream ID (which resending the current user's presence + # otherwise would not do). + await self.set_state(UserID.from_string(user_id), state, force_notify=True) class _NullContextManager(ContextManager[None]): From 7b8d008a4167302241883bbd62b46b0e0731e29d Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Thu, 13 May 2021 18:20:32 +0100 Subject: [PATCH 25/30] Clarify notice of where ModuleApi.send_local_online_presece_to can be called from --- docs/presence_router_module.md | 4 +++- synapse/module_api/__init__.py | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/docs/presence_router_module.md b/docs/presence_router_module.md index 1a6260fd41ca..d2844915dffe 100644 --- a/docs/presence_router_module.md +++ b/docs/presence_router_module.md @@ -29,8 +29,10 @@ which can be given a list of local or remote MXIDs to broadcast known, online us presence to (for those users that the receiving user is considered interested in). It does not include state for users who are currently offline, and it can only be called on workers that support sending federation. Additionally, this method must -only be called from the main process, or from a worker that supports writing to +only be called from the process that has been configured to write to the the [presence stream](https://github.com/matrix-org/synapse/blob/master/docs/workers.md#stream-writers). +By default, this is the main process, but another worker can be configured to do +so. ### Module structure diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 128ef4d5dcf3..d6138c33f956 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -397,8 +397,8 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: Updates to remote users will be sent immediately, whereas local users will receive them on their next sync attempt. - Note that this method can only be run on the main or worker processes that have the - ability to write to the presence stream. + Note that this method can only be run on the process that is configured to write to the + presence stream. By default this is the main process. """ if ( self._hs.config.worker_app @@ -406,8 +406,8 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: ): raise Exception( "send_local_online_presence_to can only be run " - "on processes that have the ability to write to the" - "presence stream (this includes the main process)", + "on the process that is configured to write to the " + "presence stream (by default this is the main process)", ) local_users = set() From db3befdb4756b11498a8ca5d1231cb2cbedd9b88 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Fri, 14 May 2021 17:28:35 +0100 Subject: [PATCH 26/30] Fix iterator/iterable, and passing UserID --- synapse/handlers/presence.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 02b2165b965e..440a3af6b8db 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -331,8 +331,8 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): # correctly won't receive a second snapshot. # Get the current presence state for one of the users (defaults to offline if not found) - user_id = yield user_ids - current_presence_state = await self.get_state(user_id) + user_id = next(iter(user_ids)) + current_presence_state = await self.get_state(UserID.from_string(user_id)) # Convert the UserPresenceState object into a serializable dict state = { From 48bedeabf83f9b9e4cb340540241ab25d9584dfc Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 17 May 2021 14:39:41 +0100 Subject: [PATCH 27/30] Update synapse/module_api/__init__.py Co-authored-by: Erik Johnston --- synapse/module_api/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index d6138c33f956..dd7b5ec5c427 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -401,8 +401,7 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: presence stream. By default this is the main process. """ if ( - self._hs.config.worker_app - and self._hs._instance_name not in self._hs.config.worker.writers.presence + self._hs._instance_name not in self._hs.config.worker.writers.presence ): raise Exception( "send_local_online_presence_to can only be run " From a9875aa0581e467a4a967a63d6339325de0fdb5b Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 17 May 2021 14:57:59 +0100 Subject: [PATCH 28/30] Iterable -> Sequence; lint --- synapse/handlers/presence.py | 7 +++++-- synapse/module_api/__init__.py | 4 +--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 440a3af6b8db..c2dd73d3578f 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -35,6 +35,7 @@ Iterable, List, Optional, + Sequence, Set, Tuple, Union, @@ -308,7 +309,7 @@ async def maybe_send_presence_to_interested_destinations( for destinations, states in hosts_and_states: self._federation.send_presence_to_destinations(states, destinations) - async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): + async def add_users_to_send_full_presence_to(self, user_ids: Sequence[str]): """ Adds to the list of users who should receive a full snapshot of presence upon their next sync. Note that this only works for local users. @@ -319,6 +320,9 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): Args: user_ids: The IDs of the local users to send full presence to. """ + # Retrieve one of the users from the given list before we do anything else + user_id = user_ids[0] + # Mark the user as receiving full presence on their next sync await self.store.add_users_to_send_full_presence_to(user_ids) @@ -331,7 +335,6 @@ async def add_users_to_send_full_presence_to(self, user_ids: Iterable[str]): # correctly won't receive a second snapshot. # Get the current presence state for one of the users (defaults to offline if not found) - user_id = next(iter(user_ids)) current_presence_state = await self.get_state(UserID.from_string(user_id)) # Convert the UserPresenceState object into a serializable dict diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index dd7b5ec5c427..f73958f76f1d 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -400,9 +400,7 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: Note that this method can only be run on the process that is configured to write to the presence stream. By default this is the main process. """ - if ( - self._hs._instance_name not in self._hs.config.worker.writers.presence - ): + if self._hs._instance_name not in self._hs.config.worker.writers.presence: raise Exception( "send_local_online_presence_to can only be run " "on the process that is configured to write to the " From 22e1795c56784e3f0866bcf3cf011ab3acbbd47d Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 17 May 2021 16:45:45 +0100 Subject: [PATCH 29/30] Perform an empty check, switch back to next(iter(x)) --- synapse/handlers/presence.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c2dd73d3578f..64be1c391e36 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -35,7 +35,6 @@ Iterable, List, Optional, - Sequence, Set, Tuple, Union, @@ -309,7 +308,7 @@ async def maybe_send_presence_to_interested_destinations( for destinations, states in hosts_and_states: self._federation.send_presence_to_destinations(states, destinations) - async def add_users_to_send_full_presence_to(self, user_ids: Sequence[str]): + async def add_users_to_send_full_presence_to(self, user_ids: Collection[str]): """ Adds to the list of users who should receive a full snapshot of presence upon their next sync. Note that this only works for local users. @@ -320,10 +319,14 @@ async def add_users_to_send_full_presence_to(self, user_ids: Sequence[str]): Args: user_ids: The IDs of the local users to send full presence to. """ - # Retrieve one of the users from the given list before we do anything else - user_id = user_ids[0] + # Retrieve one of the users from the given set + if not user_ids: + raise Exception( + "add_users_to_send_full_presence_to must be called with at least one user" + ) + user_id = next(iter(user_ids)) - # Mark the user as receiving full presence on their next sync + # Mark all users as receiving full presence on their next sync await self.store.add_users_to_send_full_presence_to(user_ids) # Add a new entry to the presence stream. Since we use stream tokens to determine whether a From 68073af27fce37bd4d102094f10431d38b2efd2d Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 17 May 2021 16:47:41 +0100 Subject: [PATCH 30/30] Don't have the same function name in a Handler and Store class It's good code hygiene to not name functions the exact same in different classes - especially for those people who navigate the code via global search :) --- synapse/handlers/presence.py | 4 ++-- synapse/module_api/__init__.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 64be1c391e36..f5a049d75461 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -308,7 +308,7 @@ async def maybe_send_presence_to_interested_destinations( for destinations, states in hosts_and_states: self._federation.send_presence_to_destinations(states, destinations) - async def add_users_to_send_full_presence_to(self, user_ids: Collection[str]): + async def send_full_presence_to_users(self, user_ids: Collection[str]): """ Adds to the list of users who should receive a full snapshot of presence upon their next sync. Note that this only works for local users. @@ -322,7 +322,7 @@ async def add_users_to_send_full_presence_to(self, user_ids: Collection[str]): # Retrieve one of the users from the given set if not user_ids: raise Exception( - "add_users_to_send_full_presence_to must be called with at least one user" + "send_full_presence_to_users must be called with at least one user" ) user_id = next(iter(user_ids)) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index f73958f76f1d..cecdc96bf517 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -421,7 +421,7 @@ async def send_local_online_presence_to(self, users: Iterable[str]) -> None: if local_users: # Force a presence initial_sync for these users next time they sync. - await presence_handler.add_users_to_send_full_presence_to(local_users) + await presence_handler.send_full_presence_to_users(local_users) for user in remote_users: # Retrieve presence state for currently online users that this user