Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Prune old rows in user_ips tables. #6098

Merged
merged 8 commits into from
Sep 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/6098.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for pruning old rows in `user_ips` table.
6 changes: 6 additions & 0 deletions docs/sample_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ listeners:
#
redaction_retention_period: 7d

# How long to track users' last seen time and IPs in the database.
#
# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
#
#user_ips_max_age: 14d


## TLS ##

Expand Down
13 changes: 13 additions & 0 deletions synapse/config/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ def read_config(self, config, **kwargs):
else:
self.redaction_retention_period = None

        # How long to keep entries in the `user_ips` table.
user_ips_max_age = config.get("user_ips_max_age", "28d")
if user_ips_max_age is not None:
self.user_ips_max_age = self.parse_duration(user_ips_max_age)
else:
self.user_ips_max_age = None

# Options to disable HS
self.hs_disabled = config.get("hs_disabled", False)
self.hs_disabled_message = config.get("hs_disabled_message", "")
Expand Down Expand Up @@ -736,6 +743,12 @@ def generate_config_section(
# Defaults to `7d`. Set to `null` to disable.
#
redaction_retention_period: 7d

# How long to track users' last seen time and IPs in the database.
#
# Defaults to `28d`. Set to `null` to disable clearing out of old rows.
#
#user_ips_max_age: 14d
"""
% locals()
)
Expand Down
33 changes: 31 additions & 2 deletions synapse/metrics/background_process_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import logging
import threading
from asyncio import iscoroutine
from functools import wraps

import six

Expand Down Expand Up @@ -173,7 +175,7 @@ def run_as_background_process(desc, func, *args, **kwargs):

Args:
desc (str): a description for this background process type
func: a function, which may return a Deferred
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func

Expand All @@ -197,7 +199,17 @@ def run():
_background_processes.setdefault(desc, set()).add(proc)

try:
yield func(*args, **kwargs)
result = func(*args, **kwargs)

# We probably don't have an ensureDeferred in our call stack to handle
# coroutine results, so we need to ensureDeferred here.
#
# But we need this check because ensureDeferred doesn't like being
# called on immediate values (as opposed to Deferreds or coroutines).
if iscoroutine(result):
result = defer.ensureDeferred(result)

return (yield result)
except Exception:
logger.exception("Background process '%s' threw an exception", desc)
finally:
Expand All @@ -208,3 +220,20 @@ def run():

with PreserveLoggingContext():
return run()


def wrap_as_background_process(desc):
    """Decorator that wraps a function that gets called as a background
    process.

    Equivalent of calling the function with `run_as_background_process`
    """

    def decorator(func):
        # Preserve the wrapped function's name/docstring for introspection.
        @wraps(func)
        def wrapper(*args, **kwargs):
            return run_as_background_process(desc, func, *args, **kwargs)

        return wrapper

    return decorator
22 changes: 21 additions & 1 deletion synapse/storage/background_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,14 +140,34 @@ def has_completed_background_updates(self):
"background_updates",
keyvalues=None,
retcol="1",
desc="check_background_updates",
desc="has_completed_background_updates",
)
if not updates:
self._all_done = True
return True

return False

async def has_completed_background_update(self, update_name) -> bool:
"""Check if the given background update has finished running.
"""

if self._all_done:
return True

if update_name in self._background_update_queue:
return False

update_exists = await self._simple_select_one_onecol(
"background_updates",
keyvalues={"update_name": update_name},
retcol="1",
desc="has_completed_background_update",
allow_none=True,
)

return not update_exists

@defer.inlineCallbacks
def do_next_background_update(self, desired_duration_ms):
"""Does some amount of work on the next queued background update
Expand Down
62 changes: 54 additions & 8 deletions synapse/storage/client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from twisted.internet import defer

from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR

from . import background_updates
Expand All @@ -42,6 +42,8 @@ def __init__(self, db_conn, hs):

super(ClientIpStore, self).__init__(db_conn, hs)

self.user_ips_max_age = hs.config.user_ips_max_age

self.register_background_index_update(
"user_ips_device_index",
index_name="user_ips_device_id",
Expand Down Expand Up @@ -100,6 +102,9 @@ def __init__(self, db_conn, hs):
"before", "shutdown", self._update_client_ips_batch
)

if self.user_ips_max_age:
self._clock.looping_call(self._prune_old_user_ips, 5 * 1000)

@defer.inlineCallbacks
def _remove_user_ip_nonunique(self, progress, batch_size):
def f(conn):
Expand Down Expand Up @@ -319,20 +324,19 @@ def insert_client_ip(

self._batch_row_update[key] = (user_agent, device_id, now)

@wrap_as_background_process("update_client_ips")
def _update_client_ips_batch(self):

# If the DB pool has already terminated, don't try updating
if not self.hs.get_db_pool().running:
return

def update():
to_update = self._batch_row_update
self._batch_row_update = {}
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)
to_update = self._batch_row_update
self._batch_row_update = {}

return run_as_background_process("update_client_ips", update)
return self.runInteraction(
"_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
)

def _update_client_ips_batch_txn(self, txn, to_update):
if "user_ips" in self._unsafe_to_upsert_tables or (
Expand Down Expand Up @@ -496,3 +500,45 @@ def _devices_last_seen_update_txn(txn):
yield self._end_background_update("devices_last_seen")

return updated

@wrap_as_background_process("prune_old_user_ips")
async def _prune_old_user_ips(self):
"""Removes entries in user IPs older than the configured period.
"""

if self.user_ips_max_age is None:
# Nothing to do
return

if not await self.has_completed_background_update("devices_last_seen"):
# Only start pruning if we have finished populating the devices
# last seen info.
return

# We do a slightly funky SQL delete to ensure we don't try and delete
# too much at once (as the table may be very large from before we
# started pruning).
#
# This works by finding the max last_seen that is less than the given
# time, but has no more than N rows before it, deleting all rows with
# a lesser last_seen time. (We COALESCE so that the sub-SELECT always
# returns exactly one row).
sql = """
DELETE FROM user_ips
WHERE last_seen <= (
SELECT COALESCE(MAX(last_seen), -1)
FROM (
SELECT last_seen FROM user_ips
WHERE last_seen <= ?
ORDER BY last_seen ASC
LIMIT 5000
) AS u
)
"""

timestamp = self.clock.time_msec() - self.user_ips_max_age

def _prune_old_user_ips_txn(txn):
txn.execute(sql, (timestamp,))

await self.runInteraction("_prune_old_user_ips", _prune_old_user_ips_txn)
71 changes: 71 additions & 0 deletions tests/storage/test_client_ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,77 @@ def test_devices_last_seen_bg_update(self):
r,
)

    def test_old_user_ips_pruned(self):
        """Rows in `user_ips` older than the configured maximum age should be
        pruned by the background job, while the last-seen info copied onto the
        device (by the `devices_last_seen` update) remains available.
        """
        # First make sure we have completed all updates.
        while not self.get_success(self.store.has_completed_background_updates()):
            self.get_success(self.store.do_next_background_update(100), by=0.1)

        # Insert a user IP
        user_id = "@user:id"
        self.get_success(
            self.store.insert_client_ip(
                user_id, "access_token", "ip", "user_agent", "device_id"
            )
        )

        # Force persisting to disk
        self.reactor.advance(200)

        # We should see that in the DB
        result = self.get_success(
            self.store._simple_select_list(
                table="user_ips",
                keyvalues={"user_id": user_id},
                retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"],
                desc="get_user_ip_and_agents",
            )
        )

        # last_seen is 0 because the fake clock started at time 0 when the
        # row was inserted.
        self.assertEqual(
            result,
            [
                {
                    "access_token": "access_token",
                    "ip": "ip",
                    "user_agent": "user_agent",
                    "device_id": "device_id",
                    "last_seen": 0,
                }
            ],
        )

        # Now advance by a couple of months (reactor time is in seconds, so
        # this is 60 days), well past the pruning cutoff.
        self.reactor.advance(60 * 24 * 60 * 60)

        # We should get no results.
        result = self.get_success(
            self.store._simple_select_list(
                table="user_ips",
                keyvalues={"user_id": user_id},
                retcols=["access_token", "ip", "user_agent", "device_id", "last_seen"],
                desc="get_user_ip_and_agents",
            )
        )

        self.assertEqual(result, [])

        # But we should still get the correct values for the device
        result = self.get_success(
            self.store.get_last_client_ip_by_device(user_id, "device_id")
        )

        r = result[(user_id, "device_id")]
        self.assertDictContainsSubset(
            {
                "user_id": user_id,
                "device_id": "device_id",
                "ip": "ip",
                "user_agent": "user_agent",
                "last_seen": 0,
            },
            r,
        )


class ClientIpAuthTestCase(unittest.HomeserverTestCase):

Expand Down