This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Async get event cache prep #13242

Merged · 15 commits · Jul 15, 2022
Changes from 4 commits
1 change: 1 addition & 0 deletions changelog.d/13242.misc
@@ -0,0 +1 @@
Use an asynchronous cache wrapper for the get event cache. Contributed by Nick @ Beeper (@fizzadar).
4 changes: 2 additions & 2 deletions synapse/storage/database.py
@@ -57,7 +57,7 @@
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor
from synapse.util.async_helpers import delay_cancellation
from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
from synapse.util.iterutils import batch_iter

if TYPE_CHECKING:
@@ -818,7 +818,7 @@ async def _runInteraction() -> R:
)

for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args, **after_kwargs)
await maybe_awaitable(after_callback(*after_args, **after_kwargs))

return cast(R, result)
except Exception:
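For context on the change above: `maybe_awaitable` lets the transaction runner accept both plain and coroutine after-callbacks, awaiting the callback's return value only if it is awaitable. A minimal sketch of such a helper (illustrative only; Synapse's actual implementation in `synapse.util.async_helpers` may differ in detail):

```python
import inspect
from typing import Awaitable, TypeVar, Union

R = TypeVar("R")


async def maybe_awaitable(value: Union[Awaitable[R], R]) -> R:
    """Await ``value`` if it is awaitable, otherwise return it unchanged.

    This lets callers treat synchronous and asynchronous callbacks uniformly:
    ``await maybe_awaitable(cb(*args))`` works whether ``cb`` returns a plain
    value or a coroutine.
    """
    if inspect.isawaitable(value):
        return await value
    return value
```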
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/cache.py
@@ -193,7 +193,7 @@ def _invalidate_caches_for_event(
relates_to: Optional[str],
backfilled: bool,
) -> None:
self._invalidate_get_event_cache(event_id)
self._invalidate_local_get_event_cache(event_id)
self.have_seen_event.invalidate((room_id, event_id))

self.get_latest_event_ids_in_room.invalidate((room_id,))
@@ -208,7 +208,7 @@ def _invalidate_caches_for_event(
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)

if redacts:
self._invalidate_get_event_cache(redacts)
self._invalidate_local_get_event_cache(redacts)
# Caches which might leak edits must be invalidated for the event being
# redacted.
self.get_relations_for_event.invalidate((redacts,))
4 changes: 2 additions & 2 deletions synapse/storage/databases/main/events.py
@@ -1669,9 +1669,9 @@ def _add_to_cache(
if not row["rejects"] and not row["redacts"]:
to_prefill.append(EventCacheEntry(event=event, redacted_event=None))

def prefill() -> None:
async def prefill() -> None:
for cache_entry in to_prefill:
self.store._get_event_cache.set(
await self.store._get_event_cache.set(
(cache_entry.event.event_id,), cache_entry
)

28 changes: 18 additions & 10 deletions synapse/storage/databases/main/events_worker.py
@@ -79,7 +79,7 @@
from synapse.util import unwrapFirstError
from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.lrucache import AsyncLruCache
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure

@@ -238,7 +238,9 @@ def __init__(
5 * 60 * 1000,
)

self._get_event_cache: LruCache[Tuple[str], EventCacheEntry] = LruCache(
self._get_event_cache: AsyncLruCache[
Tuple[str], EventCacheEntry
] = AsyncLruCache(
cache_name="*getEvent*",
max_size=hs.config.caches.event_cache_size,
)
@@ -617,7 +619,7 @@ async def _get_events_from_cache_or_db(
Returns:
map from event id to result
"""
event_entry_map = self._get_events_from_cache(
event_entry_map = await self._get_events_from_cache(
event_ids,
)

@@ -729,12 +731,16 @@ async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:

return event_entry_map

def _invalidate_get_event_cache(self, event_id: str) -> None:
self._get_event_cache.invalidate((event_id,))
async def _invalidate_get_event_cache(self, event_id: str) -> None:
await self._get_event_cache.invalidate((event_id,))
self._invalidate_local_get_event_cache(event_id)
Member:
I assume that doing the invalidations in this order is important. It might be nice to add some comments explaining why and saying how this works.
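One plausible rationale for the ordering, sketched here with explanatory comments (an illustration, not necessarily the exact comment or final shape merged in this PR):

```python
async def _invalidate_get_event_cache(self, event_id: str) -> None:
    # Invalidate the asynchronous cache first: for subclasses of AsyncLruCache
    # backed by an external store (e.g. Redis) this may involve a round trip.
    await self._get_event_cache.invalidate((event_id,))
    # Only afterwards drop the purely local state. Doing it in this order means
    # a request racing with the await above cannot repopulate local structures
    # from a stale external entry after we have already cleared them.
    self._invalidate_local_get_event_cache(event_id)
```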

Member:
This means that we end up with two .invalidate calls on the underlying LruCache. It might be better just to:

Suggested change:
- self._invalidate_local_get_event_cache(event_id)
+ self._event_ref.pop(event_id, None)
+ self._current_event_fetches.pop(event_id, None)

def _invalidate_local_get_event_cache(self, event_id: str) -> None:
self._get_event_cache.lru_cache.invalidate((event_id,))
Member:
This feels like it's poking into the implementation of AsyncLruCache. It might be better to give AsyncLruCache an invalidate_local_only method or something.

Contributor Author:
Added in 037d84f
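For illustration, such a method on AsyncLruCache might look roughly like this (a sketch only; the actual change lives in the commit referenced above and may use a different name or shape):

```python
    def invalidate_local(self, key: KT) -> None:
        """Remove an entry from the wrapped in-memory LruCache only.

        Unlike ``invalidate``, this never touches any external cache layer a
        subclass may add, so callers no longer need to reach into
        ``self.lru_cache`` directly.
        """
        self.lru_cache.invalidate(key)
```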

self._event_ref.pop(event_id, None)
self._current_event_fetches.pop(event_id, None)

def _get_events_from_cache(
async def _get_events_from_cache(
self, events: Iterable[str], update_metrics: bool = True
) -> Dict[str, EventCacheEntry]:
"""Fetch events from the caches.
@@ -749,7 +755,7 @@ def _get_events_from_cache(

for event_id in events:
# First check if it's in the event cache
ret = self._get_event_cache.get(
ret = await self._get_event_cache.get(
(event_id,), None, update_metrics=update_metrics
)
if ret:
@@ -771,7 +777,7 @@

# We add the entry back into the cache as we want to keep
# recently queried events in the cache.
self._get_event_cache.set((event_id,), cache_entry)
await self._get_event_cache.set((event_id,), cache_entry)

return event_map

@@ -1148,7 +1154,7 @@ async def _get_events_from_db(
event=original_ev, redacted_event=redacted_event
)

self._get_event_cache.set((event_id,), cache_entry)
await self._get_event_cache.set((event_id,), cache_entry)
result_map[event_id] = cache_entry

if not redacted_event:
@@ -1382,7 +1388,9 @@ async def _have_seen_events_dict(
# if the event cache contains the event, obviously we've seen it.

cache_results = {
(rid, eid) for (rid, eid) in keys if self._get_event_cache.contains((eid,))
(rid, eid)
for (rid, eid) in keys
if await self._get_event_cache.contains((eid,))
}
results = dict.fromkeys(cache_results, True)
remaining = [k for k in keys if k not in cache_results]
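As an aside on the `_have_seen_events_dict` hunk above: `await` inside a comprehension is only valid within an `async def` body, and each lookup is awaited sequentially. A small self-contained illustration of the pattern (toy code, unrelated to Synapse's real cache):

```python
import asyncio
from typing import Set, Tuple


async def contains(event_id: str) -> bool:
    # Stand-in for an asynchronous cache lookup.
    await asyncio.sleep(0)
    return event_id == "$known"


async def main() -> None:
    keys = [("!room:a", "$known"), ("!room:a", "$other")]
    # An async comprehension: legal only inside an async function, and the
    # awaits run one after another rather than concurrently.
    cache_results: Set[Tuple[str, str]] = {
        (rid, eid) for (rid, eid) in keys if await contains(eid)
    }
    print(cache_results)  # {('!room:a', '$known')}


asyncio.run(main())
```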
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/purge_events.py
@@ -302,7 +302,7 @@ def _purge_history_txn(
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)
self._invalidate_get_event_cache(event_id)
txn.call_after(self._invalidate_get_event_cache, event_id)

logger.info("[purge] done")

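The purge path defers the invalidation with `txn.call_after` because `_invalidate_get_event_cache` is now a coroutine function and `_purge_history_txn` runs synchronously inside a transaction; together with the `maybe_awaitable` change in database.py, the awaitable callback gets awaited once the transaction completes. A simplified illustration of that pattern (not Synapse's real transaction machinery):

```python
import asyncio
from typing import Any, Callable, List, Tuple


class FakeTransaction:
    """Toy stand-in for a transaction object that collects after-callbacks."""

    def __init__(self) -> None:
        self.after_callbacks: List[Tuple[Callable[..., Any], Tuple[Any, ...]]] = []

    def call_after(self, callback: Callable[..., Any], *args: Any) -> None:
        # Just record the callback; it only runs after the transaction commits.
        self.after_callbacks.append((callback, args))


async def invalidate_cache(event_id: str) -> None:
    print(f"invalidated {event_id}")


async def run_interaction() -> None:
    txn = FakeTransaction()
    # The synchronous transaction body cannot await, so it only registers work.
    txn.call_after(invalidate_cache, "$event:example.org")
    # ... commit happens here ...
    for callback, args in txn.after_callbacks:
        result = callback(*args)
        if asyncio.iscoroutine(result):  # the maybe_awaitable idea, inlined
            await result


asyncio.run(run_interaction())
```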
4 changes: 3 additions & 1 deletion synapse/storage/databases/main/roommember.py
@@ -777,7 +777,9 @@ async def _get_joined_users_from_context(
# We don't update the event cache hit ratio as it completely throws off
# the hit ratio counts. After all, we don't populate the cache if we
# miss it here
event_map = self._get_events_from_cache(member_event_ids, update_metrics=False)
event_map = await self._get_events_from_cache(
member_event_ids, update_metrics=False
)

missing_member_event_ids = []
for event_id in member_event_ids:
28 changes: 28 additions & 0 deletions synapse/util/caches/lrucache.py
@@ -730,3 +730,31 @@ def __del__(self) -> None:
# This happens e.g. in the sync code where we have an expiring cache of
# lru caches.
self.clear()


class AsyncLruCache(Generic[KT, VT]):
"""
An asynchronous wrapper around a subset of the LruCache API. On it's own
this doesn't change the behaviour but allows subclasses that utilize
external cache systems that require await behaviour to be created.
Member:
Suggested change:
-     An asynchronous wrapper around a subset of the LruCache API. On it's own
-     this doesn't change the behaviour but allows subclasses that utilize
-     external cache systems that require await behaviour to be created.
+     An asynchronous wrapper around a subset of the LruCache API.
+     On its own this doesn't change the behaviour but allows subclasses that
+     utilize external cache systems that require await behaviour to be created.
"""

def __init__(self, *args, **kwargs): # type: ignore
self.lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)
Member:
Can we name this to make it clear it is a private attribute?

Suggested change:
- self.lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)
+ self._lru_cache: LruCache[KT, VT] = LruCache(*args, **kwargs)

async def get(
self, key: KT, default: Optional[T] = None, update_metrics: bool = True
) -> Optional[VT]:
return self.lru_cache.get(key, update_metrics=update_metrics)

async def set(self, key: KT, value: VT) -> None:
self.lru_cache.set(key, value)

async def invalidate(self, key: KT) -> None:
return self.lru_cache.invalidate(key)

async def contains(self, key: KT) -> bool:
return self.lru_cache.contains(key)

def clear(self) -> None:
self.lru_cache.clear()
Contributor Author:
This is only used in testing, is there a good way to enforce that?

Member:
Not easily, but maybe we should just make it work properly?

Contributor Author:
Yep! 8e06803
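To make the motivation concrete, here is a hypothetical subclass showing the kind of external-cache integration the wrapper is meant to enable. It assumes the `AsyncLruCache`, `KT`, `VT`, `T` and `Optional` names from the module above; `DictExternalCache` is invented purely for illustration and nothing like it is added in this PR:

```python
from typing import Any, Dict


class DictExternalCache:
    """Toy stand-in for an out-of-process cache client (e.g. Redis)."""

    def __init__(self) -> None:
        self._data: Dict[Any, Any] = {}

    async def get(self, key: Any) -> Any:
        return self._data.get(key)

    async def set(self, key: Any, value: Any) -> None:
        self._data[key] = value

    async def delete(self, key: Any) -> None:
        self._data.pop(key, None)


class ExternalBackedLruCache(AsyncLruCache[KT, VT]):
    """Hypothetical subclass that consults an external cache on a local miss."""

    def __init__(self, external: DictExternalCache, *args, **kwargs):  # type: ignore
        super().__init__(*args, **kwargs)
        self._external = external

    async def get(
        self, key: KT, default: Optional[T] = None, update_metrics: bool = True
    ) -> Optional[VT]:
        # Check the in-memory cache first, then fall back to the external one.
        local = self.lru_cache.get(key, update_metrics=update_metrics)
        if local is not None:
            return local
        remote = await self._external.get(key)
        return remote if remote is not None else default  # type: ignore[return-value]

    async def set(self, key: KT, value: VT) -> None:
        self.lru_cache.set(key, value)
        await self._external.set(key, value)

    async def invalidate(self, key: KT) -> None:
        # Keep both layers consistent when invalidating.
        self.lru_cache.invalidate(key)
        await self._external.delete(key)
```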

2 changes: 1 addition & 1 deletion tests/storage/test_purge.py
@@ -115,6 +115,6 @@ def test_purge_room(self):
)

# The events aren't found.
self.store._invalidate_get_event_cache(create_event.event_id)
self.store._invalidate_local_get_event_cache(create_event.event_id)
self.get_failure(self.store.get_event(create_event.event_id), NotFoundError)
self.get_failure(self.store.get_event(first["event_id"]), NotFoundError)
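If a test did need the full asynchronous invalidation rather than just the local one, it could presumably drive the coroutine through the usual test helpers, e.g. (hypothetical, not part of this diff):

```python
# Hypothetical alternative: run the async invalidation via the test harness
# instead of only clearing the in-memory cache.
self.get_success(self.store._invalidate_get_event_cache(create_event.event_id))
```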