-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
Fix unread counts on large servers #13140
Changes from 2 commits
b855e10
479ad28
6da1660
2a3f686
a8c1583
5cd2dce
9edec7e
4421a0e
e81fa1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Fix unread counts for users on large servers. Introduced in v1.62.0rc1. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -861,11 +861,13 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: | |
retcol="stream_id", | ||
) | ||
|
||
max_receipts_stream_id = self._receipts_id_gen.get_current_token() | ||
|
||
sql = """ | ||
SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering | ||
FROM receipts_linearized AS r | ||
INNER JOIN events AS e USING (event_id) | ||
WHERE r.stream_id > ? AND user_id LIKE ? | ||
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ? | ||
ORDER BY r.stream_id ASC | ||
LIMIT ? | ||
""" | ||
|
@@ -884,6 +886,13 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: | |
) | ||
rows = txn.fetchall() | ||
|
||
old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( | ||
txn, | ||
table="event_push_summary_stream_ordering", | ||
keyvalues={}, | ||
retcol="stream_ordering", | ||
) | ||
|
||
# For each new read receipt we delete push actions from before it and | ||
# recalculate the summary. | ||
for _, room_id, user_id, stream_ordering in rows: | ||
|
@@ -902,13 +911,6 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: | |
(room_id, user_id, stream_ordering), | ||
) | ||
|
||
old_rotate_stream_ordering = self.db_pool.simple_select_one_onecol_txn( | ||
txn, | ||
table="event_push_summary_stream_ordering", | ||
keyvalues={}, | ||
retcol="stream_ordering", | ||
) | ||
|
||
notif_count, unread_count = self._get_notif_unread_count_for_user_room( | ||
txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering | ||
) | ||
|
@@ -927,18 +929,16 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: | |
|
||
# We always update `event_push_summary_last_receipt_stream_id` to | ||
# ensure that we don't rescan the same receipts for remote users. | ||
# | ||
# This requires repeatable read to be safe, as we need the | ||
# `MAX(stream_id)` to not include any new rows that have been committed | ||
# since the start of the transaction (since those rows won't have been | ||
# returned by the query above). Alternatively we could query the max | ||
# stream ID at the start of the transaction and bound everything by | ||
# that. | ||
Comment on lines
-931
to
-936
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what was the reason for removing this comment, ooi? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We're no longer reading the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah duh yes. thanks. |
||
txn.execute( | ||
""" | ||
UPDATE event_push_summary_last_receipt_stream_id | ||
SET stream_id = (SELECT COALESCE(MAX(stream_id), 0) FROM receipts_linearized) | ||
""" | ||
|
||
upper_limit = max_receipts_stream_id | ||
if len(rows) >= limit: | ||
upper_limit = rows[-1][0] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It'd be good to have a quick comment explaining what this is doing. I believe it's the crux of the fix here - limiting how far we update the last receipt stream ID in case we didn't process all of them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
self.db_pool.simple_update_txn( | ||
txn, | ||
table="event_push_summary_last_receipt_stream_id", | ||
keyvalues={}, | ||
updatevalues={"stream_id": upper_limit}, | ||
) | ||
|
||
return len(rows) < limit | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I moved this out of the loop as no point requerying every iteration