Skip to content

Commit af92a5f

Browse files
authored
[fix][broker] Fix concurrency bug in BucketDelayedDeliveryTracker (#25346)
1 parent ff9f1b9 commit af92a5f

1 file changed

Lines changed: 2 additions & 40 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java

Lines changed: 2 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import java.util.concurrent.TimeUnit;
4444
import java.util.concurrent.TimeoutException;
4545
import java.util.concurrent.atomic.AtomicLong;
46-
import java.util.concurrent.locks.StampedLock;
4746
import java.util.stream.Collectors;
4847
import javax.annotation.concurrent.ThreadSafe;
4948
import lombok.Getter;
@@ -94,8 +93,6 @@ public static record SnapshotKey(long ledgerId, long entryId) {}
9493

9594
private final AtomicLong numberDelayedMessages = new AtomicLong(0);
9695

97-
// Thread safety locks
98-
private final StampedLock stampedLock = new StampedLock();
9996

10097
@Getter
10198
@VisibleForTesting
@@ -577,24 +574,7 @@ public synchronized boolean hasMessageAvailable() {
577574
}
578575

579576
@Override
580-
protected long nextDeliveryTime() {
581-
// Use optimistic read for frequently called method
582-
long stamp = stampedLock.tryOptimisticRead();
583-
long result = nextDeliveryTimeUnsafe();
584-
585-
586-
if (!stampedLock.validate(stamp)) {
587-
stamp = stampedLock.readLock();
588-
try {
589-
result = nextDeliveryTimeUnsafe();
590-
} finally {
591-
stampedLock.unlockRead(stamp);
592-
}
593-
}
594-
return result;
595-
}
596-
597-
private long nextDeliveryTimeUnsafe() {
577+
protected synchronized long nextDeliveryTime() {
598578
if (lastMutableBucket.isEmpty() && !sharedBucketPriorityQueue.isEmpty()) {
599579
return sharedBucketPriorityQueue.peekN1();
600580
} else if (sharedBucketPriorityQueue.isEmpty() && !lastMutableBucket.isEmpty()) {
@@ -788,25 +768,7 @@ private boolean removeIndexBit(long ledgerId, long entryId) {
788768
.orElse(false);
789769
}
790770

791-
public boolean containsMessage(long ledgerId, long entryId) {
792-
// Try optimistic read first for best performance
793-
long stamp = stampedLock.tryOptimisticRead();
794-
boolean result = containsMessageUnsafe(ledgerId, entryId);
795-
796-
797-
if (!stampedLock.validate(stamp)) {
798-
// Fall back to read lock if validation fails
799-
stamp = stampedLock.readLock();
800-
try {
801-
result = containsMessageUnsafe(ledgerId, entryId);
802-
} finally {
803-
stampedLock.unlockRead(stamp);
804-
}
805-
}
806-
return result;
807-
}
808-
809-
private boolean containsMessageUnsafe(long ledgerId, long entryId) {
771+
public synchronized boolean containsMessage(long ledgerId, long entryId) {
810772
if (lastMutableBucket.containsMessage(ledgerId, entryId)) {
811773
return true;
812774
}

0 commit comments

Comments
 (0)