Skip to content

Commit 3cd534f

Browse files
authored
MVP Support for inline-rendering of Rerun within jupyter notebooks (#1798) (#1834) (#1844)
* Introduce the ability to push an rrd binary via iframe.contentWindow.postMessage * New API to output the current buffered messages as a cell in jupyter * Example notebook with the cube demo * Track that we need to send another recording msg after draining the backlog * Dynamically resolve the app location based on git commit. Allow override to use self-hosted assets * Add some crude timeout logic in case the iframe fails to load * Don't persist app state in notebooks * Introduce new MemoryRecording for use with Jupyter notebooks (#1834) * Refactor the relationship between the assorted web / websocket servers (#1844) * Rename RemoteViewerServer to WebViewerSink * CLI arguments for specifying ports * Proper typing for the ports
1 parent d33dab6 commit 3cd534f

32 files changed

+1104
-130
lines changed

Cargo.lock

+7-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/re_log_encoding/src/encoder.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl<W: std::io::Write> Encoder<W> {
9090

9191
pub fn encode<'a>(
9292
messages: impl Iterator<Item = &'a LogMsg>,
93-
write: impl std::io::Write,
93+
write: &mut impl std::io::Write,
9494
) -> Result<(), EncodeError> {
9595
let mut encoder = Encoder::new(write)?;
9696
for message in messages {

crates/re_log_encoding/src/stream_rrd_from_http.rs

+49-5
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::sync::Arc;
2+
13
use re_log_types::LogMsg;
24

35
pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receiver<LogMsg> {
@@ -6,14 +8,14 @@ pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receive
68
});
79
stream_rrd_from_http(
810
url,
9-
Box::new(move |msg| {
11+
Arc::new(move |msg| {
1012
tx.send(msg).ok();
1113
}),
1214
);
1315
rx
1416
}
1517

16-
pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(LogMsg) + Send>) {
18+
pub fn stream_rrd_from_http(url: String, on_msg: Arc<dyn Fn(LogMsg) + Send + Sync>) {
1719
re_log::debug!("Downloading .rrd file from {url:?}…");
1820

1921
// TODO(emilk): stream the http request, progressively decoding the .rrd file.
@@ -36,9 +38,50 @@ pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(LogMsg) + Send>) {
3638
});
3739
}
3840

41+
#[cfg(target_arch = "wasm32")]
42+
mod web_event_listener {
43+
use js_sys::Uint8Array;
44+
use re_log_types::LogMsg;
45+
use std::sync::Arc;
46+
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
47+
use web_sys::MessageEvent;
48+
49+
/// Install an event-listener on `window` which will decode the incoming event as an rrd
50+
///
51+
/// From javascript you can send an rrd using:
52+
/// ``` ignore
53+
/// var rrd = new Uint8Array(...); // Get an RRD from somewhere
54+
/// window.postMessage(rrd, "*")
55+
/// ```
56+
pub fn stream_rrd_from_event_listener(on_msg: Arc<dyn Fn(LogMsg) + Send>) {
57+
let window = web_sys::window().expect("no global `window` exists");
58+
let closure =
59+
Closure::wrap(Box::new(
60+
move |event: JsValue| match event.dyn_into::<MessageEvent>() {
61+
Ok(message_event) => {
62+
let uint8_array = Uint8Array::new(&message_event.data());
63+
let result: Vec<u8> = uint8_array.to_vec();
64+
65+
crate::stream_rrd_from_http::decode_rrd(result, on_msg.clone());
66+
}
67+
Err(js_val) => {
68+
re_log::error!("Incoming event was not a MessageEvent. {:?}", js_val);
69+
}
70+
},
71+
) as Box<dyn FnMut(_)>);
72+
window
73+
.add_event_listener_with_callback("message", closure.as_ref().unchecked_ref())
74+
.unwrap();
75+
closure.forget();
76+
}
77+
}
78+
79+
#[cfg(target_arch = "wasm32")]
80+
pub use web_event_listener::stream_rrd_from_event_listener;
81+
3982
#[cfg(not(target_arch = "wasm32"))]
4083
#[allow(clippy::needless_pass_by_value)] // must match wasm version
41-
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
84+
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
4285
match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
4386
Ok(decoder) => {
4487
for msg in decoder {
@@ -61,15 +104,16 @@ fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
61104
#[cfg(target_arch = "wasm32")]
62105
mod web_decode {
63106
use re_log_types::LogMsg;
107+
use std::sync::Arc;
64108

65-
pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
109+
pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
66110
wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg));
67111
}
68112

69113
/// Decodes the file in chunks, with an yield between each chunk.
70114
///
71115
/// This is cooperative multi-tasking.
72-
async fn decode_rrd_async(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
116+
async fn decode_rrd_async(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
73117
let mut last_yield = instant::Instant::now();
74118

75119
match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {

crates/re_sdk/src/lib.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,9 @@ pub mod demo_util;
4949
/// This is how you select whether the log stream ends up
5050
/// sent over TCP, written to file, etc.
5151
pub mod sink {
52-
pub use crate::log_sink::{disabled, BufferedSink, LogSink, TcpSink};
52+
pub use crate::log_sink::{
53+
disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
54+
};
5355

5456
#[cfg(not(target_arch = "wasm32"))]
5557
pub use re_log_encoding::{FileSink, FileSinkError};

crates/re_sdk/src/log_sink.rs

+47
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,53 @@ impl LogSink for BufferedSink {
7676
}
7777
}
7878

79+
/// Store log messages directly in memory
80+
///
81+
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
82+
/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing).
83+
///
84+
/// Additionally the raw storage can be accessed and used to create an in-memory RRD.
85+
/// This is useful for things like the inline rrd-viewer in Jupyter notebooks.
86+
#[derive(Default)]
87+
pub struct MemorySink(MemorySinkStorage);
88+
89+
impl MemorySink {
90+
/// Access the raw `MemorySinkStorage`
91+
pub fn buffer(&self) -> MemorySinkStorage {
92+
self.0.clone()
93+
}
94+
}
95+
96+
impl LogSink for MemorySink {
97+
fn send(&self, msg: LogMsg) {
98+
self.0.lock().push(msg);
99+
}
100+
101+
fn send_all(&self, mut messages: Vec<LogMsg>) {
102+
self.0.lock().append(&mut messages);
103+
}
104+
}
105+
106+
/// The storage used by [`MemorySink`]
107+
#[derive(Default, Clone)]
108+
pub struct MemorySinkStorage(std::sync::Arc<parking_lot::Mutex<Vec<LogMsg>>>);
109+
110+
///
111+
impl MemorySinkStorage {
112+
/// Lock the contained buffer
113+
fn lock(&self) -> parking_lot::MutexGuard<'_, Vec<LogMsg>> {
114+
self.0.lock()
115+
}
116+
117+
/// Convert the stored messages into an in-memory Rerun log file
118+
pub fn rrd_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
119+
let messages = self.lock();
120+
let mut buffer = std::io::Cursor::new(Vec::new());
121+
re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?;
122+
Ok(buffer.into_inner())
123+
}
124+
}
125+
79126
// ----------------------------------------------------------------------------
80127

81128
/// Stream log messages to a Rerun TCP server.

crates/re_smart_channel/src/lib.rs

+6-1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ pub enum Source {
1818
/// Streaming an `.rrd` file over http.
1919
RrdHttpStream { url: String },
2020

21+
/// Loading an `.rrd` file from a `postMessage` js event
22+
///
23+
/// Only applicable to web browser iframes
24+
RrdWebEventListener,
25+
2126
/// The source is the logging sdk directly, same process.
2227
Sdk,
2328

@@ -36,7 +41,7 @@ pub enum Source {
3641
impl Source {
3742
pub fn is_network(&self) -> bool {
3843
match self {
39-
Self::File { .. } | Self::Sdk => false,
44+
Self::File { .. } | Self::Sdk | Self::RrdWebEventListener => false,
4045
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
4146
}
4247
}

crates/re_viewer/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ winapi = "0.3.9"
117117
[target.'cfg(target_arch = "wasm32")'.dependencies]
118118
console_error_panic_hook = "0.1.6"
119119
wasm-bindgen-futures = "0.4"
120-
120+
web-sys = { version = "0.3.52", features = ["Window"] }
121121

122122
[build-dependencies]
123123
re_build_build_info.workspace = true

crates/re_viewer/src/app.rs

+6-3
Original file line numberDiff line numberDiff line change
@@ -640,6 +640,9 @@ fn wait_screen_ui(ui: &mut egui::Ui, rx: &Receiver<LogMsg>) {
640640
re_smart_channel::Source::RrdHttpStream { url } => {
641641
ui.strong(format!("Loading {url}…"));
642642
}
643+
re_smart_channel::Source::RrdWebEventListener => {
644+
ready_and_waiting(ui, "Waiting for logging data…");
645+
}
643646
re_smart_channel::Source::Sdk => {
644647
ready_and_waiting(ui, "Waiting for logging data from SDK");
645648
}
@@ -1885,9 +1888,9 @@ fn new_recording_confg(
18851888
let play_state = match data_source {
18861889
// Play files from the start by default - it feels nice and alive./
18871890
// RrdHttpStream downloads the whole file before decoding it, so we treat it the same as a file.
1888-
re_smart_channel::Source::File { .. } | re_smart_channel::Source::RrdHttpStream { .. } => {
1889-
PlayState::Playing
1890-
}
1891+
re_smart_channel::Source::File { .. }
1892+
| re_smart_channel::Source::RrdHttpStream { .. }
1893+
| re_smart_channel::Source::RrdWebEventListener => PlayState::Playing,
18911894

18921895
// Live data - follow it!
18931896
re_smart_channel::Source::Sdk

crates/re_viewer/src/remote_viewer_app.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl RemoteViewerApp {
5656
}
5757
}
5858
Err(err) => {
59-
re_log::error!("Failed to parse message: {}", re_error::format(&err));
59+
re_log::error!("Failed to parse message: {err}");
6060
std::ops::ControlFlow::Break(())
6161
}
6262
}

crates/re_viewer/src/viewer_analytics.rs

+1
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ impl ViewerAnalytics {
184184
let data_source = match data_source {
185185
re_smart_channel::Source::File { .. } => "file", // .rrd
186186
re_smart_channel::Source::RrdHttpStream { .. } => "http",
187+
re_smart_channel::Source::RrdWebEventListener { .. } => "web_event",
187188
re_smart_channel::Source::Sdk => "sdk", // show()
188189
re_smart_channel::Source::WsClient { .. } => "ws_client", // spawn()
189190
re_smart_channel::Source::TcpServer { .. } => "tcp_server", // connect()

crates/re_viewer/src/web.rs

+32-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use eframe::wasm_bindgen::{self, prelude::*};
2+
use std::sync::Arc;
23

34
use re_memory::AccountingAllocator;
45

@@ -54,7 +55,30 @@ pub async fn start(
5455
let egui_ctx = cc.egui_ctx.clone();
5556
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http(
5657
url,
57-
Box::new(move |msg| {
58+
Arc::new(move |msg| {
59+
egui_ctx.request_repaint(); // wake up ui thread
60+
tx.send(msg).ok();
61+
}),
62+
);
63+
64+
Box::new(crate::App::from_receiver(
65+
build_info,
66+
&app_env,
67+
startup_options,
68+
re_ui,
69+
cc.storage,
70+
rx,
71+
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
72+
))
73+
}
74+
EndpointCategory::WebEventListener => {
75+
// Process an rrd when it's posted via `window.postMessage`
76+
let (tx, rx) = re_smart_channel::smart_channel(
77+
re_smart_channel::Source::RrdWebEventListener,
78+
);
79+
let egui_ctx = cc.egui_ctx.clone();
80+
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(
81+
Arc::new(move |msg| {
5882
egui_ctx.request_repaint(); // wake up ui thread
5983
tx.send(msg).ok();
6084
}),
@@ -95,13 +119,18 @@ enum EndpointCategory {
95119

96120
/// A remote Rerun server.
97121
WebSocket(String),
122+
123+
/// An eventListener for rrd posted from containing html
124+
WebEventListener,
98125
}
99126

100127
fn categorize_uri(mut uri: String) -> EndpointCategory {
101128
if uri.starts_with("http") || uri.ends_with(".rrd") {
102129
EndpointCategory::HttpRrd(uri)
103-
} else if uri.starts_with("ws") {
130+
} else if uri.starts_with("ws:") || uri.starts_with("wss:") {
104131
EndpointCategory::WebSocket(uri)
132+
} else if uri.starts_with("web_event:") {
133+
EndpointCategory::WebEventListener
105134
} else {
106135
// If this is sometyhing like `foo.com` we can't know what it is until we connect to it.
107136
// We could/should connect and see what it is, but for now we just take a wild guess instead:
@@ -119,7 +148,7 @@ fn get_url(info: &eframe::IntegrationInfo) -> String {
119148
url = param.clone();
120149
}
121150
if url.is_empty() {
122-
re_ws_comms::default_server_url(&info.web_info.location.hostname)
151+
re_ws_comms::server_url(&info.web_info.location.hostname, Default::default())
123152
} else {
124153
url
125154
}

crates/re_web_viewer_server/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ analytics = ["dep:re_analytics"]
4646
[dependencies]
4747
re_log.workspace = true
4848

49-
anyhow.workspace = true
5049
ctrlc.workspace = true
5150
document-features = "0.2"
5251
futures-util = "0.3"
5352
hyper = { version = "0.14", features = ["full"] }
53+
thiserror.workspace = true
5454
tokio = { workspace = true, default-features = false, features = [
5555
"macros",
5656
"rt-multi-thread",

0 commit comments

Comments
 (0)