
New temporal batch APIs #6587

Merged
merged 23 commits into from
Jul 4, 2024

Conversation

jleibs
Member

@jleibs jleibs commented Jun 17, 2024

What

This primarily introduces a new logging API for temporal batches. This API is slightly lower-level than the existing log API, but it should feel fairly familiar if you've worked with our data types.

The biggest difference is that it does not currently support Archetypes. Data must be logged explicitly as raw components arranged in (possibly partitioned) batches. The main reason for this is that Archetypes aren't required to have matched-length components, so you would need to provide a per-component partitioning, which starts to look very similar to the manual component-level API.

Note that we automatically wrap regular batches in the correct way to turn them into "mono-batches". Partitions only need to be created manually to support batches of batches.
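As a loose illustration (a conceptual sketch, not the actual implementation), the mono-batch wrapping is equivalent to partitioning a batch of N values into N length-1 partitions:

```python
# Conceptual sketch only: a regular batch of N values is automatically
# wrapped into a "mono-batch", i.e. N rows of one component instance each.
values = [0.1, 0.2, 0.3, 0.4]
mono_batch = [[v] for v in values]  # one length-1 partition per row

assert len(mono_batch) == len(values)
assert all(len(row) == 1 for row in mono_batch)
```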

This also requires a few helper classes for the 3 TimeBatch types, but otherwise makes use of the existing ComponentBatch constructors.

New python API docstring:

def log_temporal_batch(
    entity_path: str,
    times: Iterable[TimeBatchLike],
    components: Iterable[ComponentBatchLike],
    recording: RecordingStream | None = None,
    strict: bool | None = None,
) -> None:

Directly log a temporal batch of data to Rerun.

Unlike the regular log API, which is row-oriented, this API lets you submit the data
in a columnar form. Each TimeBatchLike and ComponentBatchLike object represents a column
of data that will be sent to Rerun. The lengths of all of these columns must match, and all
data that shares the same index across the different columns will act as a single logical row,
equivalent to a single call to rr.log().

Note that this API ignores any stateful time set on the log stream via the rerun.set_time_* APIs.

When using a regular ComponentBatch input, the batch data will map to a single-valued component
instance at each timepoint.

For example, scalars would be logged as:

times = np.arange(0, 64)
scalars = np.sin(times / 10.0)

rr.log_temporal_batch(
    "scalars",
    times=[rr.TimeSequenceBatch("step", times)],
    components=[rr.components.ScalarBatch(scalars)],
)

In the viewer this will show up as 64 individual scalar values, one for each timepoint.

However, it is still possible to log temporal batches of batch data. To do this, the source data must
first be created as a single contiguous batch, which can then be partitioned using the .partition() helper
on the ComponentBatch objects.

For example, to log 5 batches of 20 point clouds, first create a batch of 100 (20 * 5) point clouds and then
partition it into 5 batches of 20 point clouds:

times = np.arange(0, 5)
positions = rng.uniform(-5, 5, size=[100, 3])

rr.log_temporal_batch(
    "points",
    times=[rr.TimeSequenceBatch("step", times)],
    components=[
        rr.Points3D.indicator(),
        rr.components.Position3DBatch(positions).partition([20, 20, 20, 20, 20]),
    ],
)

In the viewer this will show up as 5 individual point clouds, one for each timepoint.
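For intuition, the invariant behind .partition() can be sketched with plain numpy (this mirrors the example above, it is not the actual implementation):

```python
import numpy as np

# The lengths passed to .partition() must sum to the total batch length,
# and the number of partitions must equal the number of timepoints.
rng = np.random.default_rng(0)
num_timepoints = 5
lengths = [20] * num_timepoints
positions = rng.uniform(-5, 5, size=[sum(lengths), 3])

# Split the contiguous batch the way the partition lengths describe.
partitions = np.split(positions, np.cumsum(lengths)[:-1])

assert len(partitions) == num_timepoints
assert all(p.shape == (20, 3) for p in partitions)
```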

TODO

  • Round-tripped snippet for Python + Rust

Checklist

  • I have read and agree to Contributor Guide and the Code of Conduct
  • I've included a screenshot or gif (if applicable)
  • I have tested the web demo (if applicable):
  • The PR title and labels are set such as to maximize their usefulness for the next release's CHANGELOG
  • If applicable, add a new check to the release checklist!

To run all checks from main, comment on the PR with @rerun-bot full-check.

@jleibs jleibs force-pushed the jleibs/temporal_batch branch from e1b1d5a to 475c9d4 Compare June 17, 2024 20:04
@teh-cmc teh-cmc force-pushed the cmc/store_chunks_scratch branch 3 times, most recently from 27dc2b9 to 014d5cb Compare June 18, 2024 09:16
@jleibs jleibs force-pushed the jleibs/temporal_batch branch from 27c8cab to 422b30c Compare June 18, 2024 21:50
@jleibs jleibs added do-not-merge Do not merge this PR 🐍 Python API Python logging API include in changelog labels Jun 18, 2024
@jleibs jleibs marked this pull request as ready for review June 18, 2024 23:06
@teh-cmc teh-cmc self-requested a review June 19, 2024 07:17
Member

@teh-cmc teh-cmc left a comment


Looks great.

One question though: is there any particular reason you went with the "FFI-heavy" approach of passing py-dicts around vs. crafting the (Transport)Chunk directly in Python?
I imagine it's because we want to avoid duplicating all the metadata handling and sanity checking code etc. On the one hand I like it, on the other I can't help but wonder how that's gonna go in C++ where the FFI is much more barebones... Eh 🤷.

(Also yes -- I feel like we've already had this discussion at one point... it's all a big blur)

Comment on lines 161 to 164
} else if Some(value.len()) == expected_length {
let offsets = Offsets::try_from_lengths(std::iter::repeat(1).take(value.len()))
.map_err(|err| ChunkError::Malformed {
reason: format!("Failed to create offsets: {err}"),
})?;
let data_type = ListArray::<i32>::default_datatype(value.data_type().clone());
ListArray::<i32>::try_new(data_type, offsets.into(), value, None).map_err(
|err| ChunkError::Malformed {
reason: format!("Failed to wrap in List array: {err}"),
},
)?

Ha. Interesting that this is where this happens.

@teh-cmc teh-cmc force-pushed the cmc/store_chunks_scratch branch from 014d5cb to ad29d5c Compare June 21, 2024 07:47
@teh-cmc teh-cmc force-pushed the jleibs/temporal_batch branch from 8828289 to 43d91cc Compare June 21, 2024 07:48
@teh-cmc teh-cmc force-pushed the cmc/store_chunks_scratch branch from 7e789be to 1e24a6b Compare July 1, 2024 09:47
@teh-cmc teh-cmc force-pushed the jleibs/temporal_batch branch from 43d91cc to fbe8f2a Compare July 1, 2024 10:18
@teh-cmc teh-cmc force-pushed the cmc/store_chunks_scratch branch 2 times, most recently from 6b42567 to d304e97 Compare July 4, 2024 15:15
Base automatically changed from cmc/store_chunks_scratch to main July 4, 2024 15:15
@teh-cmc teh-cmc force-pushed the jleibs/temporal_batch branch from fbe8f2a to 2c083fb Compare July 4, 2024 15:21
@teh-cmc teh-cmc removed the do-not-merge Do not merge this PR label Jul 4, 2024
@teh-cmc
Member

teh-cmc commented Jul 4, 2024

@rerun-bot full-check

@teh-cmc teh-cmc closed this Jul 4, 2024
@teh-cmc teh-cmc reopened this Jul 4, 2024

github-actions bot commented Jul 4, 2024

@teh-cmc teh-cmc merged commit 0b03162 into main Jul 4, 2024
27 of 29 checks passed
@teh-cmc teh-cmc deleted the jleibs/temporal_batch branch July 4, 2024 15:37
@Famok

Famok commented Aug 6, 2024

Hi @jleibs ,

I've been testing the pre-release build (releases).
Adding timeseries as batches works great and is really fast, great work!

Then I tried to add multiple images at once, using different variants of rr.components.TensorDataBatch (which I thought would be a fitting equivalent to ScalarBatch), but was unable to get it working.

If it is possible to do in this release, could you please provide an example using log_temporal_batch to log a video / series of images for different times at once? (the video being a numpy array of shape (image_count, height, width), or a list [img0, img1, ...])

What I've tried:

import rerun as rr
import numpy as np
video = np.random.randint(0,255,size=(1000,100,100))
rr.log_temporal_batch(
    "video",
    times=[rr.TimeSequenceBatch("time", list(range(len(video)))[:2])],
    components=[rr.components.TensorDataBatch(video),
                ]
)

--> results in a warning: RerunWarning: log_temporal_batch: ValueError(All times and components in a batch must have the same length. Expected length: 2 but got: 1 for component: rerun.components.TensorData)

rr.log_temporal_batch(
    "video",
    times=[rr.TimeSequenceBatch("time", list(range(len(video)))[:2])],
    components=[
        rr.components.TensorDataBatch([
            rr.TensorData(
                shape=[TensorDimension(video.shape[0], name="depth"),
                       TensorDimension(video.shape[2], name="height"),
                       TensorDimension(video.shape[1], name="width")],
                buffer=video,
            ),
            rr.TensorData(
                shape=[TensorDimension(video.shape[0], name="depth"),
                       TensorDimension(video.shape[2], name="height"),
                       TensorDimension(video.shape[1], name="width")],
                buffer=video,
            ),
        ]),
    ],
)

--> results in a warning: RerunWarning: TensorDataBatch: ValueError(Tensors do not support batches)

@jleibs
Member Author

jleibs commented Aug 6, 2024

@Famok thanks for testing the pre-release! It's very nice to get early feedback like this.

What you are trying to do is conceptually correct, but is currently blocked by:

I'm not sure whether we will manage to get that one done in time for this release. So you may still need to iterate over separate log calls when logging images for 0.18.

I believe the inherent overhead of logging images means the performance difference here is less significant than for batch-logging of scalars. That said, I absolutely appreciate the desire to use this pattern to clean up code when you're starting from an array of images to begin with.

@Famok

Famok commented Aug 9, 2024

@jleibs Thanks for the clarification!

I believe the inherent overhead with logging images is such that the performance differences here are more negligible, compared to batch-logging of scalars. That said, I absolutely appreciate the desire to use this pattern to clean up code like this when you are starting with an array of images to begin with.

This probably depends on the images. In my case they are small (100x100 px) and many of them (>>10k). Therefore I hope to profit ;)

@jleibs
Member Author

jleibs commented Aug 9, 2024

This probably depends on the images. In my case they are small (100x100 px) and many of them (>>10k). Therefore I hope to profit ;)

Interesting. That makes sense. Even without ImageBatch support I think we will have a way to do this directly with an arrow constructor. I'll try to see if I can come up with some example code.

@jleibs
Member Author

jleibs commented Aug 9, 2024

@Famok here's an example of manually creating an ImageBufferBatch using pyarrow. This may still change a bit syntactically before the release, but should give you the basic idea.

from __future__ import annotations

import numpy as np
import pyarrow as pa
import rerun as rr

rr.init("rerun_example_send_columns", spawn=True)

COUNT = 64
WIDTH = 100
HEIGHT = 50
CHANNELS = 3

# Create our time
times = np.arange(0, COUNT)

# Create a batch of images
rng = np.random.default_rng(12345)
image_batch = rng.uniform(0, 255, size=[COUNT, HEIGHT, WIDTH, CHANNELS]).astype(dtype=np.uint8)

# Log the ImageFormat once, as static
format_static = rr.components.ImageFormat(width=WIDTH, height=HEIGHT, color_model="RGB", channel_datatype="U8")
rr.log("image", [format_static], static=True)

# Manually create an ImageBufferBatch
image_buffers = (row.tobytes() for row in image_batch.reshape(COUNT, -1))
raw_arrow = pa.array(image_buffers, type=rr.components.ImageBufferType())
buffers_batch = rr.components.ImageBufferBatch(raw_arrow)

# Need to manually partition the batch to avoid false-negative on batch->column promotion
buffers_column = buffers_batch.partition([1] * COUNT)

rr.send_columns(
    "image",
    times=[rr.TimeSequenceColumn("step", times)],
    components=[rr.Image.indicator(), buffers_column],
)
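As a sanity check on the layout the snippet above assumes: each RGB U8 image flattens to HEIGHT * WIDTH * CHANNELS bytes, so the partitioned column carries exactly one such buffer per timepoint. A minimal numpy-only sketch (illustrative values, no Rerun calls):

```python
import numpy as np

# Each RGB U8 image flattens to HEIGHT * WIDTH * CHANNELS bytes; the
# partitioned column then carries exactly one such buffer per timepoint.
COUNT, HEIGHT, WIDTH, CHANNELS = 4, 50, 100, 3
rng = np.random.default_rng(12345)
image_batch = rng.uniform(0, 255, size=[COUNT, HEIGHT, WIDTH, CHANNELS]).astype(np.uint8)

buffers = [row.tobytes() for row in image_batch.reshape(COUNT, -1)]

assert len(buffers) == COUNT
assert all(len(b) == HEIGHT * WIDTH * CHANNELS for b in buffers)
```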

@Famok

Famok commented Aug 10, 2024

@jleibs Thank you so much, I'll give this a try on Monday! Can this also be found in the docs somewhere?

@Famok

Famok commented Aug 12, 2024

@jleibs I tried running your code, but it seems like the pre-release build is missing some stuff:
rr.version() --> 'rerun_py 0.18.0-alpha.1+dev [rustc 1.76.0 (07dca489a 2024-02-04), LLVM 17.0.6] x86_64-pc-windows-msvc main a93faab, built 2024-08-05T13:00:24Z'

    format_static = rr.components.ImageFormat(width=WIDTH, height=HEIGHT, color_model="RGB", channel_datatype="U8")
AttributeError: module 'rerun.components' has no attribute 'ImageFormat'

also this:

    buffers_batch = rr.components.ImageBufferBatch(raw_arrow)
AttributeError: module 'rerun.components' has no attribute 'ImageBufferBatch'

as well as:

    raw_arrow = pa.array(image_buffers, type=rr.components.ImageBufferType())
AttributeError: module 'rerun.components' has no attribute 'ImageBufferType'

@Wumpf
Member

Wumpf commented Aug 12, 2024

@Famok the API has shifted around quite a bit again, and it seems your dev build doesn't have the latest changes yet. Try again with the latest pre-release; that's 1dfb41c as of writing.
Jeremy's snippet worked fine for me with a build from main, so I'm confident that will work as well.

@Wumpf
Member

Wumpf commented Aug 12, 2024

I was able to significantly improve the ergonomics for this: #7155
I'm not 100% sure, but I think it's also as fast as what @jleibs previously posted.

Development

Successfully merging this pull request may close these issues.

Logging API for "temporal batches"
4 participants