diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java index ad5ab25fbbf6b..d16803d897eea 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java @@ -126,10 +126,13 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) { } long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt); - delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) - .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()) - .add(entryId); - delayedMessagesCount.incrementAndGet(); + Roaring64Bitmap roaring64Bitmap = delayedMessageMap + .computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>()) + .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap()); + if (!roaring64Bitmap.contains(entryId)) { + roaring64Bitmap.addLong(entryId); + delayedMessagesCount.incrementAndGet(); + } updateTimer(); @@ -200,7 +203,12 @@ public NavigableSet getScheduledMessages(int maxMessages) { delayedMessagesCount.addAndGet(-n); n = 0; } - if (n <= 0) { + if (n == 0) { + break; + } else if (n < 0) { + // should not go into this situation + log.error("[{}] Delayed message tracker getScheduledMessages should not < 0, number is: {}", + dispatcher.getName(), n); break; } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java index c5a564d1b664b..cbe81212f41dd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java @@ -274,4 +274,72 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc tracker.close(); } + @Test(dataProvider = "delayedTracker") + public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDeliveryTracker tracker) throws Exception { + assertFalse(tracker.hasMessageAvailable()); + + // case1: addMessage() with duplicate entryId, + // getScheduledMessages() enter multiple timestamp and "cardinality <= n", + // finally make tracker empty + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(1, 2, 20)); + assertTrue(tracker.addMessage(1, 3, 40)); + assertTrue(tracker.addMessage(1, 4, 50)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 4L); + + Set scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 4); + assertEquals(tracker.getNumberOfDelayedMessages(), 0L); + + + + // case2: addMessage() with duplicate entryId, + // getScheduledMessages() enter one timestamp and "cardinality <= n", + // finally make tracker empty + clockTime.set(0); + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 3, 10)); + assertTrue(tracker.addMessage(1, 4, 10)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 1L); + + scheduled = tracker.getScheduledMessages(10); + assertEquals(scheduled.size(), 4); + assertEquals(tracker.getNumberOfDelayedMessages(), 0L); + + + + // case3: addMessage() with duplicate entryId, + // getScheduledMessages() enter one timestamp and "cardinality > n", + // finally make tracker remain half cardinality + clockTime.set(0); + assertTrue(tracker.addMessage(1, 1, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 2, 10)); + assertTrue(tracker.addMessage(1, 3, 10)); + assertTrue(tracker.addMessage(1, 4, 10)); + + clockTime.set(50); + assertTrue(tracker.hasMessageAvailable()); + assertEquals(tracker.getNumberOfDelayedMessages(), 4L); + assertEquals(tracker.delayedMessageMap.size(), 1L); + + scheduled = tracker.getScheduledMessages(2); + assertEquals(scheduled.size(), 2); + assertEquals(tracker.getNumberOfDelayedMessages(), 2L); + + + tracker.close(); + } + }