feat(logstore): provide more opportunities to read #20546
base: main
@@ -144,6 +150,11 @@ struct FlushedChunkInfo {
}

enum WriteFuture<S: LocalStateStore> {
    Paused {
        duration: Duration,
We should store the sleep future instead of the duration. Currently the sleep future is created inside the `next_event` call and is stored only in the future returned from `next_event`. However, the `next_event` future can easily be dropped because of `select`: every time the read future becomes ready, the `next_event` future is created again and the sleep time is reset. Under these circumstances, the write future can easily be starved by the read future.
);
// If buffer 90% full, pause the stream for a while, let downstream do some processing
// to avoid flushing.
if buffer.buffer.len() >= self.buffer_size * 9 / 10 {
We'd better decide whether to pause based on the number of rows rather than the item count.
More importantly, we shouldn't always pause the write future when the buffer is almost full. If we do, slowness on the reader side will always block the writer, and upstream and downstream won't be decoupled.
In my rough design, we should only pause when we transition from a clean in-memory state to a flushed state. The clean in-memory state means that no pending data is in storage and all chunks can be retrieved from the buffer without reading from storage. The flushed state covers all other circumstances.
More specifically, we may pause for a while only when we were previously in the clean in-memory state and one of the following happens:
- we receive a chunk and are about to write a chunk to storage
- we receive a checkpoint barrier and are about to write all unflushed chunks to storage
When this happens, we can store the item we received at the time of the pause and, once the pause sleep finishes, re-apply the item to the buffer and storage.
When we are not in the clean in-memory state, we don't have to pause.
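As a rough sketch of the first point and of the clean-state definition, the snippet below counts rows instead of buffer items. `Item`, `LogState`, and their fields are hypothetical stand-ins, not the actual log store types.

```rust
/// Hypothetical buffer item; the real log store buffer has richer variants.
enum Item {
    Chunk { rows: usize },
    Barrier,
}

/// Hypothetical summary of the writer's view of the log store.
struct LogState {
    buffer: Vec<Item>,
    /// Chunks already written to storage but not yet consumed by the reader.
    pending_in_storage: usize,
}

impl LogState {
    /// Rows held in memory. Barriers occupy a buffer slot but carry no rows,
    /// so this can differ a lot from `buffer.len()`.
    fn buffered_rows(&self) -> usize {
        self.buffer
            .iter()
            .map(|item| match item {
                Item::Chunk { rows } => *rows,
                Item::Barrier => 0,
            })
            .sum()
    }

    /// Clean in-memory state: nothing is pending in storage, so every chunk
    /// can be served from the buffer.
    fn is_clean(&self) -> bool {
        self.pending_in_storage == 0
    }
}
```

A buffer holding two chunks and a barrier has three items, yet the row count is what reflects how much data is actually waiting.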
> The clean in-memory state means when no pending data is in storage, and all chunks can be retrieved from the buffer without reading from storage, and the second state is all other circumstances.

Seems like we have to consider the case where historical data is being read from the logstore (via `read_persisted_log_store`), on top of reading from the flushed chunk future.
Not sure if there's a simple way to check if there's data for the logstore in the previous checkpointed epoch, on recovery.
af96993 to 8f8cd46
    ..
} => {
    sleep_future.await;
    let (opt, stream) = future.await;
Once we have passed `sleep_future.await` and are pending at `future.await`, if the future returned by `next_future` is dropped and recreated, we will still be at `WriteFuture::Paused` and will poll the `sleep_future` again. However, the `sleep_future` has already completed, and the behavior of polling a completed future is unspecified: it may stay pending forever or panic.
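One way to make re-entry safe is to remember that the sleep has already completed and skip it afterwards. The sketch below uses only the standard library. `CompletedOnce`, `ReadyThenPanic`, and `noop_waker` are hypothetical helpers written for this illustration and are not part of the PR.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Wrapper that drops the inner future once it completes, so later polls are
/// well-defined and never touch the finished future again.
struct CompletedOnce<F> {
    inner: Option<Pin<Box<F>>>,
}

impl<F: Future<Output = ()>> CompletedOnce<F> {
    fn new(fut: F) -> Self {
        Self { inner: Some(Box::pin(fut)) }
    }

    /// Returns `Ready` as soon as the inner future has finished, and keeps
    /// returning `Ready` on every later call.
    fn poll_done(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        if let Some(fut) = self.inner.as_mut() {
            if fut.as_mut().poll(cx).is_pending() {
                return Poll::Pending;
            }
            self.inner = None; // finished: never poll it again
        }
        Poll::Ready(())
    }
}

/// Stand-in for a sleep: ready on the first poll, panics if polled again,
/// which the `Future` contract permits.
struct ReadyThenPanic {
    done: bool,
}

impl Future for ReadyThenPanic {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        assert!(!self.done, "polled after completion");
        self.done = true;
        Poll::Ready(())
    }
}

/// Waker that does nothing, enough to drive a poll by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}
```

Polling `CompletedOnce::new(ReadyThenPanic { done: false })` twice returns `Poll::Ready(())` both times, whereas polling the bare `ReadyThenPanic` twice would panic. In the PR, the equivalent would be for the `Paused` state to record that the sleep has finished, for example by moving to a different variant.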
    stream,
    write_state,
);
// If buffer full, pause the stream for a while, let downstream do some processing
If we are implementing the idea proposed in #20546 (comment), we should not always set the write future to paused when we have flushed data. We should pause only once, when we transition from the clean state to the flushed state.
The general logic should be: when we are handling a message that is going to trigger a flush to storage (either a chunk when the buffer is full, or a checkpoint barrier when the buffer has unflushed chunks), if the state is currently clean, we may pause for a while and then reapply the message anyway.
The pause is always triggered by an upstream message. Therefore, we can store the upstream message in `WriteFuture::Paused`, and when the sleep finishes, we can yield this upstream message so that it is reapplied.
The handling logic should look like this:
let mut clean_state = check initial clean state;
select! {
    message = write_future => {
        match message {
            barrier => {
                if clean_state && going to flush some unflushed chunk {
                    write_future = paused(barrier);
                    clean_state = false;
                } else {
                    ...handle the barrier
                }
            }
            chunk => {
                if clean_state && buffer full {
                    write_future = paused(chunk);
                    clean_state = false;
                } else {
                    ...handle the chunk
                }
            }
        }
    }
    chunk = read_future => {
        if !clean_state && now the state is clean {
            clean_state = true;
        }
        ...
    }
}
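The pause-once decision in the pseudocode above can be sketched as a small synchronous state machine. This is only an illustration under simplifying assumptions: `Message`, `Action`, and `Writer` are hypothetical types, the buffer is reduced to a row counter, and the actual handling of a message (buffering, flushing) is left out.

```rust
/// Hypothetical upstream message.
#[derive(Debug, PartialEq)]
enum Message {
    Chunk(usize), // number of rows in the chunk
    Barrier { checkpoint: bool },
}

/// What the writer decides to do with a message.
#[derive(Debug, PartialEq)]
enum Action {
    /// Sleep first and keep the message so it can be re-applied afterwards.
    Pause(Message),
    /// Apply the message to buffer and storage right away.
    Handle(Message),
}

struct Writer {
    clean: bool,
    buffered_rows: usize,
    max_rows: usize,
}

impl Writer {
    /// Would handling `msg` write anything to storage?
    fn triggers_flush(&self, msg: &Message) -> bool {
        match msg {
            Message::Chunk(rows) => self.buffered_rows + rows > self.max_rows,
            Message::Barrier { checkpoint } => *checkpoint && self.buffered_rows > 0,
        }
    }

    /// Pause only on the transition from the clean state to the flushed state.
    fn on_message(&mut self, msg: Message) -> Action {
        if self.clean && self.triggers_flush(&msg) {
            self.clean = false;
            Action::Pause(msg)
        } else {
            Action::Handle(msg)
        }
    }

    /// Called from the read branch once nothing is pending in storage.
    fn on_read_caught_up(&mut self) {
        self.clean = true;
    }
}
```

Because `clean` is cleared when the pause is issued, re-applying the stored message after the sleep goes straight to `Action::Handle`, so the same message can never pause twice.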
857ff65 to c588e6f
Should have fixed the comments; will have a second look tomorrow. I also added another read state that tries to fetch a single chunk, so we know whether we are initially in a dirty or clean state (c588e6f).
Seems I encountered some other bug in unaligned-join. Investigating.
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
At the writer future sleep briefly after: