Skip to content

Commit

Permalink
Merge pull request #3068 from Hywan/feat-roomlist-sorting
Browse files Browse the repository at this point in the history
feat(base): Implement `Client::rooms_stream`
  • Loading branch information
Hywan authored Jun 19, 2024
2 parents 565f974 + 717c68d commit 6340eea
Show file tree
Hide file tree
Showing 6 changed files with 427 additions and 20 deletions.
15 changes: 14 additions & 1 deletion crates/matrix-sdk-base/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ use std::{
use std::{ops::Deref, sync::Arc};

use eyeball::{SharedObservable, Subscriber};
#[cfg(not(target_arch = "wasm32"))]
use eyeball_im::{Vector, VectorDiff};
#[cfg(not(target_arch = "wasm32"))]
use futures_util::Stream;
use matrix_sdk_common::instant::Instant;
#[cfg(feature = "e2e-encryption")]
use matrix_sdk_crypto::{
Expand Down Expand Up @@ -169,6 +173,13 @@ impl BaseClient {
self.store.rooms_filtered(filter)
}

/// Get a stream of all the rooms changes, in addition to the existing
/// rooms.
#[cfg(not(target_arch = "wasm32"))]
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
self.store.rooms_stream()
}

/// Lookup the Room for the given RoomId, or create one, if it didn't exist
/// yet in the store
pub fn get_or_create_room(&self, room_id: &RoomId, room_state: RoomState) -> Room {
Expand Down Expand Up @@ -1668,6 +1679,8 @@ mod tests {
#[cfg(all(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]
#[async_test]
async fn test_when_there_are_no_latest_encrypted_events_decrypting_them_does_nothing() {
use crate::StateChanges;

// Given a room
let user_id = user_id!("@u:u.to");
let room_id = room_id!("!r:u.to");
Expand All @@ -1679,7 +1692,7 @@ mod tests {
assert!(room.latest_event().is_none());

// When I tell it to do some decryption
let mut changes = crate::StateChanges::default();
let mut changes = StateChanges::default();
client.decrypt_latest_events(&room, &mut changes).await;

// Then nothing changed
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-base/src/latest_event.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Utilities for working with events to decide whether they are suitable for
//! use as a [crate::Room::latest_event].
#![cfg(feature = "experimental-sliding-sync")]
#![cfg(any(feature = "e2e-encryption", feature = "experimental-sliding-sync"))]

use matrix_sdk_common::deserialized_responses::SyncTimelineEvent;
#[cfg(feature = "e2e-encryption")]
Expand Down
55 changes: 37 additions & 18 deletions crates/matrix-sdk-base/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,22 @@ use std::{
sync::{Arc, RwLock as StdRwLock},
};

#[cfg(not(target_arch = "wasm32"))]
use eyeball_im::{Vector, VectorDiff};
#[cfg(not(target_arch = "wasm32"))]
use futures_util::Stream;
use once_cell::sync::OnceCell;

#[cfg(any(test, feature = "testing"))]
#[macro_use]
pub mod integration_tests;
mod observable_map;
mod traits;

#[cfg(feature = "e2e-encryption")]
use matrix_sdk_crypto::store::{DynCryptoStore, IntoCryptoStore};
pub use matrix_sdk_store_encryption::Error as StoreEncryptionError;
use observable_map::ObservableMap;
use ruma::{
events::{
presence::PresenceEvent,
Expand Down Expand Up @@ -139,7 +145,7 @@ pub(crate) struct Store {
/// The current sync token that should be used for the next sync call.
pub(super) sync_token: Arc<RwLock<Option<String>>>,
/// All rooms the store knows about.
rooms: Arc<StdRwLock<BTreeMap<OwnedRoomId, Room>>>,
rooms: Arc<StdRwLock<ObservableMap<OwnedRoomId, Room>>>,
/// A lock to synchronize access to the store, such that data by the sync is
/// never overwritten.
sync_lock: Arc<Mutex<()>>,
Expand All @@ -152,7 +158,7 @@ impl Store {
inner,
session_meta: Default::default(),
sync_token: Default::default(),
rooms: Default::default(),
rooms: Arc::new(StdRwLock::new(ObservableMap::new())),
sync_lock: Default::default(),
}
}
Expand All @@ -173,15 +179,22 @@ impl Store {
session_meta: SessionMeta,
roominfo_update_sender: &broadcast::Sender<RoomInfoUpdate>,
) -> Result<()> {
for info in self.inner.get_room_infos().await? {
let room = Room::restore(
&session_meta.user_id,
self.inner.clone(),
info,
roominfo_update_sender.clone(),
);

self.rooms.write().unwrap().insert(room.room_id().to_owned(), room);
{
let room_infos = self.inner.get_room_infos().await?;

let mut rooms = self.rooms.write().unwrap();

for room_info in room_infos {
let new_room = Room::restore(
&session_meta.user_id,
self.inner.clone(),
room_info,
roominfo_update_sender.clone(),
);
let new_room_id = new_room.room_id().to_owned();

rooms.insert(new_room_id, new_room);
}
}

let token =
Expand All @@ -200,7 +213,7 @@ impl Store {

/// Get all the rooms this store knows about.
pub fn rooms(&self) -> Vec<Room> {
self.rooms.read().unwrap().values().cloned().collect()
self.rooms.read().unwrap().iter().cloned().collect()
}

/// Get all the rooms this store knows about, filtered by state.
Expand All @@ -209,18 +222,25 @@ impl Store {
.read()
.unwrap()
.iter()
.filter(|(_, room)| filter.matches(room.state()))
.map(|(_, room)| room.clone())
.filter(|room| filter.matches(room.state()))
.cloned()
.collect()
}

/// Get a stream of all the rooms changes, in addition to the existing
/// rooms.
#[cfg(not(target_arch = "wasm32"))]
pub fn rooms_stream(&self) -> (Vector<Room>, impl Stream<Item = Vec<VectorDiff<Room>>>) {
self.rooms.read().unwrap().stream()
}

/// Get the room with the given room id.
pub fn room(&self, room_id: &RoomId) -> Option<Room> {
self.rooms.read().unwrap().get(room_id).cloned()
}

/// Lookup the Room for the given RoomId, or create one, if it didn't exist
/// yet in the store.
/// Lookup the `Room` for the given `RoomId`, or create one, if it didn't
/// exist yet in the store
pub fn get_or_create_room(
&self,
room_id: &RoomId,
Expand All @@ -233,8 +253,7 @@ impl Store {
self.rooms
.write()
.unwrap()
.entry(room_id.to_owned())
.or_insert_with(|| {
.get_or_create(room_id, || {
Room::new(user_id, self.inner.clone(), room_id, room_type, roominfo_update_sender)
})
.clone()
Expand Down
Loading

0 comments on commit 6340eea

Please sign in to comment.