From 65e69af0e530dadfe59373887f50e488f78dca01 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Wed, 17 Dec 2025 14:08:13 -0800 Subject: [PATCH 1/2] [fix][ml] Fix cursor backlog size to account for individual acks Fixes the issue where cursor.getEstimatedSizeSinceMarkDeletePosition() would not subtract the size of individually acknowledged messages. Problem: - When messages are acknowledged out of order using asyncDelete(), the backlog count correctly decreases - But the backlog size remains inflated because individualDeletedMessages are not accounted for Example: - 5 messages (500 bytes total) - Individual ack of 3 messages - Before fix: backlog size = 500 (incorrect) - After fix: backlog size = 200 (correct) Fix: - Update ManagedCursorImpl.getEstimatedSizeSinceMarkDeletePosition() to calculate and subtract size of individual deleted entries - Uses average entry size to estimate deleted size - Thread-safe with proper locking Unit test: - testEstimatedSizeWithIndividualAcks() verifies the fix --- .../mledger/impl/ManagedCursorImpl.java | 62 ++++++++++++++- .../mledger/impl/ManagedCursorTest.java | 75 +++++++++++++++++++ 2 files changed, 136 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 137be071d672f..12126e7f816ad 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1282,7 +1282,67 @@ public int getNonContiguousDeletedMessagesRangeSerializedSize() { @Override public long getEstimatedSizeSinceMarkDeletePosition() { - return ledger.estimateBacklogFromPosition(markDeletePosition); + long totalSize = ledger.estimateBacklogFromPosition(markDeletePosition); + + // Need to subtract size of individual deleted messages + if (log.isDebugEnabled()) { + 
log.debug("[{}] Calculating backlog size for cursor {} from position {}, totalSize: {}", + ledger.getName(), name, markDeletePosition, totalSize); + } + + // Get count of individually deleted entries in the backlog range + long deletedCount = 0; + lock.readLock().lock(); + try { + Range backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition()); + + if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { + deletedCount = individualDeletedMessages.cardinality( + backlogRange.lowerEndpoint().getLedgerId(), backlogRange.lowerEndpoint().getEntryId(), + backlogRange.upperEndpoint().getLedgerId(), backlogRange.upperEndpoint().getEntryId()); + } else { + AtomicLong deletedCounter = new AtomicLong(0); + individualDeletedMessages.forEach((r) -> { + if (r.isConnected(backlogRange)) { + Range intersection = r.intersection(backlogRange); + long countInRange = ledger.getNumberOfEntries(intersection); + deletedCounter.addAndGet(countInRange); + } + return true; + }, recyclePositionRangeConverter); + deletedCount = deletedCounter.get(); + } + } finally { + lock.readLock().unlock(); + } + + if (deletedCount == 0) { + return totalSize; + } + + // Estimate size by using average entry size from the backlog range + long totalEntriesInBacklog = ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + + if (totalEntriesInBacklog <= deletedCount || totalEntriesInBacklog == 0) { + // Should not happen, but avoid division by zero + log.warn("[{}] [{}] Inconsistent state: totalEntriesInBacklog={}, deletedCount={}", + ledger.getName(), name, totalEntriesInBacklog, deletedCount); + return Math.max(0, totalSize); // Return the total size and log the issue + } + + // Calculate average size in the backlog range + long averageSize = totalSize / totalEntriesInBacklog; + + // Subtract size of deleted entries + long deletedSize = deletedCount * averageSize; + long adjustedSize = totalSize - deletedSize; + + if (log.isDebugEnabled()) { + 
log.debug("[{}] [{}] Adjusted backlog size: totalSize={}, deletedCount={}, averageSize={}, deletedSize={}, adjustedSize={}", + ledger.getName(), name, totalSize, deletedCount, averageSize, deletedSize, adjustedSize); + } + + return adjustedSize; } private long getNumberOfEntriesInBacklog() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 6a50f7404b509..518213e833e7d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3524,6 +3524,81 @@ public void testEstimatedUnackedSize() throws Exception { assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length); } + /** + * Test that cursor.getEstimatedSizeSinceMarkDeletePosition() correctly accounts for individual + * message deletions (asyncDelete/individual ack). + * + * This verifies the fix: when messages are acknowledged out of order using asyncDelete, + * the backlog size is now correctly adjusted to reflect the individually deleted messages. 
+ */ + @Test(timeOut = 20000) + public void testEstimatedSizeWithIndividualAcks() throws Exception { + ManagedLedger ledger = factory.open("test_cursor_backlog_size_individual_acks"); + ManagedCursor cursor = ledger.openCursor("c1"); + + // Each entry is 100 bytes + byte[] entryData = new byte[100]; + + // Add 5 entries: positions should be 0:0, 0:1, 0:2, 0:3, 0:4 + List positions = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + positions.add(ledger.addEntry(entryData)); + } + + // Initial state: 5 entries * 100 bytes = 500 bytes + assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 500); + + // Read all entries so they can be acknowledged + List entries = cursor.readEntries(5); + assertEquals(entries.size(), 5); + entries.forEach(Entry::release); + + // Individual acknowledge positions 1, 3, 4 (leaving 0:0 and 0:2 unacknowledged) + AtomicInteger callbackCount = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(3); + + DeleteCallback callback = new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + callbackCount.incrementAndGet(); + latch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }; + + cursor.asyncDelete(positions.get(1), callback, null); + cursor.asyncDelete(positions.get(3), callback, null); + cursor.asyncDelete(positions.get(4), callback, null); + + // Wait for async operations to complete + assertTrue(latch.await(5, TimeUnit.SECONDS), "Deletes should complete"); + assertEquals(callbackCount.get(), 3, "All 3 deletes should succeed"); + + // Get current state + // After fix: should now account for individual deleted messages + long expectedBacklogSize = 200; // 2 remaining entries (0:0, 0:2) * 100 bytes + long actualBacklogSize = cursor.getEstimatedSizeSinceMarkDeletePosition(); + Position markDeletePos = cursor.getMarkDeletedPosition(); + + log.info("Backlog size after individual acks:"); + log.info(" Expected: {}. 
Actual: {}", expectedBacklogSize, actualBacklogSize); + log.info(" Mark delete position: {}", markDeletePos); + log.info(" Individual deleted: {}", ((ManagedCursorImpl) cursor).getIndividuallyDeletedMessagesSet()); + + // After fix: backlog size should now correctly account for individual deletions + assertEquals(actualBacklogSize, expectedBacklogSize, + "Backlog size should account for individual deletions"); + + // Verify both count and size are correct + assertEquals(cursor.getNumberOfEntriesInBacklog(true), 2, "Backlog count should be 2"); + + ledger.close(); + } + @Test(timeOut = 20000) public void testRecoverCursorAheadOfLastPosition() throws Exception { final String mlName = "my_test_ledger"; From 1a24e0b0cbd86eb16f214a5a24897fb59cc45580 Mon Sep 17 00:00:00 2001 From: Penghui Li Date: Wed, 17 Dec 2025 14:23:29 -0800 Subject: [PATCH 2/2] [fix] Fix checkstyle line length violations Fixes line 1324 (127 char) and line 1342 (136 char) to be under 120 characters --- .../apache/bookkeeper/mledger/impl/ManagedCursorImpl.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 12126e7f816ad..c0f29ea5c22b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1321,7 +1321,8 @@ public long getEstimatedSizeSinceMarkDeletePosition() { } // Estimate size by using average entry size from the backlog range - long totalEntriesInBacklog = ledger.getNumberOfEntries(Range.openClosed(markDeletePosition, ledger.getLastPosition())); + Range backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition()); + long totalEntriesInBacklog = ledger.getNumberOfEntries(backlogRange); if (totalEntriesInBacklog <= deletedCount || 
totalEntriesInBacklog == 0) { // Should not happen, but avoid division by zero @@ -1338,7 +1339,8 @@ public long getEstimatedSizeSinceMarkDeletePosition() { long adjustedSize = totalSize - deletedSize; if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Adjusted backlog size: totalSize={}, deletedCount={}, averageSize={}, deletedSize={}, adjustedSize={}", + log.debug("[{}] [{}] Adjusted backlog size: totalSize={}, deletedCount={}, averageSize={}, " + + "deletedSize={}, adjustedSize={}", ledger.getName(), name, totalSize, deletedCount, averageSize, deletedSize, adjustedSize); }