[fix][broker] fix delayedMessagesCount error in InMemoryDelayedDeliveryTracker #25076
base: master
Conversation
.computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
.computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
if (!roaring64Bitmap.contains(entryId)) {
    roaring64Bitmap.add(entryId);
It looks like .addLong should be used in the Roaring64Bitmap API.
- roaring64Bitmap.add(entryId);
+ roaring64Bitmap.addLong(entryId);
The .add method works too, but the method signature takes a long array (long...). Perhaps the compiler is able to optimize that, so it might not make a difference.
It's unfortunate that Roaring64Bitmap doesn't have the checkedAdd method as there is in RoaringBitmap. That would eliminate the need for the .contains check.
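For reference, a minimal sketch of the pattern under discussion, emulating a "checked add" on Roaring64Bitmap (the helper name and its use as a standalone method are assumptions for illustration, not the tracker's actual code):

```java
import org.roaringbitmap.longlong.Roaring64Bitmap;

// Minimal sketch: Roaring64Bitmap has no checkedAdd like RoaringBitmap,
// so a contains() check is needed to know whether the value was new.
static boolean checkedAddLong(Roaring64Bitmap bitmap, long entryId) {
    if (bitmap.contains(entryId)) {
        return false; // already tracked, nothing new to count
    }
    bitmap.addLong(entryId); // single-long overload, avoids the varargs array of add(long...)
    return true;
}
```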
Yes, unfortunately.
"add" or "addLong" I guess is the same after compile, sure it may be better to use "addLong" directly.
Besides, I think there is no need to consider concurrent situation in InMemoryDelayedDeliveryTracker. I don't see any code point out that concurrent situation would occur.
Threading question: addMessage() and getScheduledMessages() are invoked under synchronized (this) in the dispatcher (e.g. PersistentDispatcherMultipleConsumers#trackDelayedDelivery), but clearDelayedMessages() doesn’t seem synchronized and InMemoryDelayedDeliveryTracker#clear() isn’t synchronized either.
Is clear() guaranteed to be called under the same lock, or should we align with BucketDelayedDeliveryTracker#clear() (synchronized) to avoid concurrent access to delayedMessageMap/bitmaps?
btw. this code location was discussed in the review: https://github.com/apache/pulsar/pull/24430/changes#r2156278377
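To illustrate the alignment being asked about, a minimal sketch assuming the field name from the diff excerpt above (this is not the actual tracker code):

```java
// Sketch: give clear() the same synchronization as addMessage() and
// getScheduledMessages(), mirroring the synchronized
// BucketDelayedDeliveryTracker#clear(), so it cannot race with other
// threads mutating delayedMessageMap and its bitmaps.
@Override
public synchronized void clear() {
    this.delayedMessageMap.clear();
}
```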
Denovo1998
left a comment
Left some comments.
updateTimer();
checkAndUpdateHighest(deliverAt);
One thought: should updateTimer() and checkAndUpdateHighest(deliverAt) run only when we actually insert a new entryId?
With the current structure, duplicate addMessage() calls still update highestDeliveryTimeTracked / messagesHaveFixedDelay, which could disable the fixed-delay optimization even though the tracker state didn't change.
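A minimal sketch of that suggestion, reusing the names from the snippets in this thread (the exact surrounding code is an assumption):

```java
// Sketch: only touch the timer and fixed-delay bookkeeping when the entry
// is actually new, so a duplicate addMessage() cannot flip
// messagesHaveFixedDelay or move highestDeliveryTimeTracked.
if (!roaring64Bitmap.contains(entryId)) {
    roaring64Bitmap.addLong(entryId);
    updateTimer();
    checkAndUpdateHighest(deliverAt);
}
```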
Great catch. I checked the earliest fixed-delay implementation in #16609, and the issue already existed there: on a duplicate addMessage(), highestDeliveryTimeTracked is fine, but messagesHaveFixedDelay would be set to false incorrectly.
I will check later why duplicate addMessage() calls happen in the first place, and I'd prefer to open another PR to fix that additional issue.
log.error("[{}] Delayed message tracker getScheduledMessages should not < 0, number is: {}",
        dispatcher.getName(), n);
About the new n < 0 branch: this should be unreachable in normal flow. One potential way to hit it is int overflow from int cardinality = (int) entryIds.getLongCardinality().
Would it be better to keep cardinality as long (and compare cardinality <= (long) n) to eliminate overflow, instead of only logging when n < 0?
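A minimal sketch of the long-based comparison being proposed (the helper is hypothetical; only the entryIds and n names come from the snippet above):

```java
import org.roaringbitmap.longlong.Roaring64Bitmap;

// Sketch: keep the cardinality as a long so a very large bitmap cannot
// overflow an (int) cast into a negative value; compare against the
// remaining budget n as a long instead.
static boolean fitsInBudget(Roaring64Bitmap entryIds, int n) {
    long cardinality = entryIds.getLongCardinality();
    return cardinality <= (long) n;
}
```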
You are right. I think it is a separate issue, and using a long value in both places is better. Do you think we should fix it in this PR, or would you push another PR for it?
// case2: addMessage() with duplicate entryId,
In case2 the comment says it enters cardinality > n, but with getScheduledMessages(10) and 4 unique entryIds it should hit the cardinality <= n branch. Could we adjust the comment to match the scenario (case3 seems to be the one exercising cardinality > n)?
Yes, I have changed the comment.
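For clarity, a rough sketch of the case2 scenario (tracker construction, mock-clock advancement, and the exact method signatures are assumptions based on this discussion, not the real test):

```java
// Sketch: 4 unique entry IDs, one added twice, then getScheduledMessages(10).
// Since 4 <= 10, this hits the cardinality <= n branch, not cardinality > n.
tracker.addMessage(1, 1, 10);
tracker.addMessage(1, 1, 10); // duplicate entryId, must not inflate the count
tracker.addMessage(1, 2, 10);
tracker.addMessage(1, 3, 10);
tracker.addMessage(1, 4, 10);
// ... advance the mocked clock past deliverAt=10 ...
assertEquals(tracker.getScheduledMessages(10).size(), 4);
assertEquals(tracker.getNumberOfDelayedMessages(), 0);
```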
@thetumbled Do you have a chance to review the current PR?
Maybe we should figure out the reason why duplicate entry IDs are added multiple times if this class does not intentionally allow that behavior.
Pull request overview
This pull request fixes a bug in the InMemoryDelayedDeliveryTracker where duplicate message entries could cause incorrect delayed message counts, potentially leading to NPE issues. The fix adds a duplicate check before incrementing the counter and improves error handling for edge cases.
Key changes:
- Add duplicate entry check in addMessage() using Roaring64Bitmap.contains() before adding entries
- Improve error handling in getScheduledMessages() to explicitly handle and log the n < 0 case
- Add comprehensive test coverage for duplicate entry scenarios across multiple test cases
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.
| File | Description |
|---|---|
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java | Implements duplicate entry check in addMessage() and enhances error handling in getScheduledMessages() |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/delayed/InMemoryDeliveryTrackerTest.java | Adds comprehensive test method testDelayedMessagesCountWithDuplicateEntryId() covering three scenarios: multiple timestamps with duplicates, single timestamp with duplicates, and partial retrieval with duplicates |
d12846f to 9b562dc
Motivation
We hit an NPE in delayed message delivery and traced the root cause to delayedMessagesCount. InMemoryDelayedDeliveryTracker#addMessage() does not check whether the entryId already exists in the Roaring64Bitmap, so duplicate entries are counted again and delayedMessagesCount no longer matches what the map actually holds.
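A minimal sketch of the intended fix (delayedMessageMap and delayedMessagesCount are names assumed from this discussion, not necessarily the exact fields in the tracker):

```java
// Sketch: only count an entry when it is really inserted into the bitmap,
// so duplicate (ledgerId, entryId) pairs cannot inflate delayedMessagesCount.
Roaring64Bitmap entryIds = delayedMessageMap
        .computeIfAbsent(timestamp, k -> new Long2ObjectRBTreeMap<>())
        .computeIfAbsent(ledgerId, k -> new Roaring64Bitmap());
if (!entryIds.contains(entryId)) {
    entryIds.addLong(entryId);
    delayedMessagesCount++;
}
```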
Modifications
Verifying this change
Documentation
doc / doc-required / doc-not-needed / doc-complete