Skip to content

Commit d162126

Browse files
fix: concurrent modification of processing receievd messages (googleapis#1807)
* fix: concurrent modification of processing receievd messages * Removing synchronized keyword, and making outstandingReceipts into a concurrentMap * Removing synchronized keyword for notifyAckSuccess and failure as well * fixing lint * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent e179b94 commit d162126

File tree

1 file changed

+4
-5
lines changed

1 file changed

+4
-5
lines changed

google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/MessageDispatcher.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import java.util.ArrayList;
3232
import java.util.HashMap;
3333
import java.util.Iterator;
34-
import java.util.LinkedHashMap;
3534
import java.util.List;
3635
import java.util.Map;
3736
import java.util.Map.Entry;
@@ -92,8 +91,8 @@ class MessageDispatcher {
9291
private final LinkedBlockingQueue<AckRequestData> pendingAcks = new LinkedBlockingQueue<>();
9392
private final LinkedBlockingQueue<AckRequestData> pendingNacks = new LinkedBlockingQueue<>();
9493
private final LinkedBlockingQueue<AckRequestData> pendingReceipts = new LinkedBlockingQueue<>();
95-
private final LinkedHashMap<String, ReceiptCompleteData> outstandingReceipts =
96-
new LinkedHashMap<String, ReceiptCompleteData>();
94+
private final ConcurrentMap<String, ReceiptCompleteData> outstandingReceipts =
95+
new ConcurrentHashMap<String, ReceiptCompleteData>();
9796
private final AtomicInteger messageDeadlineSeconds = new AtomicInteger();
9897
private final AtomicBoolean extendDeadline = new AtomicBoolean(true);
9998
private final Lock jobLock;
@@ -411,7 +410,7 @@ void processReceivedMessages(List<ReceivedMessage> messages) {
411410
processBatch(outstandingBatch);
412411
}
413412

414-
synchronized void notifyAckSuccess(AckRequestData ackRequestData) {
413+
void notifyAckSuccess(AckRequestData ackRequestData) {
415414

416415
if (outstandingReceipts.containsKey(ackRequestData.getAckId())) {
417416
outstandingReceipts.get(ackRequestData.getAckId()).notifyReceiptComplete();
@@ -437,7 +436,7 @@ synchronized void notifyAckSuccess(AckRequestData ackRequestData) {
437436
}
438437
}
439438

440-
synchronized void notifyAckFailed(AckRequestData ackRequestData) {
439+
void notifyAckFailed(AckRequestData ackRequestData) {
441440
outstandingReceipts.remove(ackRequestData.getAckId());
442441
}
443442

0 commit comments

Comments
 (0)