From 712e2f6aa48124cfdaec36635a57c2390d460aad Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 16 Dec 2025 10:33:05 +0800 Subject: [PATCH 1/3] fix error in delayedMessagesCount --- .../InMemoryDelayedDeliveryTracker.java | 18 +++-- .../delayed/InMemoryDeliveryTrackerTest.java | 65 +++++++++++++++++++ 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index ad5ab25fbbf6b..d14f2ff294387 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -126,10 +126,13 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { } long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) - .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) - .add(entryId); - delayedMessagesCount.incrementAndGet(); + Roaring64Bitmap roaring64Bitmap = delayedMessageMap + .computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) + .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + if (!roaring64Bitmap.contains(entryId)) { + roaring64Bitmap.add(entryId); + delayedMessagesCount.incrementAndGet(); + } updateTimer(); @@ -200,7 +203,12 @@ public NavigableSet getScheduledMessages(int maxMessages) { delayedMessagesCount.addAndGet(-n); n = 0; } - if (n <= 0) { + if (n == 0) { + break; + } else if (n < 0) { + // should not go into this situation + log.error("[{}] Delayed message tracker getScheduledMessages should not < 0, number is: {}", + dispatcher.getName(), n); break; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index c5a564d1b664b..c7834ea2e3c10 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -274,4 +274,69 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc tracker.close(); } + @Test(dataProvider = "delayedTracker") + public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDeliveryTracker tracker) throws Exception { + assertFalse(tracker.hasMessageAvailable()); + + // case1: addMessage() with duplicate entryId, + // getScheduledMessages() enter "cardinality <= n" and make tracker empty + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(1, 3, 40)); + assertTrue(tracker.addMessage(1, 4, 50)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 4L); + + Set scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 4); + assertEquals(tracker.getNumberOfDelayedMessages(), 0L); + + + + // case2: addMessage() with duplicate entryId, + // getScheduledMessages() enter "cardinality > n" and make tracker empty + clockTime.set(0); + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 3, 10)); + assertTrue(tracker.addMessage(1, 4, 10)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 1L); + + scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 4); + assertEquals(tracker.getNumberOfDelayedMessages(), 0L); + + + + // case3: addMessage() with duplicate entryId, + // getScheduledMessages() make tracker remain half cardinality + clockTime.set(0); + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 3, 10)); + assertTrue(tracker.addMessage(1, 4, 10)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 1L); + + scheduled = tracker.getScheduledMessages(2); + assertEquals(scheduled.size(), 2); + assertEquals(tracker.getNumberOfDelayedMessages(), 2L); + + + tracker.close(); + } + } From ca48bb363027f588c3bd46798d1c0ea0ea297dfe Mon Sep 17 00:00:00 2001 From: fanjianye Date: Tue, 16 Dec 2025 18:04:28 +0800 Subject: [PATCH 2/3] use addLong --- .../pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index d14f2ff294387..d16803d897eea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -130,7 +130,7 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { .computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); if (!roaring64Bitmap.contains(entryId)) { - roaring64Bitmap.add(entryId); + roaring64Bitmap.addLong(entryId); delayedMessagesCount.incrementAndGet(); } From 9b562dccf145b01aa4af052db9d3a21aa26882a8 Mon Sep 17 00:00:00 2001 From: fanjianye Date: Wed, 17 Dec 2025 11:38:47 +0800 Subject: [PATCH 3/3] change comment --- .../broker/delayed/InMemoryDeliveryTrackerTest.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index c7834ea2e3c10..cbe81212f41dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -279,7 +279,8 @@ public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDelivery assertFalse(tracker.hasMessageAvailable()); // case1: addMessage() with duplicate entryId, - // getScheduledMessages() enter "cardinality <= n" and make tracker empty + // getScheduledMessages() enter multiple timestamp and "cardinality <= n", + // finally make tracker empty assertTrue(tracker.addMessage(1, 1, 10)); assertTrue(tracker.addMessage(1, 2, 20)); assertTrue(tracker.addMessage(1, 2, 20)); @@ -298,7 +299,8 @@ public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDelivery // case2: addMessage() with duplicate entryId, - // getScheduledMessages() enter "cardinality > n" and make tracker empty + // getScheduledMessages() enter one timestamp and "cardinality <= n", + // finally make tracker empty clockTime.set(0); assertTrue(tracker.addMessage(1, 1, 10)); assertTrue(tracker.addMessage(1, 2, 10)); @@ -318,7 +320,8 @@ public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDelivery // case3: addMessage() with duplicate entryId, - // getScheduledMessages() make tracker remain half cardinality + // getScheduledMessages() enter one timestamp and "cardinality > n", + // finally make tracker remain half cardinality clockTime.set(0); assertTrue(tracker.addMessage(1, 1, 10)); assertTrue(tracker.addMessage(1, 2, 10));