
feat(logstore): provide more opportunities to read #20546

Open · wants to merge 10 commits into main from kwannoel/pass-chunks
Conversation

@kwannoel (Contributor) commented Feb 20, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Make the writer future sleep briefly after:

  1. Yielding a barrier. This gives the read future a brief chance to read some records.
  2. The chunk buffer becomes 90% full. This gives the read future a brief chance to read some records, and avoids a logstore flush.
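
A minimal sketch of the writer-side pause, assuming a tokio runtime (the function name, parameters, and the 10 ms pause are illustrative, not the actual executor code):

use std::time::Duration;
use tokio::time::sleep;

// Illustrative sketch only; `buffer_len`, `buffer_size`, and the 10 ms pause
// are hypothetical names/values, not the real executor configuration.
async fn maybe_pause_writer(yielded_barrier: bool, buffer_len: usize, buffer_size: usize) {
    const PAUSE: Duration = Duration::from_millis(10);
    // After yielding a barrier, give the read future a brief window to read records.
    if yielded_barrier {
        sleep(PAUSE).await;
    }
    // When the buffer is ~90% full, pause so the reader can drain some records
    // before the writer is forced to flush to the logstore.
    if buffer_len >= buffer_size * 9 / 10 {
        sleep(PAUSE).await;
    }
}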

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note

This stack of pull requests is managed by Graphite.

@kwannoel changed the title from "add paused variant" to "feat(logstore): provide more opportunities to read" on Feb 20, 2025
@kwannoel marked this pull request as ready for review on February 20, 2025 08:44
@kwannoel requested a review from wenym1 on February 20, 2025 08:44
@@ -144,6 +150,11 @@ struct FlushedChunkInfo {
}

enum WriteFuture<S: LocalStateStore> {
    Paused {
        duration: Duration,
Contributor
We should store the sleep future instead of the duration. The sleep future is created in the next_event call and only stored in the future returned from next_event. However, the next_event future can easily be dropped because of select!, and every time the read future gets ready, the next_event future is created again and the sleep timer is reset. Under these circumstances, the write future will easily be starved by the read future.
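
A rough sketch of the difference (the enum below is illustrative, not the real WriteFuture): storing the Sleep keeps the deadline across drops of the next_event future, while storing only a Duration restarts the timer every time.

use std::pin::Pin;
use std::time::Duration;
use tokio::time::Sleep;

// Illustrative sketch only, not the actual WriteFuture definition.
enum PauseState {
    // A Duration restarts from zero whenever next_event is recreated,
    // so the write future may never finish its pause under select!.
    ByDuration(Duration),
    // A stored Sleep keeps its deadline across drops of the next_event
    // future, so the elapsed time is not reset.
    BySleep(Pin<Box<Sleep>>),
}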

);
// If buffer 90% full, pause the stream for a while, let downstream do some processing
// to avoid flushing.
if buffer.buffer.len() >= self.buffer_size * 9 / 10 {
Contributor

We'd better determine whether to pause based on the number of rows rather than the item count.

More importantly, we shouldn't always pause the write future when the buffer is almost full, because that way slowness on the reader side will always block the writer, and the upstream and downstream won't be decoupled.

In my rough design, we should only pause when we transition from the clean in-memory state to the flushed state. The clean in-memory state is when no pending data is in storage and all chunks can be retrieved from the buffer without reading from storage; the flushed state is every other circumstance.

More specifically, we may pause for a while only when we were previously in the clean in-memory state, and in either of the following scenarios:

  • when we receive a chunk and will write a chunk to storage
  • when we receive a checkpoint barrier and are going to write all unflushed chunks to storage.

When this happens, we may store the item we received when we paused, and then after the pause sleep, re-apply the item to the buffer and storage.

And when we are not in the clean in-memory state, we don't have to pause.
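
A rough sketch of the two states as described above (the names are made up for illustration):

// Illustrative sketch of the state distinction described above.
enum LogStoreBufferState {
    // No pending data in storage; every chunk can be served from the
    // in-memory buffer without reading from storage.
    CleanInMemory,
    // Some chunks have been flushed and must be read back from storage
    // before the buffer is clean again.
    Flushed,
}

Pausing would then happen only on the CleanInMemory -> Flushed transition, so a slow reader never blocks the writer once the state is already dirty.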

@kwannoel (Contributor Author) commented Feb 25, 2025

Quoting the above: "The clean in-memory state is when no pending data is in storage and all chunks can be retrieved from the buffer without reading from storage; the flushed state is every other circumstance."

Seems like we also have to consider the case where there's historical data being read from the logstore (via read_persisted_log_store), on top of reading from the flushed chunk future.

Not sure if there's a simple way to check, on recovery, whether the logstore has data from the previous checkpointed epoch.

@kwannoel force-pushed the kwannoel/pass-chunks branch 2 times, most recently from af96993 to 8f8cd46, on February 27, 2025 01:55
@kwannoel requested a review from wenym1 on February 27, 2025 02:06
..
} => {
sleep_future.await;
let (opt, stream) = future.await;
Contributor

When we have passed sleep_future.await and get pending at future.await, if this next_event future is dropped and recreated, we will still be at WriteFuture::Paused, and then we will poll the sleep_future again. However, the sleep_future has already completed, and the behavior of polling a completed future is undefined: it may be pending forever, or it may panic.
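
One possible way to avoid re-polling a completed timer (a sketch under assumptions, not necessarily the fix used here) is to keep the sleep in an Option and clear it once it has elapsed:

use std::pin::Pin;
use tokio::time::Sleep;

// Illustrative sketch: keep the pause in an Option so that
// (a) a dropped-and-recreated next_event future resumes the same timer, and
// (b) once the timer has elapsed it is cleared and never polled again.
async fn finish_pause(pause: &mut Option<Pin<Box<Sleep>>>) {
    if let Some(sleep) = pause.as_mut() {
        sleep.await;
        *pause = None;
    }
    // ...then await the inner write future as before...
}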

stream,
write_state,
);
// If buffer full, pause the stream for a while, let downstream do some processing
Contributor

If we are implementing the idea proposed in #20546 (comment), we should not always set the write future to paused when we have flushed data, but only pause once when we transition from the clean state to the flushed state.

The general logic should be: when we are handling a message that is going to trigger a flush to storage (either a chunk when the buffer is full, or a checkpoint barrier when the buffer has unflushed chunks), if the state is currently clean, we may pause for a while and then reapply the message anyway.

The pause is always triggered by an upstream message. Therefore, we can store the upstream message in WriteFuture::Paused, and then when the sleep finishes, we can yield this upstream message to reapply it.

The handling logic should look roughly like:

let mut clean_state = /* check initial clean state */;
select! {
    message = write_future => {
        match message {
            barrier => {
                if clean_state && /* going to flush some unflushed chunk */ {
                    write_future = paused(barrier);
                    clean_state = false;
                } else {
                    // ...handle the barrier
                }
            }
            chunk => {
                if clean_state && /* buffer full */ {
                    write_future = paused(chunk);
                    clean_state = false;
                } else {
                    // ...handle the chunk
                }
            }
        }
    }
    chunk = read_future => {
        if !clean_state && /* now the state is clean */ {
            clean_state = true;
        }
        // ...
    }
}
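
A slightly more concrete, still hypothetical Rust shape for the paused variant described above (all type names are stand-ins, not the executor's real types):

use std::pin::Pin;
use tokio::time::Sleep;

// Hypothetical stand-ins for the executor's real message types.
struct Barrier;
struct StreamChunk;

enum UpstreamItem {
    Barrier(Barrier),
    Chunk(StreamChunk),
}

// Sketch of a Paused variant that owns both the timer and the upstream
// message that triggered the pause, so the message can be re-applied
// (yielded again) once the sleep finishes.
enum WriteFutureSketch {
    Paused {
        sleep: Pin<Box<Sleep>>,
        pending: UpstreamItem,
    },
    // ...other variants (receiving from upstream, flushing, etc.)...
    Receiving,
}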

@xxchan requested a review from Copilot on February 27, 2025 05:55

Copilot encountered an error and was unable to review this pull request. You can try again by re-requesting a review.

@kwannoel force-pushed the kwannoel/pass-chunks branch from 857ff65 to c588e6f on February 27, 2025 16:22
@kwannoel (Contributor Author) commented:
I should have fixed the comments; I'll have a second look tomorrow.

I also added another read state, to try to fetch a single chunk, so we know whether we are in a dirty or clean state initially. c588e6f.
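
For illustration, such an initial probe could look roughly like this (the function name, generics, and stream shape are assumptions, not the real read_persisted_log_store API):

use futures::StreamExt;

// Hypothetical sketch: probe the persisted log store stream once on recovery.
// `None` means we can start in the clean in-memory state; `Some(chunk)` means
// we start dirty, and the probed chunk must be kept and replayed, not dropped.
async fn probe_initial_state<T, E, S>(mut persisted: S) -> Result<Option<T>, E>
where
    S: futures::Stream<Item = Result<T, E>> + Unpin,
{
    persisted.next().await.transpose()
}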

@kwannoel (Contributor Author) commented Feb 28, 2025

Seems I encountered some other bug in unaligned-join. Investigating
