-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][broker] Consumer stuck when delete subscription __compaction failed #23980
base: master
Are you sure you want to change the base?
Conversation
@@ -245,6 +245,7 @@ public static boolean isDedupCursorName(String name) { | |||
private static final Long COMPACTION_NEVER_RUN = -0xfebecffeL; | |||
private volatile CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture( | |||
COMPACTION_NEVER_RUN); | |||
private volatile AtomicBoolean disablingCompaction = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private volatile AtomicBoolean disablingCompaction = new AtomicBoolean(false); | |
private final AtomicBoolean disablingCompaction = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I have changed the modifier
if (e != null) { | ||
log.warn("[{}][{}] Last compaction task failed", topic, subscriptionName); | ||
// Avoid concurrently execute compaction and unsubscribing. | ||
synchronized (this) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there really a need for a synchronized block here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, it is. It prevents the modifying of the variable currentCompaction
.
AtomicBoolean disablingCompaction = | ||
WhiteboxImpl.getInternalState(persistentTopic, "disablingCompaction"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please avoid using reflection/WhiteboxImpl
. A better approach is to have a default access method which is annotation with @VisibleForTesting
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
WhiteboxImpl.getInternalState(persistentTopic, "currentCompaction"); | ||
persistentTopic.triggerCompaction(); | ||
CompletableFuture<Long> currentCompaction2 = | ||
WhiteboxImpl.getInternalState(persistentTopic, "currentCompaction"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Again, avoid reflection/WhiteboxImpl
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed
// Avoid concurrently execute compaction and unsubscribing. | ||
synchronized (this) { | ||
if (!disablingCompaction.compareAndSet(false, true)) { | ||
unsubscribeFuture.completeExceptionally( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be protected with if (!unsubscribeFuture.isDone()) {
so that creating the exception could be avoided.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you mention the code line? I could not find it.
Motivation
Background
__compaction
Issue 1: consumer will stuck if deleting cursor(the step 2 above) failed
You can reproduce the issue by the test
testReadMsgsAfterDisableCompaction(true)
issue 2
The compaction task can concurrently execute by deleting the cursor
__compaction
Modifications
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: x