-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][ml] Fix cursor backlog size to account for individual acks #25089
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
[fix][ml] Fix cursor backlog size to account for individual acks #25089
Conversation
Fixes the issue where cursor.getEstimatedSizeSinceMarkDeletePosition() would not subtract the size of individually acknowledged messages. Problem: - When messages are acknowledged out of order using asyncDelete(), the backlog count correctly decreases - But the backlog size remains inflated because individualDeletedMessages are not accounted for Example: - 5 messages (500 bytes total) - Individual ack of 3 messages - Before fix: backlog size = 500 (incorrect) - After fix: backlog size = 200 (correct) Fix: - Update ManagedCursorImpl.getEstimatedSizeSinceMarkDeletePosition() to calculate and subtract size of individual deleted entries - Uses average entry size to estimate deleted size - Thread-safe with proper locking Unit test: - testEstimatedSizeWithIndividualAcks() verifies the fix
|
/pulsarbot run-failure-checks |
Fixes line 1324 (127 char) and line 1342 (136 char) to be under 120 characters]
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the comment about avoiding code duplication
| // Get count of individually deleted entries in the backlog range | ||
| long deletedCount = 0; | ||
| lock.readLock().lock(); | ||
| try { | ||
| Range<Position> backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition()); | ||
|
|
||
| if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { | ||
| deletedCount = individualDeletedMessages.cardinality( | ||
| backlogRange.lowerEndpoint().getLedgerId(), backlogRange.lowerEndpoint().getEntryId(), | ||
| backlogRange.upperEndpoint().getLedgerId(), backlogRange.upperEndpoint().getEntryId()); | ||
| } else { | ||
| AtomicLong deletedCounter = new AtomicLong(0); | ||
| individualDeletedMessages.forEach((r) -> { | ||
| if (r.isConnected(backlogRange)) { | ||
| Range<Position> intersection = r.intersection(backlogRange); | ||
| long countInRange = ledger.getNumberOfEntries(intersection); | ||
| deletedCounter.addAndGet(countInRange); | ||
| } | ||
| return true; | ||
| }, recyclePositionRangeConverter); | ||
| deletedCount = deletedCounter.get(); | ||
| } | ||
| } finally { | ||
| lock.readLock().unlock(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part seems to duplicate logic that currently exists in getNumberOfEntries method. Avoiding code duplication would be preferred. Would it be possible to extract a common method that is used in both getEstimatedSizeSinceMarkDeletePosition and getNumberOfEntries?
Fixes the issue where cursor.getEstimatedSizeSinceMarkDeletePosition() would not subtract the size of individually acknowledged messages.
Problem:
Example:
Fix:
Unit test:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-complete