Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,13 @@ public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
}

long timestamp = trimLowerBit(deliverAt, timestampPrecisionBitCnt);
delayedMessageMap.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap())
.add(entryId);
delayedMessagesCount.incrementAndGet();
Roaring64Bitmap roaring64Bitmap = delayedMessageMap
.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
if (!roaring64Bitmap.contains(entryId)) {
roaring64Bitmap.addLong(entryId);
delayedMessagesCount.incrementAndGet();
}

updateTimer();

Expand Down Expand Up @@ -200,7 +203,12 @@ public NavigableSet<Position> getScheduledMessages(int maxMessages) {
delayedMessagesCount.addAndGet(-n);
n = 0;
}
if (n <= 0) {
if (n == 0) {
break;
} else if (n < 0) {
// should not go into this situation
log.error("[{}] Delayed message tracker getScheduledMessages should not < 0, number is: {}",
dispatcher.getName(), n);
Comment on lines +210 to +211
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About the new n < 0 branch: this should be unreachable in normal flow. One potential way to hit it is int overflow from int cardinality = (int) entryIds.getLongCardinality().

Would it be better to keep cardinality as long (and compare cardinality <= (long) n) to eliminate overflow, instead of only logging when n < 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right. I think it is another issue and both use long value is better. Do you think we fix it in this pr or you push another pr to fix?

break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,72 @@ public void testDelaySequence(InMemoryDelayedDeliveryTracker tracker) throws Exc
tracker.close();
}

@Test(dataProvider = "delayedTracker")
public void testDelayedMessagesCountWithDuplicateEntryId(InMemoryDelayedDeliveryTracker tracker) throws Exception {
assertFalse(tracker.hasMessageAvailable());

// case1: addMessage() with duplicate entryId,
// getScheduledMessages() enter multiple timestamp and "cardinality <= n",
// finally make tracker empty
assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(1, 2, 20));
assertTrue(tracker.addMessage(1, 2, 20));
assertTrue(tracker.addMessage(1, 3, 40));
assertTrue(tracker.addMessage(1, 4, 50));

clockTime.set(50);
assertTrue(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), 4L);
assertEquals(tracker.delayedMessageMap.size(), 4L);

Set<Position> scheduled = tracker.getScheduledMessages(10);
assertEquals(scheduled.size(), 4);
assertEquals(tracker.getNumberOfDelayedMessages(), 0L);



// case2: addMessage() with duplicate entryId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case2 the comment says it enters cardinality > n, but with getScheduledMessages(10) and 4 unique entryIds it should hit the cardinality <= n branch. Could we adjust the comment to match the scenario (case3 seems to be the one exercising cardinality > n)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. have changed the comment

// getScheduledMessages() enter one timestamp and "cardinality <= n",
// finally make tracker empty
clockTime.set(0);
assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(1, 2, 10));
assertTrue(tracker.addMessage(1, 2, 10));
assertTrue(tracker.addMessage(1, 3, 10));
assertTrue(tracker.addMessage(1, 4, 10));

clockTime.set(50);
assertTrue(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), 4L);
assertEquals(tracker.delayedMessageMap.size(), 1L);

scheduled = tracker.getScheduledMessages(10);
assertEquals(scheduled.size(), 4);
assertEquals(tracker.getNumberOfDelayedMessages(), 0L);



// case3: addMessage() with duplicate entryId,
// getScheduledMessages() enter one timestamp and "cardinality > n",
// finally make tracker remain half cardinality
clockTime.set(0);
assertTrue(tracker.addMessage(1, 1, 10));
assertTrue(tracker.addMessage(1, 2, 10));
assertTrue(tracker.addMessage(1, 2, 10));
assertTrue(tracker.addMessage(1, 3, 10));
assertTrue(tracker.addMessage(1, 4, 10));

clockTime.set(50);
assertTrue(tracker.hasMessageAvailable());
assertEquals(tracker.getNumberOfDelayedMessages(), 4L);
assertEquals(tracker.delayedMessageMap.size(), 1L);

scheduled = tracker.getScheduledMessages(2);
assertEquals(scheduled.size(), 2);
assertEquals(tracker.getNumberOfDelayedMessages(), 2L);


tracker.close();
}

}