diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index 137be071d672f..c0f29ea5c22b7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -1282,7 +1282,69 @@ public int getNonContiguousDeletedMessagesRangeSerializedSize() { @Override public long getEstimatedSizeSinceMarkDeletePosition() { - return ledger.estimateBacklogFromPosition(markDeletePosition); + long totalSize = ledger.estimateBacklogFromPosition(markDeletePosition); + + // Need to subtract size of individual deleted messages + if (log.isDebugEnabled()) { + log.debug("[{}] Calculating backlog size for cursor {} from position {}, totalSize: {}", + ledger.getName(), name, markDeletePosition, totalSize); + } + + // Get count of individually deleted entries in the backlog range + long deletedCount = 0; + lock.readLock().lock(); + try { + Range backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition()); + + if (getConfig().isUnackedRangesOpenCacheSetEnabled()) { + deletedCount = individualDeletedMessages.cardinality( + backlogRange.lowerEndpoint().getLedgerId(), backlogRange.lowerEndpoint().getEntryId(), + backlogRange.upperEndpoint().getLedgerId(), backlogRange.upperEndpoint().getEntryId()); + } else { + AtomicLong deletedCounter = new AtomicLong(0); + individualDeletedMessages.forEach((r) -> { + if (r.isConnected(backlogRange)) { + Range intersection = r.intersection(backlogRange); + long countInRange = ledger.getNumberOfEntries(intersection); + deletedCounter.addAndGet(countInRange); + } + return true; + }, recyclePositionRangeConverter); + deletedCount = deletedCounter.get(); + } + } finally { + lock.readLock().unlock(); + } + + if (deletedCount == 0) { + return totalSize; + } + + // Estimate size by using average entry size from the backlog range + Range backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition()); + long totalEntriesInBacklog = ledger.getNumberOfEntries(backlogRange); + + if (totalEntriesInBacklog <= deletedCount || totalEntriesInBacklog == 0) { + // Should not happen, but avoid division by zero + log.warn("[{}] [{}] Inconsistent state: totalEntriesInBacklog={}, deletedCount={}", + ledger.getName(), name, totalEntriesInBacklog, deletedCount); + return Math.max(0, totalSize); // Return the total size and log the issue + } + + // Calculate average size in the backlog range + long averageSize = totalSize / totalEntriesInBacklog; + + // Subtract size of deleted entries + long deletedSize = deletedCount * averageSize; + long adjustedSize = totalSize - deletedSize; + + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Adjusted backlog size: totalSize={}, deletedCount={}, averageSize={}, " + + "deletedSize={}, adjustedSize={}", + ledger.getName(), name, totalSize, deletedCount, averageSize, deletedSize, adjustedSize); + } + + return adjustedSize; } private long getNumberOfEntriesInBacklog() { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 6a50f7404b509..518213e833e7d 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -3524,6 +3524,81 @@ public void testEstimatedUnackedSize() throws Exception { assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length); } + /** + * Test that cursor.getEstimatedSizeSinceMarkDeletePosition() correctly accounts for individual + * message deletions (asyncDelete/individual ack). + * + * This verifies the fix: when messages are acknowledged out of order using asyncDelete, + * the backlog size is now correctly adjusted to reflect the individually deleted messages. + */ + @Test(timeOut = 20000) + public void testEstimatedSizeWithIndividualAcks() throws Exception { + ManagedLedger ledger = factory.open("test_cursor_backlog_size_individual_acks"); + ManagedCursor cursor = ledger.openCursor("c1"); + + // Each entry is 100 bytes + byte[] entryData = new byte[100]; + + // Add 5 entries: positions should be 0:0, 0:1, 0:2, 0:3, 0:4 + List positions = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + positions.add(ledger.addEntry(entryData)); + } + + // Initial state: 5 entries * 100 bytes = 500 bytes + assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 500); + + // Read all entries so they can be acknowledged + List entries = cursor.readEntries(5); + assertEquals(entries.size(), 5); + entries.forEach(Entry::release); + + // Individual acknowledge positions 1, 3, 4 (leaving 0:0 and 0:2 unacknowledged) + AtomicInteger callbackCount = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(3); + + DeleteCallback callback = new DeleteCallback() { + @Override + public void deleteComplete(Object ctx) { + callbackCount.incrementAndGet(); + latch.countDown(); + } + + @Override + public void deleteFailed(ManagedLedgerException exception, Object ctx) { + latch.countDown(); + } + }; + + cursor.asyncDelete(positions.get(1), callback, null); + cursor.asyncDelete(positions.get(3), callback, null); + cursor.asyncDelete(positions.get(4), callback, null); + + // Wait for async operations to complete + assertTrue(latch.await(5, TimeUnit.SECONDS), "Deletes should complete"); + assertEquals(callbackCount.get(), 3, "All 3 deletes should succeed"); + + // Get current state + // After fix: should now account for individual deleted messages + long expectedBacklogSize = 200; // 2 remaining entries (0:0, 0:2) * 100 bytes + long actualBacklogSize = cursor.getEstimatedSizeSinceMarkDeletePosition(); + Position markDeletePos = cursor.getMarkDeletedPosition(); + + log.info("Backlog size after individual acks:"); + log.info(" Expected: {}. Actual: {}", expectedBacklogSize, actualBacklogSize); + log.info(" Mark delete position: {}", markDeletePos); + log.info(" Individual deleted: {}", ((ManagedCursorImpl) cursor).getIndividuallyDeletedMessagesSet()); + + // After fix: backlog size should now correctly account for individual deletions + assertEquals(actualBacklogSize, expectedBacklogSize, + "Backlog size should account for individual deletions"); + + // Verify both count and size are correct + assertEquals(cursor.getNumberOfEntriesInBacklog(true), 2, "Backlog count should be 2"); + + ledger.close(); + } + @Test(timeOut = 20000) public void testRecoverCursorAheadOfLastPosition() throws Exception { final String mlName = "my_test_ledger";