
[Bug] maxMessagePublishBufferSizeInMB permits leak can stall and timeout connections #23921

lhotari opened this issue Feb 3, 2025 · 2 comments

lhotari commented Feb 3, 2025

Search before asking

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

Version

master branch code analysis

Minimal reproduce step

There's currently an issue where org.apache.pulsar.broker.service.ServerCnx#completedSendOperation might not get called in error cases.
The impact is that message publishing can stall for all connections served by a particular IO thread.

The broker's maxMessagePublishBufferSizeInMB limit is split into a per-IO-thread maxPendingBytesPerThread limit:

this.maxPendingBytesPerThread = conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L
        / conf.getNumIOThreads();
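
To make the per-thread split concrete (the numbers below are illustrative only, not Pulsar defaults):

// Illustration only: with maxMessagePublishBufferSizeInMB = 512 and numIOThreads = 8,
// each IO thread's budget is 512 * 1024 * 1024 / 8 = 67,108,864 bytes (64 MiB).
long maxPendingBytesPerThread = 512L * 1024 * 1024 / 8;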

The pending bytes counter is incremented when sending:

PendingBytesPerThreadTracker.getInstance().incrementPublishBytes(msgSize, maxPendingBytesPerThread);

It is decremented in the ServerCnx#completedSendOperation method:

public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
    PendingBytesPerThreadTracker.getInstance().decrementPublishBytes(msgSize, resumeThresholdPendingBytesPerThread);

If the decrement call is missing, the permits leak, which eventually causes all message publishing to stop for all connections using that IO thread.
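
As a rough sketch of the per-thread accounting described above (simplified and based only on the calls quoted in this issue; the actual fields and thresholds in ServerCnx may differ):

// simplified sketch of the per-IO-thread pending-bytes accounting
class PendingBytesSketch {
    long pendingBytes;
    boolean limitExceeded;

    void incrementPublishBytes(long msgSize, long maxPendingBytes) {
        pendingBytes += msgSize;
        if (!limitExceeded && pendingBytes > maxPendingBytes) {
            limitExceeded = true;  // throttle: stop reading from connections on this IO thread
        }
    }

    void decrementPublishBytes(long msgSize, long resumeThreshold) {
        pendingBytes -= msgSize;
        if (limitExceeded && pendingBytes <= resumeThreshold) {
            limitExceeded = false; // resume reading
        }
    }
}

If decrementPublishBytes is skipped for a failed send, pendingBytes only grows; once limitExceeded flips to true it never resets, and publishing stalls for every connection on that IO thread.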

The leak happens here:

public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
    /* If the topic is being transferred(in the Releasing bundle state),
       we don't want to forcefully close topic here.
       Instead, we will rely on the service unit state channel's bundle(topic) transfer protocol.
       At the end of the transfer protocol, at Owned state, the source broker should close the topic properly.
     */
    if (transferring) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Failed to persist msg in store: {} while transferring.",
                    topic, exception.getMessage(), exception);
        }
        return;
    }
    PublishContext callback = (PublishContext) ctx;
    if (exception instanceof ManagedLedgerFencedException) {
        // If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
        close();

There should be a call to MessagePublishContext#completed for all exception cases. ServerCnx#completedSendOperation gets called for the exception path in MessagePublishContext#completed here:

public void completed(Exception exception, long ledgerId, long entryId) {
    if (exception != null) {
        final ServerError serverError = getServerError(exception);
        producer.cnx.execute(() -> {
            // if the topic is transferring, we don't send error code to the clients.
            if (producer.getTopic().isTransferring()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Received producer exception: {} while transferring.",
                            producer.getTopic().getName(), exception.getMessage(), exception);
                }
            } else if (!(exception instanceof TopicClosedException)) {
                // For TopicClosed exception there's no need to send explicit error, since the client was
                // already notified
                // For TopicClosingOrDeleting exception, a notification will be sent separately
                long callBackSequenceId = Math.max(highestSequenceId, sequenceId);
                producer.cnx.getCommandSender().sendSendError(producer.producerId, callBackSequenceId,
                        serverError, exception.getMessage());
            }
            producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);

The other exception cases contain the required call to callback.completed, which will call ServerCnx#completedSendOperation:

if (exception instanceof ManagedLedgerAlreadyClosedException) {
    if (log.isDebugEnabled()) {
        log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
    }
    callback.completed(new TopicClosedException(exception), -1, -1);
    return;
} else {
    log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
}
if (exception instanceof ManagedLedgerTerminatedException && !isMigrated()) {
    // Signal the producer that this topic is no longer available
    callback.completed(new TopicTerminatedException(exception), -1, -1);
} else {
    // Use generic persistence exception
    callback.completed(new PersistenceException(exception), -1, -1);
}

What did you expect to see?

There shouldn't be a leak of maxPendingBytesPerThread permits, which eventually leads to message publishing stopping for all connections using a particular IO thread.

What did you see instead?

Based on the analysis of the code, there's a leak.

Anything else?

This might be related to issue #23920.

A heap dump can be used to check whether the issue applies: search for org.apache.pulsar.broker.service.ServerCnx$PendingBytesPerThreadTracker instances in the heap dump and inspect their pendingBytes and limitExceeded field values.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
lhotari added the type/bug label on Feb 3, 2025
lhotari self-assigned this on Feb 3, 2025

lhotari commented Feb 3, 2025

This is also a location where permits can leak:

if (lowestSequenceId > highestSequenceId) {
    cnx.execute(() -> {
        cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
                "Invalid lowest or highest sequence id");
        cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
    });
    return;
}

headersAndPayload has already been released and might be recycled by the time the lambda executes, so calling readableBytes() inside it is unsafe.
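
A sketch of a safer pattern for this spot (an assumption about how it could be fixed, not the actual patch): capture the size on the calling thread before scheduling the lambda:

// capture the size before headersAndPayload can be released and recycled
final int msgSize = headersAndPayload.readableBytes();
cnx.execute(() -> {
    cnx.getCommandSender().sendSendError(producerId, highestSequenceId, ServerError.MetadataError,
            "Invalid lowest or highest sequence id");
    cnx.completedSendOperation(isNonPersistentTopic, msgSize);
});
return;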


lhotari commented Feb 3, 2025

Similar problems exist in all of these completedSendOperation calls:

if (!isShadowTopic && position != null) {
    cnx.execute(() -> {
        cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
                "Only shadow topic supports sending messages with messageId");
        cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
    });
    return false;
}
if (isShadowTopic && position == null) {
    cnx.execute(() -> {
        cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.NotAllowedError,
                "Cannot send messages to a shadow topic");
        cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
    });
    return false;
}
if (isClosed) {
    cnx.execute(() -> {
        cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError,
                "Producer is closed");
        cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
    });
    return false;
}
if (!verifyChecksum(headersAndPayload)) {
    cnx.execute(() -> {
        cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.ChecksumError,
                "Checksum failed on the broker");
        cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
    });
    return false;
}
if (topic.isEncryptionRequired()) {
    headersAndPayload.markReaderIndex();
    MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
    headersAndPayload.resetReaderIndex();
    int encryptionKeysCount = msgMetadata.getEncryptionKeysCount();
    // Check whether the message is encrypted or not
    if (encryptionKeysCount < 1) {
        log.warn("[{}] Messages must be encrypted", getTopic().getName());
        cnx.execute(() -> {
            cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.MetadataError,
                    "Messages must be encrypted");
            cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
        });
        return false;
    }
}

These code paths also have an additional problem: the "permits" get returned without ever having been acquired.
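
A sketch of the direction a fix could take for these validation paths (assumptions only, not an actual patch): capture the size before handing work to the IO thread, and only return permits that were actually acquired for this message. Using the isClosed check as an example:

// capture on the calling thread, before the buffer can be recycled
final int msgSize = headersAndPayload.readableBytes();
if (isClosed) {
    cnx.execute(() -> {
        cnx.getCommandSender().sendSendError(producerId, sequenceId, ServerError.PersistenceError,
                "Producer is closed");
        // whether cnx.completedSendOperation(isNonPersistentTopic, msgSize) belongs here depends
        // on whether the pending-bytes permits were acquired for this message in the first place
    });
    return false;
}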
