
fix(log-store): rebuild decoupled sink on scale to fix data loss #20615

Merged: 2 commits merged into main from yiming/fix-log-store-scale-data-loss on Feb 27, 2025

Conversation

wenym1 (Contributor) commented Feb 26, 2025

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

Currently, decoupled sinks, which use KvLogStore, handle scale with a mechanism similar to online scale in normal streaming jobs. When a scale happens, the barrier carrying the scale mutation arrives at the log writer first. Instead of applying the scale mutation to the log reader immediately, the mutation is appended to the buffer queue as a special item, and the reader does not apply it until all items queued before it have been consumed.
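Below is a minimal sketch of the mechanism described above, using illustrative names rather than the real KvLogStore types: the scale mutation travels through the same buffer queue as the data, so the reader only applies it after everything queued before it has been consumed.

```rust
use std::collections::VecDeque;

enum QueueItem {
    Chunk(String),           // buffered stream chunk (payload simplified)
    ScaleMutation(Vec<u16>), // new vnode assignment, applied lazily
}

struct LogReader {
    queue: VecDeque<QueueItem>,
    owned_vnodes: Vec<u16>,
}

impl LogReader {
    /// Return the next chunk; a scale mutation is only applied once every
    /// item queued before it has been consumed.
    fn poll(&mut self) -> Option<String> {
        while let Some(item) = self.queue.pop_front() {
            match item {
                QueueItem::Chunk(data) => return Some(data),
                QueueItem::ScaleMutation(vnodes) => self.owned_vnodes = vnodes,
            }
        }
        None
    }
}
```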

However, this only works when no sink parallelism exits during the scale. A sink parallelism that exits after receiving the barrier with the scale mutation behaves unlike a normal streaming job: it exits without waiting for all previous items to be consumed. The previously existing sink parallelisms that take over the vnodes owned by the exited parallelism will not read the pending data of these transferred vnodes from the log store, so that pending data is lost.

The following example shows the data loss:

|               | exec1                 | exec2                        |
|---------------|-----------------------|------------------------------|
| start         | starts with vnode [1] | starts with vnode [2]        |
| write         | writes chunk1         | writes chunk2                |
| scale barrier | exits                 | changes to own vnodes [1, 2] |

When exec1 receives the scale barrier, even if its log reader has not yet truncated chunk1, it does not wait for chunk1 to be truncated; it simply writes chunk1 to storage and exits. However, when exec2 receives the scale barrier and starts owning vnode 1, it does not scan the storage of vnode 1 for pending data, and therefore chunk1 is lost.

In this PR, we change to reinitializing the log reader during scale. During reinitialization, the log reader rescans the vnodes it owns, so that all pending data in those vnodes can be read. In the example, exec2 will scan the storage of vnode 1, so chunk1 won't be lost. This is similar to offline scale in normal streaming jobs.
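A minimal sketch of the reinitialization idea, assuming a simplified in-memory stand-in for the persisted log store (the names below are illustrative, not RisingWave's actual API): a rebuilt reader rescans the pending entries of every vnode the executor now owns.

```rust
use std::collections::{BTreeMap, VecDeque};

type VnodeId = u16;

/// Persisted, not-yet-truncated log entries keyed by vnode.
struct PersistedLogStore {
    pending: BTreeMap<VnodeId, Vec<String>>,
}

struct LogReader {
    buffer: VecDeque<String>,
}

/// Build a fresh reader that rescans every vnode the executor now owns.
fn rebuild_reader(store: &PersistedLogStore, owned_vnodes: &[VnodeId]) -> LogReader {
    let mut buffer = VecDeque::new();
    for vnode in owned_vnodes {
        // After the scale in the example, exec2 owns vnode 1 and therefore
        // re-reads chunk1 here instead of skipping it.
        if let Some(entries) = store.pending.get(vnode) {
            buffer.extend(entries.iter().cloned());
        }
    }
    LogReader { buffer }
}
```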

Besides, we also rebuild the sink during scale. In the lifecycle of a created sink, when it receives a barrier, it may assume that it has received all data for the epoch of that barrier. However, in the scenario above, when the barrier is received and new vnodes start being owned, there may still be pending data in those vnodes, and yielding that pending data to the sink would break this assumption. To keep things simple, we rebuild the whole sink during scale.
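And a sketch of why rebuilding helps, with a hypothetical Sink trait standing in for the real one: a freshly built sink has made no assumption about which epochs it has already fully received, so the pending data re-read from newly owned vnodes can be delivered to it safely.

```rust
trait Sink {
    fn write(&mut self, chunk: &str);
    fn barrier(&mut self, epoch: u64);
}

struct SinkExecutor<S: Sink> {
    sink: S,
}

impl<S: Sink> SinkExecutor<S> {
    /// On a scale barrier, drop the old sink and build a new one: the new
    /// sink carries no assumption about which epochs it has already fully
    /// received, so re-read pending data can be written to it without
    /// violating any epoch-completeness assumption.
    fn on_scale_barrier(&mut self, rebuild: impl FnOnce() -> S) {
        self.sink = rebuild();
    }
}
```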

Checklist

  • I have written necessary rustdoc comments.
  • I have added necessary unit tests and integration tests.
  • I have added test labels as necessary.
  • I have added fuzzing tests or opened an issue to track them.
  • My PR contains breaking changes.
  • My PR changes performance-critical code, so I will run (micro) benchmarks and present the results.
  • My PR contains critical fixes that are necessary to be merged into the latest release.

Documentation

  • My PR needs documentation updates.
Release note

@wenym1 wenym1 requested review from hzxa21 and kwannoel February 26, 2025 06:29
@wenym1 wenym1 force-pushed the yiming/fix-log-store-scale-data-loss branch from f421f54 to 856ea07 on February 26, 2025 08:05
kwannoel (Contributor) left a comment

LGTM, do we have a test for this? I think it may be important.

I recently added some tests for arrangement backfill as well: #20613, for scale-in.

wenym1 (Contributor, Author) commented Feb 26, 2025

> LGTM, do we have a test for this? I think it may be important.
>
> I recently added some tests for arrangement backfill as well: #20613, for scale-in.

Yes. The bug was actually found in the deterministic scale test for sinks.

@wenym1 wenym1 added this pull request to the merge queue Feb 27, 2025
Merged via the queue into main with commit 8eefc2d Feb 27, 2025
28 of 29 checks passed
@wenym1 wenym1 deleted the yiming/fix-log-store-scale-data-loss branch February 27, 2025 04:53
Labels: type/fix (Bug fix)