Skip to content

Commit

Permalink
event cache: listen to all the room updates at once!
Browse files Browse the repository at this point in the history
  • Loading branch information
bnjbvr committed Feb 15, 2024
1 parent a6c1333 commit 11074d8
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 94 deletions.
228 changes: 139 additions & 89 deletions crates/matrix-sdk-ui/src/event_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

use std::{collections::BTreeMap, fmt::Debug, sync::Arc};

use matrix_sdk::{sync::RoomUpdate, Client, Room};
use matrix_sdk::{Client, Room};
use matrix_sdk_base::{
deserialized_responses::{AmbiguityChange, SyncTimelineEvent},
sync::{JoinedRoomUpdate, LeftRoomUpdate, Timeline},
Expand All @@ -54,10 +54,13 @@ use ruma::{
};
use tokio::{
spawn,
sync::broadcast::{error::RecvError, Receiver, Sender},
sync::{
broadcast::{error::RecvError, Receiver, Sender},
RwLock,
},
task::JoinHandle,
};
use tracing::{debug, error, trace};
use tracing::{error, trace};

use self::store::{EventCacheStore, MemoryStore};

Expand All @@ -75,11 +78,17 @@ pub enum EventCacheError {
pub type Result<T> = std::result::Result<T, EventCacheError>;

/// Hold handles to the tasks spawn by a [`RoomEventCache`].
struct RoomCacheDropHandles {
pub struct EventCacheDropHandles {
listen_updates_task: JoinHandle<()>,
}

impl Drop for RoomCacheDropHandles {
impl Debug for EventCacheDropHandles {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EventCacheDropHandles").finish_non_exhaustive()
}
}

impl Drop for EventCacheDropHandles {
fn drop(&mut self) {
self.listen_updates_task.abort();
}
Expand All @@ -89,12 +98,9 @@ impl Drop for RoomCacheDropHandles {
///
/// See also the module-level comment.
pub struct EventCache {
/// Reference to the client used to navigate this cache.
client: Client,
/// Lazily-filled cache of live [`RoomEventCache`], once per room.
by_room: BTreeMap<OwnedRoomId, RoomEventCache>,
/// Backend used for storage.
store: Arc<dyn EventCacheStore>,
inner: Arc<RwLock<EventCacheInner>>,

drop_handles: Arc<EventCacheDropHandles>,
}

impl Debug for EventCache {
Expand All @@ -106,26 +112,97 @@ impl Debug for EventCache {
impl EventCache {
/// Create a new [`EventCache`] for the given client.
pub fn new(client: Client) -> Self {
let mut room_updates_feed = client.subscribe_to_all_room_updates();

let store = Arc::new(MemoryStore::new());
Self { client, by_room: Default::default(), store }
let inner =
Arc::new(RwLock::new(EventCacheInner { client, by_room: Default::default(), store }));

// Spawn the task that will listen to all the room updates at once.
trace!("Spawning the listen task");
let listen_updates_task = spawn({
let inner = inner.clone();

async move {
loop {
match room_updates_feed.recv().await {
Ok(updates) => {
// We received some room updates. Handle them.

// Left rooms.
for (room_id, left_room_update) in updates.leave {
let room = match inner.write().await.for_room(&room_id).await {
Ok(room) => room,
Err(err) => {
error!("can't get left room {room_id}: {err}");
continue;
}
};

if let Err(err) =
room.inner.handle_left_room_update(left_room_update).await
{
error!("handling left room update: {err}");
}
}

// Joined rooms.
for (room_id, joined_room_update) in updates.join {
let room = match inner.write().await.for_room(&room_id).await {
Ok(room) => room,
Err(err) => {
error!("can't get joined room {room_id}: {err}");
continue;
}
};

if let Err(err) =
room.inner.handle_joined_room_update(joined_room_update).await
{
error!("handling joined room update: {err}");
}
}

// Invited rooms.
// TODO: we don't anything with `updates.invite` at
// this point.
}

Err(RecvError::Lagged(_)) => {
// Forget everything we know; we could have missed events, and we have
// no way to reconcile at the moment!
// TODO: implement Smart Matching™,
let mut inner = inner.write().await;
for room_id in inner.by_room.keys() {
if let Err(err) = inner.store.clear_room_events(room_id).await {
error!("unable to clear room after room updates lag: {err}");
}
}
inner.by_room.clear();
}

Err(RecvError::Closed) => {
// The sender has shut down, exit.
break;
}
}
}
}
});

Self { inner, drop_handles: Arc::new(EventCacheDropHandles { listen_updates_task }) }
}

/// Return a room-specific view over the [`EventCache`].
///
/// It may not be found, if the room isn't known to the client.
pub fn for_room(&mut self, room_id: &RoomId) -> Result<RoomEventCache> {
match self.by_room.get(room_id) {
Some(room) => Ok(room.clone()),
None => {
let room = self
.client
.get_room(room_id)
.ok_or_else(|| EventCacheError::RoomNotFound(room_id.to_owned()))?;
let room_event_cache = RoomEventCache::new(room, self.store.clone());
self.by_room.insert(room_id.to_owned(), room_event_cache.clone());
Ok(room_event_cache)
}
}
pub async fn for_room(
&self,
room_id: &RoomId,
) -> Result<(RoomEventCache, Arc<EventCacheDropHandles>)> {
let room = self.inner.write().await.for_room(room_id).await?;

Ok((room, self.drop_handles.clone()))
}

/// Add an initial set of events to the event cache, reloaded from a cache.
Expand All @@ -137,20 +214,51 @@ impl EventCache {
room_id: &RoomId,
events: Vec<SyncTimelineEvent>,
) -> Result<()> {
let room_cache = self.for_room(room_id)?;
let room_cache = self.inner.write().await.for_room(room_id).await?;
room_cache.inner.append_events(events).await?;
Ok(())
}
}

struct EventCacheInner {
/// Reference to the client used to navigate this cache.
client: Client,

/// Lazily-filled cache of live [`RoomEventCache`], once per room.
by_room: BTreeMap<OwnedRoomId, RoomEventCache>,

/// Backend used for storage.
store: Arc<dyn EventCacheStore>,
}

impl EventCacheInner {
/// Return a room-specific view over the [`EventCache`].
///
/// It may not be found, if the room isn't known to the client.
async fn for_room(&mut self, room_id: &RoomId) -> Result<RoomEventCache> {
match self.by_room.get(room_id) {
Some(room) => Ok(room.clone()),
None => {
let room = self
.client
.get_room(room_id)
.ok_or_else(|| EventCacheError::RoomNotFound(room_id.to_owned()))?;
let room_event_cache = RoomEventCache::new(room, self.store.clone());

self.by_room.insert(room_id.to_owned(), room_event_cache.clone());

Ok(room_event_cache)
}
}
}
}

/// A subset of an event cache, for a room.
///
/// Cloning is shallow, and thus is cheap to do.
#[derive(Clone)]
pub struct RoomEventCache {
inner: Arc<RoomEventCacheInner>,

_drop_handles: Arc<RoomCacheDropHandles>,
}

impl Debug for RoomEventCache {
Expand All @@ -162,8 +270,7 @@ impl Debug for RoomEventCache {
impl RoomEventCache {
/// Create a new [`RoomEventCache`] using the given room and store.
fn new(room: Room, store: Arc<dyn EventCacheStore>) -> Self {
let (inner, drop_handles) = RoomEventCacheInner::new(room, store);
Self { inner, _drop_handles: drop_handles }
Self { inner: Arc::new(RoomEventCacheInner::new(room, store)) }
}

/// Subscribe to room updates for this room, after getting the initial list
Expand All @@ -189,14 +296,9 @@ struct RoomEventCacheInner {
impl RoomEventCacheInner {
/// Creates a new cache for a room, and subscribes to room updates, so as
/// to handle new timeline events.
fn new(room: Room, store: Arc<dyn EventCacheStore>) -> (Arc<Self>, Arc<RoomCacheDropHandles>) {
fn new(room: Room, store: Arc<dyn EventCacheStore>) -> Self {
let sender = Sender::new(32);

let room_cache = Arc::new(Self { room, store, sender });

let listen_updates_task = spawn(Self::listen_task(room_cache.clone()));

(room_cache, Arc::new(RoomCacheDropHandles { listen_updates_task }))
Self { room, store, sender }
}

async fn handle_joined_room_update(&self, updates: JoinedRoomUpdate) -> Result<()> {
Expand Down Expand Up @@ -250,58 +352,6 @@ impl RoomEventCacheInner {
Ok(())
}

async fn listen_task(this: Arc<Self>) {
// TODO for prototyping, i'm spawning a new task to get the room updates.
// Ideally we'd have something like the whole sync update, a generalisation of
// the room update.
trace!("Spawning the listen task");

let mut update_receiver = this.room.client().subscribe_to_room_updates(this.room.room_id());

loop {
match update_receiver.recv().await {
Ok(update) => {
trace!("Listen task received an update");

match update {
RoomUpdate::Left { updates, .. } => {
if let Err(err) = this.handle_left_room_update(updates).await {
error!("handling left room update: {err}");
}
}
RoomUpdate::Joined { updates, .. } => {
if let Err(err) = this.handle_joined_room_update(updates).await {
error!("handling joined room update: {err}");
}
}
RoomUpdate::Invited { .. } => {
// We don't do anything for invited rooms at this
// point. TODO should
// we?
}
}
}

Err(RecvError::Closed) => {
// The loop terminated successfully.
debug!("Listen task closed");
break;
}

Err(RecvError::Lagged(_)) => {
// Since we've lagged behind updates to this room, we might be out of
// sync with the events, leading to potentially lost events. Play it
// safe here, and clear the cache. It's fine because we can retrigger
// backpagination from the last event at any time, if needs be.
debug!("Listen task lagged, clearing room");
if let Err(err) = this.store.clear_room_events(this.room.room_id()).await {
error!("unable to clear room after room updates lag: {err}");
}
}
}
}
}

/// Append a set of events to the room cache and storage, notifying
/// observers.
async fn append_events(&self, events: Vec<SyncTimelineEvent>) -> Result<()> {
Expand Down
6 changes: 3 additions & 3 deletions crates/matrix-sdk-ui/src/timeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,9 @@ impl TimelineBuilder {
)
)]
pub async fn build(self) -> crate::event_cache::Result<Timeline> {
let Self { room, mut event_cache, prev_token, settings } = self;
let Self { room, event_cache, prev_token, settings } = self;

let room_event_cache = event_cache.for_room(room.room_id())?;
let (room_event_cache, event_cache_drop) = event_cache.for_room(room.room_id()).await?;
let (events, mut event_subscriber) = room_event_cache.subscribe().await?;

let has_events = !events.is_empty();
Expand Down Expand Up @@ -309,7 +309,7 @@ impl TimelineBuilder {
room_update_join_handle,
ignore_user_list_update_join_handle,
room_key_from_backups_join_handle,
_event_cache: room_event_cache,
_event_cache_drop_handle: event_cache_drop,
}),
};

Expand Down
4 changes: 2 additions & 2 deletions crates/matrix-sdk-ui/src/timeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use tokio::sync::{mpsc::Sender, Mutex, Notify};
use tracing::{debug, error, info, instrument, trace, warn};

use self::futures::SendAttachment;
use crate::event_cache::RoomEventCache;
use crate::event_cache::EventCacheDropHandles;

mod builder;
mod error;
Expand Down Expand Up @@ -816,7 +816,7 @@ struct TimelineDropHandle {
room_update_join_handle: JoinHandle<()>,
ignore_user_list_update_join_handle: JoinHandle<()>,
room_key_from_backups_join_handle: JoinHandle<()>,
_event_cache: RoomEventCache,
_event_cache_drop_handle: Arc<EventCacheDropHandles>,
}

impl Drop for TimelineDropHandle {
Expand Down

0 comments on commit 11074d8

Please sign in to comment.