Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1282,7 +1282,69 @@ public int getNonContiguousDeletedMessagesRangeSerializedSize() {

@Override
public long getEstimatedSizeSinceMarkDeletePosition() {
return ledger.estimateBacklogFromPosition(markDeletePosition);
long totalSize = ledger.estimateBacklogFromPosition(markDeletePosition);

// Need to subtract size of individual deleted messages
if (log.isDebugEnabled()) {
log.debug("[{}] Calculating backlog size for cursor {} from position {}, totalSize: {}",
ledger.getName(), name, markDeletePosition, totalSize);
}

// Get count of individually deleted entries in the backlog range
long deletedCount = 0;
lock.readLock().lock();
try {
Range<Position> backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition());

if (getConfig().isUnackedRangesOpenCacheSetEnabled()) {
deletedCount = individualDeletedMessages.cardinality(
backlogRange.lowerEndpoint().getLedgerId(), backlogRange.lowerEndpoint().getEntryId(),
backlogRange.upperEndpoint().getLedgerId(), backlogRange.upperEndpoint().getEntryId());
} else {
AtomicLong deletedCounter = new AtomicLong(0);
individualDeletedMessages.forEach((r) -> {
if (r.isConnected(backlogRange)) {
Range<Position> intersection = r.intersection(backlogRange);
long countInRange = ledger.getNumberOfEntries(intersection);
deletedCounter.addAndGet(countInRange);
}
return true;
}, recyclePositionRangeConverter);
deletedCount = deletedCounter.get();
}
} finally {
lock.readLock().unlock();
}
Comment on lines +1293 to +1317
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This part seems to duplicate logic that currently exists in getNumberOfEntries method. Avoiding code duplication would be preferred. Would it be possible to extract a common method that is used in both getEstimatedSizeSinceMarkDeletePosition and getNumberOfEntries?


if (deletedCount == 0) {
return totalSize;
}

// Estimate size by using average entry size from the backlog range
Range<Position> backlogRange = Range.openClosed(markDeletePosition, ledger.getLastPosition());
long totalEntriesInBacklog = ledger.getNumberOfEntries(backlogRange);

if (totalEntriesInBacklog <= deletedCount || totalEntriesInBacklog == 0) {
// Should not happen, but avoid division by zero
log.warn("[{}] [{}] Inconsistent state: totalEntriesInBacklog={}, deletedCount={}",
ledger.getName(), name, totalEntriesInBacklog, deletedCount);
return Math.max(0, totalSize); // Return the total size and log the issue
}

// Calculate average size in the backlog range
long averageSize = totalSize / totalEntriesInBacklog;

// Subtract size of deleted entries
long deletedSize = deletedCount * averageSize;
long adjustedSize = totalSize - deletedSize;

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Adjusted backlog size: totalSize={}, deletedCount={}, averageSize={}, "
+ "deletedSize={}, adjustedSize={}",
ledger.getName(), name, totalSize, deletedCount, averageSize, deletedSize, adjustedSize);
}

return adjustedSize;
}

private long getNumberOfEntriesInBacklog() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3524,6 +3524,81 @@ public void testEstimatedUnackedSize() throws Exception {
assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 10 * entryData.length);
}

/**
* Test that cursor.getEstimatedSizeSinceMarkDeletePosition() correctly accounts for individual
* message deletions (asyncDelete/individual ack).
*
* This verifies the fix: when messages are acknowledged out of order using asyncDelete,
* the backlog size is now correctly adjusted to reflect the individually deleted messages.
*/
@Test(timeOut = 20000)
public void testEstimatedSizeWithIndividualAcks() throws Exception {
ManagedLedger ledger = factory.open("test_cursor_backlog_size_individual_acks");
ManagedCursor cursor = ledger.openCursor("c1");

// Each entry is 100 bytes
byte[] entryData = new byte[100];

// Add 5 entries: positions should be 0:0, 0:1, 0:2, 0:3, 0:4
List<Position> positions = new ArrayList<>();
for (int i = 0; i < 5; i++) {
positions.add(ledger.addEntry(entryData));
}

// Initial state: 5 entries * 100 bytes = 500 bytes
assertEquals(cursor.getEstimatedSizeSinceMarkDeletePosition(), 500);

// Read all entries so they can be acknowledged
List<Entry> entries = cursor.readEntries(5);
assertEquals(entries.size(), 5);
entries.forEach(Entry::release);

// Individual acknowledge positions 1, 3, 4 (leaving 0:0 and 0:2 unacknowledged)
AtomicInteger callbackCount = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(3);

DeleteCallback callback = new DeleteCallback() {
@Override
public void deleteComplete(Object ctx) {
callbackCount.incrementAndGet();
latch.countDown();
}

@Override
public void deleteFailed(ManagedLedgerException exception, Object ctx) {
latch.countDown();
}
};

cursor.asyncDelete(positions.get(1), callback, null);
cursor.asyncDelete(positions.get(3), callback, null);
cursor.asyncDelete(positions.get(4), callback, null);

// Wait for async operations to complete
assertTrue(latch.await(5, TimeUnit.SECONDS), "Deletes should complete");
assertEquals(callbackCount.get(), 3, "All 3 deletes should succeed");

// Get current state
// After fix: should now account for individual deleted messages
long expectedBacklogSize = 200; // 2 remaining entries (0:0, 0:2) * 100 bytes
long actualBacklogSize = cursor.getEstimatedSizeSinceMarkDeletePosition();
Position markDeletePos = cursor.getMarkDeletedPosition();

log.info("Backlog size after individual acks:");
log.info(" Expected: {}. Actual: {}", expectedBacklogSize, actualBacklogSize);
log.info(" Mark delete position: {}", markDeletePos);
log.info(" Individual deleted: {}", ((ManagedCursorImpl) cursor).getIndividuallyDeletedMessagesSet());

// After fix: backlog size should now correctly account for individual deletions
assertEquals(actualBacklogSize, expectedBacklogSize,
"Backlog size should account for individual deletions");

// Verify both count and size are correct
assertEquals(cursor.getNumberOfEntriesInBacklog(true), 2, "Backlog count should be 2");

ledger.close();
}

@Test(timeOut = 20000)
public void testRecoverCursorAheadOfLastPosition() throws Exception {
final String mlName = "my_test_ledger";
Expand Down
Loading