-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Don't call ManagedLedger#asyncAddEntry in Netty I/O thread #23983
base: master
Are you sure you want to change the base?
[improve][broker] Don't call ManagedLedger#asyncAddEntry in Netty I/O thread #23983
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
An extra context switch for each entry is costly, especially when you have many small entries and little or no batching. That's why we put it on the same thread.
If the interceptor needs to do expensive work, maybe only the interceptor part should be done in a different thread, though it shouldn't affect it when we don't use interceptor.
@merlimat The thread switching was added in PR #9039, already in December 2020. The reason to make this change is related to a performance concern of #23940 changes which removed the thread switching. pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java Lines 796 to 826 in ee5b13a
In Pulsar use cases, synchronization on CPU intensive operations (or blocking IO operations) in Netty IO threads could cause performance regressions. In this case, it would impact use cases where there's a large number of producers producing to a single topic. Before #23940, the code looks like this: pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java Lines 796 to 810 in 7a79c78
btw. In the Pulsar code base, we have a problem in how IO threads are used. IO threads are used to process work that shouldn't be handled with IO threads at all. I have created an issue #23865. There should be a separate thread pool for running blocking operations and CPU intensive synchronized operations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great work @BewareMyPower. Some comments added in this first pass.
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
Show resolved
Hide resolved
ledger.getExecutor().execute(() -> ledger.asyncAddEntry(buffer, (int) publishContext.getNumberOfMessages(), | ||
this, publishContext)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be an internal concern of the ManagedLedger implementation's asyncAddEntry
method. One reason for this is that there's multiple asyncAddEntry
signatures and we'd like to add them all in order.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should restore the synchronized
keyword to the asyncAddEntry
method to make it thread safe as it is before
Instead of that, I still think the synchronization should be performed from the caller. asyncAddEntry
only needs to synchronize it with other asyncAddEntry
or addEntry
method calls. It does not need to synchronize with other managed ledger's synchronized methods. Let me improve the apiNotes
parts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 2 different aspects to consider: thread safety and ordering.
Regarding "I still think the synchronization should be performed from the caller":
In Java, synchronization is not only about performing operations one by one under a mutually exclusive lock. "Visibility" is an important aspect of Java thread safety. That's why it doesn't make sense for callers to synchronize calls to asyncAddEntry
since all callers would need to use the same lock for both ordering and thread safety.
Snippet from "Java Concurrency in Practice", Chapter 2 "Thread safety":
It is a common mistake to assume that synchronization needs to be used only when writing to shared variables; this is simply not true.
For each mutable state variable that may be accessed by more than one thread, all accesses to that variable must be performed with the same lock held. In this case, we say that the variable is guarded by that lock.
How a ManagedLedger implementation achieves ordering guarantees and thread safety is an internal implementation detail. In the case of ManagedLedger, it doesn't make sense to delegate the responsibility of thread safety to the caller.
Another downside of the ledger.getExecutor().execute
solution is that it exposes internal implementation details that callers of the API must be aware of. This is not great API design when such implementation details are exposed.
I think we need to consider a different solution. I can see some code examples in #23940 of what needs to be solved. To me, it seems this could be solved with the Object ctx
parameter by passing a ctx
that is also understood by the interceptor. @BewareMyPower, would you be able to research that type of solution instead?
@merlimat @lhotari to correct it, this is the very early behavior introduced in #1521. This PR intends to decouple After that, all write operations from Pulsar client will still keep the original behavior that switches to managed ledger's executor to call However, regarding the downstream, for example, in my Kafka protocol handler implementation, The comment here makes sense to a certain extent, but it might be a new topic (e.g. thread switching vs. synchronized) to discuss, which is beyond the scope of this PR. At least, the existing thread switching approach can already achieve high publish performance, which is verified by many benchmarks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the review comments
} catch (Throwable throwable) { | ||
if (!added) { | ||
addOperation.failed(ManagedLedgerException.getManagedLedgerException(throwable)); | ||
} // else: all elements of `pendingAddEntries` will fail in another thread | ||
} | ||
} | ||
|
||
protected void beforeAddEntryToQueue(State state) throws ManagedLedgerException { | ||
protected void beforeAddEntryToQueue() throws ManagedLedgerException { | ||
final var state = STATE_UPDATER.get(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using the STATE_UPDATER doesn't make any difference for plain reads of the field value.
protected void afterAddEntryToQueue(State state, OpAddEntry addOperation) throws ManagedLedgerException { | ||
// TODO: does this method really need to be synchronized? | ||
protected synchronized void afterAddEntryToQueue(OpAddEntry addOperation) throws ManagedLedgerException { | ||
final var state = STATE_UPDATER.get(this); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
using the STATE_UPDATER doesn't make any difference for plain reads of the field value.
ledger.getExecutor().execute(() -> ledger.asyncAddEntry(buffer, (int) publishContext.getNumberOfMessages(), | ||
this, publishContext)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are 2 different aspects to consider: thread safety and ordering.
Regarding "I still think the synchronization should be performed from the caller":
In Java, synchronization is not only about performing operations one by one under a mutually exclusive lock. "Visibility" is an important aspect of Java thread safety. That's why it doesn't make sense for callers to synchronize calls to asyncAddEntry
since all callers would need to use the same lock for both ordering and thread safety.
Snippet from "Java Concurrency in Practice", Chapter 2 "Thread safety":
It is a common mistake to assume that synchronization needs to be used only when writing to shared variables; this is simply not true.
For each mutable state variable that may be accessed by more than one thread, all accesses to that variable must be performed with the same lock held. In this case, we say that the variable is guarded by that lock.
How a ManagedLedger implementation achieves ordering guarantees and thread safety is an internal implementation detail. In the case of ManagedLedger, it doesn't make sense to delegate the responsibility of thread safety to the caller.
Another downside of the ledger.getExecutor().execute
solution is that it exposes internal implementation details that callers of the API must be aware of. This is not great API design when such implementation details are exposed.
I think we need to consider a different solution. I can see some code examples in #23940 of what needs to be solved. To me, it seems this could be solved with the Object ctx
parameter by passing a ctx
that is also understood by the interceptor. @BewareMyPower, would you be able to research that type of solution instead?
Motivation
#23940 brings a behavior change that the core logic of
ManagedLedger#asyncAddEntry
now won't switch threads, which means it will be executed directly in Netty I/O thread viaPersistentTopic#asyncAddEntry
.The
beforeAddEntry
method calls theintercept
andinterceptWithNumberOfMessages
methods for all broker entry interceptors and prepends a new broker entry metadata buffer on the original buffer (though it's just a composite buffer).There is a risk that when many producers send messages to the same managed ledger concurrently, the process of
asyncAddEntry
might block the Netty I/O thread for some time and cause the performance regression.Modifications
In
PersistentTopic#publishMessage
, expose thegetExecutor()
method forManagedLedger
and executeManagedLedger#asyncAddEntry
in that executor. The change of #12606 is moved toPersistentTopic
as well that the buffer is retained before switching to another thread.After that, only synchronize
afterAddEntryToQueue
with other synchronized methods ofManagedLedgerImpl
. P.S. actually I don't thinksynchronized
is needed here but the logic is not trivial likebeforeAddEntryToQueue
andbeforeAddEntry
, so I still retain it as synchronized.ManagedLedgerImpl#asyncAddEntry
still doesn't switch the thread, so it would still be possible for the downstream application to synchronizeasyncAddEntry
, either by adding a lock (e.g.synchronized
) or executing this method is a single thread.Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: BewareMyPower#40