fix(log-store): rebuild decoupled sink on scale to fix data loss #20615
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Currently, in decoupled sinks, which use KvLogStore, we handle scale with a mechanism similar to online scaling of normal streaming jobs. When a scale happens, the barrier carrying the scale mutation arrives at the log writer first. Instead of applying the scale mutation to the log reader immediately, the mutation is appended to the buffer queue as a special item, and the reader does not apply it until all items queued before it have been consumed.
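As a rough illustration of this buffering behavior (not the actual KvLogStore code; `QueueItem`, `LogReaderQueue`, and the method names are hypothetical), a minimal sketch might look like:

```rust
use std::collections::VecDeque;

// Hypothetical items buffered between the log writer and the log reader.
// `Chunk` and the vnode representation stand in for the real types.
enum QueueItem {
    Chunk(Vec<u8>),
    ScaleMutation { new_vnodes: Vec<u32> },
}

struct LogReaderQueue {
    buffer: VecDeque<QueueItem>,
}

impl LogReaderQueue {
    // The writer pushes the scale mutation as a special item instead of
    // letting the reader apply it immediately.
    fn push(&mut self, item: QueueItem) {
        self.buffer.push_back(item);
    }

    // The reader drains items in order; the scale mutation is only applied
    // once every item queued before it has been consumed.
    fn consume(&mut self, mut apply_scale: impl FnMut(&[u32])) {
        while let Some(item) = self.buffer.pop_front() {
            match item {
                QueueItem::Chunk(chunk) => {
                    // ... deliver the chunk downstream ...
                    let _ = chunk;
                }
                QueueItem::ScaleMutation { new_vnodes } => {
                    apply_scale(&new_vnodes);
                }
            }
        }
    }
}
```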
However, this only works when no sink parallelism exits during the scale. A sink parallelism that exits after receiving the barrier with the scale mutation, unlike in normal streaming jobs, exits without waiting for all its previous items to be consumed. The pre-existing sink parallelisms that take over the vnodes owned by the exited parallelism do not read the pending data of these transferred vnodes from the log store, so this pending data is lost.
The following example shows the data loss:
When exec1 receives the scale barrier, and its log reader has not yet truncated chunk1, it does not wait for chunk1 to be truncated; it only writes chunk1 to storage and then exits anyway. When exec2 receives the scale barrier and starts owning the new vnode1, it does not scan the storage of vnode1 for pending data, and therefore chunk1 is lost.
In this PR, we change to reinitialize the log reader during scale. During reinitialization, the log reader rescans the vnodes it now owns, so that all pending data in those vnodes can be read. In the example, exec2 will scan the storage of vnode1, so chunk1 will not be lost. This is similar to offline scaling of normal streaming jobs.
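A minimal sketch of this rescan, under the assumption of a simplified storage keyed by vnode (`rebuild_log_reader`, the type aliases, and the data layout are illustrative, not the real APIs):

```rust
use std::collections::{BTreeMap, HashSet};

// A toy key-value log store: pending (not yet truncated) entries per vnode.
type VnodeId = u32;
type Storage = BTreeMap<VnodeId, Vec<Vec<u8>>>;

// On a scale, the reader is rebuilt with its new vnode ownership and rescans
// the storage of every vnode it now owns, so pending data written by an
// exited parallelism (e.g. chunk1 on vnode1) is picked up again.
fn rebuild_log_reader(storage: &Storage, owned_vnodes: &HashSet<VnodeId>) -> Vec<Vec<u8>> {
    let mut replayed = Vec::new();
    for (vnode, entries) in storage {
        if owned_vnodes.contains(vnode) {
            replayed.extend(entries.iter().cloned());
        }
    }
    replayed
}

fn main() {
    let mut storage = Storage::new();
    storage.entry(1).or_default().push(b"chunk1".to_vec()); // left behind by exec1
    let owned: HashSet<VnodeId> = [1, 2].into_iter().collect(); // exec2 now owns vnode1
    assert_eq!(rebuild_log_reader(&storage, &owned).len(), 1); // chunk1 is replayed, not lost
}
```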
Besides, we also rebuild the sink during scale. In the lifecycle of a created sink, when it receives a barrier it may assume that it has received all data up to the epoch of that barrier. However, in the scenario above, when a parallelism receives the barrier and starts owning new vnodes, there may still be pending data in those vnodes, and yielding this pending data to the sink would break that assumption. Therefore, to keep it simple, we rebuild the whole sink during scale.
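A simplified sketch of this rebuild loop (the trait and function names here, such as `SinkWriter`, `build_sink_writer`, and `LogEntry`, are placeholders for illustration rather than the actual sink interface):

```rust
// Placeholder types standing in for the real sink and log store interfaces.
struct Chunk;

enum LogEntry {
    Chunk(Chunk),
    Barrier { is_scale: bool },
}

trait SinkWriter {
    fn write(&mut self, chunk: Chunk);
    fn barrier(&mut self);
}

// On a scale barrier, the current sink writer is dropped and a fresh one is
// built, so the new writer never sees pending data belonging to an epoch it
// already assumed was complete.
fn run_sink(
    mut next_entry: impl FnMut() -> Option<LogEntry>,
    mut build_sink_writer: impl FnMut() -> Box<dyn SinkWriter>,
) {
    'outer: loop {
        let mut writer = build_sink_writer();
        while let Some(entry) = next_entry() {
            match entry {
                LogEntry::Chunk(chunk) => writer.write(chunk),
                LogEntry::Barrier { is_scale: false } => writer.barrier(),
                // Scale: drop this writer and rebuild the sink; the rebuilt
                // log reader will replay pending data from the new vnodes.
                LogEntry::Barrier { is_scale: true } => continue 'outer,
            }
        }
        break 'outer;
    }
}
```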
Checklist
Documentation
Release note