From def62471073e9472668699b70d05e2aa92b00d3f Mon Sep 17 00:00:00 2001 From: gaozhangmin Date: Fri, 19 Dec 2025 18:14:14 +0800 Subject: [PATCH] [improve][broker]Improve ManagedLedger search position by offset --- .../bookkeeper/mledger/ManagedLedger.java | 11 + .../mledger/impl/ManagedLedgerImpl.java | 218 ++++++++++++- .../intercept/ManagedLedgerInterceptor.java | 1 + .../ManagedLedgerInterceptorImpl.java | 1 + ...CustomizedManagedLedgerStorageForTest.java | 5 + .../ManagedLedgerInterceptorImplTest.java | 304 ++++++++++++++++++ .../MLTransactionSequenceIdGenerator.java | 5 + .../jcloud/impl/MockManagedLedger.java | 5 + 8 files changed, 543 insertions(+), 7 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java index 8fb083bcd026c..a50fe75a29faa 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java @@ -711,6 +711,17 @@ default void skipNonRecoverableLedger(long ledgerId){} * */ CompletableFuture asyncFindPosition(Predicate predicate); + /** + * Optimized find position by offset with predicate. + * Uses first entry index metadata to skip ledgers where the target offset + * is smaller than the ledger's first entry index, improving performance. + * + * @param offset the target offset to find + * @param predicate the predicate to test entries + * @return CompletableFuture the position where the predicate matches + */ + CompletableFuture asyncFindPosition(long offset, Predicate predicate); + /** * Get the ManagedLedgerInterceptor for ManagedLedger. * */ diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4b278cf6664d4..53788dbaf2468 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -157,9 +157,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private static final long MegaByte = 1024 * 1024; + private static final String FIRST_ENTRY_INDEX_PROPS = "firstEntryIndex"; protected static final int AsyncOperationTimeoutSeconds = 30; + // Store the firstEntryIndex calculated during ledger creation for use in createComplete callback + private volatile Long pendingLedgerFirstEntryIndex; protected final BookKeeper bookKeeper; protected final String name; private final Map ledgerMetadata; @@ -459,9 +462,14 @@ public void operationComplete(ManagedLedgerInfo mlInfo, Stat stat) { if (State.Terminated.equals(state)) { currentLedger = lh; } + LedgerInfo oldInfo = ledgers.get(id); + List oldProperties = oldInfo.getPropertiesList(); + Map propertiesMap = new HashMap<>(); + oldProperties.forEach(kv -> propertiesMap.put(kv.getKey(), kv.getValue())); LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(id) .setEntries(lh.getLastAddConfirmed() + 1).setSize(lh.getLength()) .setTimestamp(clock.millis()).build(); + info = updateLedgerProps(info, propertiesMap); ledgers.put(id, info); if (managedLedgerInterceptor != null) { managedLedgerInterceptor @@ -631,9 +639,19 @@ public void operationFailed(MetaStoreException e) { } } - LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); - ledgers.put(lh.getId(), info); - + if (managedLedgerInterceptor != null) { + long firstEntryIndex = managedLedgerInterceptor.getIndex() + 1; + Map newPropertiesMap = new HashMap<>(); + newPropertiesMap.put(FIRST_ENTRY_INDEX_PROPS, Long.toString(firstEntryIndex)); + LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); + info = updateLedgerProps(info, newPropertiesMap); + ledgers.put(lh.getId(), info); + log.info("[{}] InitializeBookKeeper " + + "Creating ledger with first entry index: {}", name, firstEntryIndex); + } else { + LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); + ledgers.put(lh.getId(), info); + } // Save it back to ensure all nodes exist store.asyncUpdateLedgerIds(name, getManagedLedgerInfo(), ledgersStat, storeLedgersCb); }); @@ -1741,9 +1759,23 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct // Empty the list of pending requests and make all of them fail clearPendingAddEntries(status); lastLedgerCreationFailureTimestamp = clock.millis(); + // Clear the stored firstEntryIndex on failure + pendingLedgerFirstEntryIndex = null; } else { log.info("[{}] Created new ledger {}", name, lh.getId()); LedgerInfo newLedger = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setTimestamp(0).build(); + // Use the stored firstEntryIndex instead of recalculating + if (pendingLedgerFirstEntryIndex != null) { + Map newPropertiesMap = new HashMap<>(); + newPropertiesMap.put(FIRST_ENTRY_INDEX_PROPS, Long.toString(pendingLedgerFirstEntryIndex)); + newLedger = updateLedgerProps(newLedger, newPropertiesMap); + log.info("[{}] CreateComplete Creating ledger with stored first entry index: {}", + name, pendingLedgerFirstEntryIndex); + // Clear the stored value after use + pendingLedgerFirstEntryIndex = null; + } + final LedgerInfo createdNewLedger = newLedger; + final MetaStoreCallback cb = new MetaStoreCallback() { @Override public void operationComplete(Void v, Stat stat) { @@ -1753,7 +1785,7 @@ public void operationComplete(Void v, Stat stat) { ledgersStat = stat; synchronized (ManagedLedgerImpl.this) { LedgerHandle originalCurrentLedger = currentLedger; - ledgers.put(lh.getId(), newLedger); + ledgers.put(lh.getId(), createdNewLedger); currentLedger = lh; currentLedgerTimeoutTriggered = new AtomicBoolean(); currentLedgerEntries = 0; @@ -1805,7 +1837,7 @@ public void operationFailed(MetaStoreException e) { } }; - updateLedgersListAfterRollover(cb, newLedger); + updateLedgersListAfterRollover(cb, createdNewLedger); } } @@ -1949,9 +1981,16 @@ synchronized void ledgerClosed(final LedgerHandle lh, Long lastAddConfirmed) { log.debug("[{}] Ledger has been closed id={} entries={}", name, lh.getId(), entriesInLedger); } if (entriesInLedger > 0) { - LedgerInfo info = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger) + LedgerInfo oldInfo = ledgers.get(lh.getId()); + LedgerInfo newLedgerInfo = LedgerInfo.newBuilder().setLedgerId(lh.getId()).setEntries(entriesInLedger) .setSize(lh.getLength()).setTimestamp(clock.millis()).build(); - ledgers.put(lh.getId(), info); + if (oldInfo != null) { + List oldProperties = oldInfo.getPropertiesList(); + Map newPropertiesMap = new HashMap<>(); + oldProperties.forEach(kv -> newPropertiesMap.put(kv.getKey(), kv.getValue())); + newLedgerInfo = updateLedgerProps(newLedgerInfo, newPropertiesMap); + } + ledgers.put(lh.getId(), newLedgerInfo); } else { // The last ledger was empty, so we can discard it ledgers.remove(lh.getId()); @@ -2022,6 +2061,148 @@ public void closeComplete(int rc, LedgerHandle lh, Object o) { } } + /** + * Optimized version of asyncFindPosition for offset-based searches. + * Uses getFirstEntryIndexFromLedgerMetadataAsync to skip ledgers where the target offset + * is smaller than the ledger's first entry index. + * + * @param offset the target offset to find + * @param predicate the predicate to test entries + * @return CompletableFuture the position where the predicate matches + */ + public CompletableFuture asyncFindPosition(long offset, Predicate predicate) { + CompletableFuture future = new CompletableFuture<>(); + + if (ledgers.isEmpty()) { + future.complete(null); + return future; + } + + // Find the appropriate starting ledger by checking first entry indices + findStartingLedgerForOffset(offset) + .thenCompose(startLedgerId -> { + if (startLedgerId == null) { + // No suitable ledger found, return first position + Long firstLedgerId = ledgers.firstKey(); + Position startPosition = firstLedgerId == null ? null : PositionFactory.create(firstLedgerId, 0); + return CompletableFuture.completedFuture(startPosition); + } + + // Use the optimized starting position + Position startPosition = PositionFactory.create(startLedgerId, 0); + return asyncFindPosition(startPosition, predicate); + }) + .whenComplete((position, throwable) -> { + if (throwable != null) { + future.completeExceptionally(throwable); + } else { + future.complete(position); + } + }); + + return future; + } + + /** + * Find the first ledger where we should start searching based on the target offset. + */ + private CompletableFuture findStartingLedgerForOffset(long targetOffset) { + CompletableFuture result = new CompletableFuture<>(); + + if (ledgers.isEmpty()) { + result.complete(null); + return result; + } + + // Check ledgers in order to find the first one where firstEntryIndex <= targetOffset + checkLedgersRecursively(ledgers.navigableKeySet().iterator(), targetOffset, result); + + return result; + } + + private void checkLedgersRecursively(java.util.Iterator ledgerIterator, + long targetOffset, + CompletableFuture result) { + if (!ledgerIterator.hasNext()) { + // No suitable ledger found, use the last ledger + result.complete(ledgers.isEmpty() ? null : ledgers.lastKey()); + return; + } + + Long ledgerId = ledgerIterator.next(); + // Check if this ledger has firstEntryIndex property + asyncGetLedgerProperty(ledgerId, FIRST_ENTRY_INDEX_PROPS) + .whenComplete((firstEntryIndexStr, throwable) -> { + if (throwable != null) { + log.warn("[{}] Failed to get first entry index for ledger {}, " + + "using this ledger as starting point", + name, ledgerId, throwable); + result.complete(ledgerId); + return; + } + + // If firstEntryIndex property is missing, we cannot optimize and should use this ledger + if (firstEntryIndexStr == null || firstEntryIndexStr.isEmpty()) { + log.debug("[{}] First entry index missing for ledger {}, using this ledger as starting point", + name, ledgerId); + result.complete(ledgerId); + return; + } + + long firstEntryIndex; + try { + firstEntryIndex = Long.parseLong(firstEntryIndexStr); + } catch (NumberFormatException e) { + log.warn("[{}] Invalid first entry index value '{}' for ledger {}, " + + "using this ledger as starting point", + name, firstEntryIndexStr, ledgerId, e); + result.complete(ledgerId); + return; + } + + if (firstEntryIndex <= targetOffset) { + // Found a suitable ledger, but need to check if the next ledger might be better + if (ledgerIterator.hasNext()) { + Long nextLedgerId = ledgers.higherKey(ledgerId); + if (nextLedgerId != null) { + asyncGetLedgerProperty(nextLedgerId, FIRST_ENTRY_INDEX_PROPS) + .whenComplete((nextFirstIndexStr, nextThrowable) -> { + if (nextThrowable != null + || nextFirstIndexStr == null + || nextFirstIndexStr.isEmpty()) { + // Next ledger has no firstEntryIndex or error, use current ledger + result.complete(ledgerId); + return; + } + + try { + long nextFirstIndex = Long.parseLong(nextFirstIndexStr); + if (nextFirstIndex > targetOffset) { + // Next ledger is not suitable, use current ledger + result.complete(ledgerId); + } else { + // Continue checking next ledgers + checkLedgersRecursively(ledgerIterator, targetOffset, result); + } + } catch (NumberFormatException e) { + // Next ledger has invalid firstEntryIndex, use current ledger + result.complete(ledgerId); + } + }); + } else { + result.complete(ledgerId); + } + } else { + result.complete(ledgerId); + } + } else { + // This ledger's first entry is greater than target, use previous ledger if exists + Long prevLedgerId = ledgers.lowerKey(ledgerId); + result.complete(prevLedgerId != null ? prevLedgerId : ledgerId); + } + }); + } + @Override public CompletableFuture asyncFindPosition(Predicate predicate) { @@ -2032,6 +2213,12 @@ public CompletableFuture asyncFindPosition(Predicate predicate) future.complete(null); return future; } + return asyncFindPosition(startPosition, predicate); + + } + + private CompletableFuture asyncFindPosition(Position startPosition, Predicate predicate) { + CompletableFuture future = new CompletableFuture<>(); AsyncCallbacks.FindEntryCallback findEntryCallback = new AsyncCallbacks.FindEntryCallback() { @Override public void findEntryComplete(Position position, Object ctx) { @@ -4561,6 +4748,11 @@ protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig conf return; } } + if (managedLedgerInterceptor != null) { + long firstEntryIndex = managedLedgerInterceptor.getIndex() + 1; + // Store the firstEntryIndex for use in createComplete callback + this.pendingLedgerFirstEntryIndex = firstEntryIndex; + } createdLedgerCustomMetadata = finalMetadata; try { bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(), @@ -5145,4 +5337,16 @@ boolean shouldCacheAddedEntry() { // Avoid caching entries if no cursor has been created return getActiveCursors().shouldCacheAddedEntry(); } + + private LedgerInfo updateLedgerProps(LedgerInfo ledgerInfo, + Map propertiesMap) { + List oldProperties = ledgerInfo.getPropertiesList(); + Map newPropertiesMap = new HashMap<>(); + oldProperties.forEach(kv -> newPropertiesMap.put(kv.getKey(), kv.getValue())); + newPropertiesMap.putAll(propertiesMap); + List newProperties = newPropertiesMap.entrySet().stream() + .map(e -> MLDataFormats.KeyValue.newBuilder() + .setKey(e.getKey()).setValue(e.getValue()).build()).toList(); + return ledgerInfo.toBuilder().clearProperties().addAllProperties(newProperties).build(); + } } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java index 0ca6fa9dd866c..a7b307964a8ed 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/intercept/ManagedLedgerInterceptor.java @@ -75,6 +75,7 @@ default void afterFailedAddEntry(int numberOfMessages){ */ void onManagedLedgerPropertiesInitialize(Map propertiesMap); + long getIndex(); /** * A handle for reading the last ledger entry. */ diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java index a86b49d627ff8..46170f48eb3bf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImpl.java @@ -71,6 +71,7 @@ public ManagedLedgerInterceptorImpl(Set brokerEn } } + @Override public long getIndex() { long index = -1; diff --git a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java index 56ff0ba65924d..e3173658eb710 100644 --- a/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java +++ b/pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/CustomizedManagedLedgerStorageForTest.java @@ -610,6 +610,11 @@ public CompletableFuture asyncFindPosition(Predicate predicate) return delegate.asyncFindPosition(predicate); } + @Override + public CompletableFuture asyncFindPosition(long offset, Predicate predicate) { + return delegate.asyncFindPosition(offset, predicate); + } + @Override public ManagedLedgerInterceptor getManagedLedgerInterceptor() { return delegate.getManagedLedgerInterceptor(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java index f2e956ac0067a..8f7b302caf42f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/ManagedLedgerInterceptorImplTest.java @@ -21,6 +21,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.AssertJUnit.assertNull; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.nio.charset.StandardCharsets; @@ -45,6 +46,7 @@ import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor; +import org.apache.bookkeeper.mledger.proto.MLDataFormats; import org.apache.bookkeeper.test.MockedBookKeeperTestCase; import org.apache.pulsar.common.api.proto.BrokerEntryMetadata; import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; @@ -527,4 +529,306 @@ public void release(ByteBuf processedPayload) { ledger.close(); } + @Test + public void testFindPositionByOffset() throws Exception { + final int mockBatchSize = 2; + final int maxEntriesPerLedger = 5; + int maxSequenceIdPerLedger = mockBatchSize * maxEntriesPerLedger; + ManagedLedgerInterceptor interceptor = + new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null); + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setManagedLedgerInterceptor(interceptor); + managedLedgerConfig.setMaxEntriesPerLedger(5); + + ManagedLedger ledger = factory.open("my_ml_broker_entry_offset_test_ledger", managedLedgerConfig); + ManagedCursor cursor = ledger.openCursor("c1"); + + // Add entries to first ledger + long firstLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + firstLedgerId = + ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), mockBatchSize).getLedgerId(); + } + + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 9); + + MLDataFormats.ManagedLedgerInfo.LedgerInfo firstLedgerInfo = + ledger.getLedgersInfo().get(firstLedgerId); + Assert.assertEquals(firstLedgerInfo.getPropertiesCount(), 1); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(firstLedgerId, + "firstEntryIndex").get()), 0); + + // Test finding position by offset using the new method + Position position = null; + for (long offset = 0; offset <= ledger.getManagedLedgerInterceptor().getIndex(); offset++) { + position = ledger.asyncFindPosition(offset, new IndexSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / mockBatchSize); + } + + // Roll over to second ledger + long secondLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + secondLedgerId = + ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), mockBatchSize).getLedgerId(); + } + + MLDataFormats.ManagedLedgerInfo.LedgerInfo secondLedgerInfo = + ledger.getLedgersInfo().get(secondLedgerId); + Assert.assertEquals(secondLedgerInfo.getPropertiesCount(), 1); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(secondLedgerId, + "firstEntryIndex").get()), 10); + + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 19); + assertNotEquals(firstLedgerId, secondLedgerId); + + // Test with both ledgers + for (long offset = 0; offset <= ledger.getManagedLedgerInterceptor().getIndex(); offset++) { + position = ledger.asyncFindPosition(offset, new IndexSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / mockBatchSize); + } + + // Test edge cases + // Test offset 0 (should find first position) + position = ledger.asyncFindPosition(0, new IndexSearchPredicate(0)).get(); + assertEquals(position.getEntryId(), 0); + + // Test offset beyond current index (should handle gracefully) + long beyondIndex = ledger.getManagedLedgerInterceptor().getIndex() + 100; + position = ledger.asyncFindPosition(beyondIndex, new IndexSearchPredicate(beyondIndex)).get(); + assertNotNull(position); + + + // Reopen ledger and test again + ledger.close(); + @Cleanup("shutdown") + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory2.open("my_ml_broker_entry_offset_test_ledger", managedLedgerConfig); + + + // Add more entries to third ledger + long thirdLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + thirdLedgerId = + ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), mockBatchSize).getLedgerId(); + } + MLDataFormats.ManagedLedgerInfo.LedgerInfo thirdLedgerInfo = + ledger.getLedgersInfo().get(thirdLedgerId); + Assert.assertEquals(thirdLedgerInfo.getPropertiesCount(), 1); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(thirdLedgerId, + "firstEntryIndex").get()), 20); + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 29); + assertNotEquals(secondLedgerId, thirdLedgerId); + + // Test with all three ledgers after reopen + for (long offset = 0; offset <= ledger.getManagedLedgerInterceptor().getIndex(); offset++) { + position = ledger.asyncFindPosition(offset, new IndexSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / mockBatchSize); + } + + cursor.close(); + ledger.close(); + } + + @Test + public void testFindPositionByOffsetWithMissingFirstEntryIndex() throws Exception { + final int mockBatchSize = 2; + final int maxEntriesPerLedger = 5; + int maxSequenceIdPerLedger = mockBatchSize * maxEntriesPerLedger; + final String ledgerName = "my_ml_test_missing_firstEntryIndex"; + + ManagedLedgerInterceptor interceptor = + new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null); + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setManagedLedgerInterceptor(interceptor); + managedLedgerConfig.setMaxEntriesPerLedger(5); + + ManagedLedger ledger = factory.open(ledgerName, managedLedgerConfig); + ManagedCursor cursor = ledger.openCursor("c1"); + + // Add entries to create three ledgers + long firstLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + firstLedgerId = ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), + mockBatchSize).getLedgerId(); + } + + long secondLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + secondLedgerId = ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), + mockBatchSize).getLedgerId(); + } + + long thirdLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + thirdLedgerId = ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), + mockBatchSize).getLedgerId(); + } + + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 29); + + // Verify initial state - all ledgers should have firstEntryIndex property + assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(firstLedgerId, "firstEntryIndex").get()), 0); + assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(secondLedgerId, "firstEntryIndex").get()), 10); + assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(thirdLedgerId, "firstEntryIndex").get()), 20); + + // Test normal case before removing properties + Position position = ledger.asyncFindPosition(15, new IndexSearchPredicate(15)).get(); + assertEquals(position.getEntryId(), (15 % maxSequenceIdPerLedger) / mockBatchSize); + ledger.asyncRemoveLedgerProperty(firstLedgerId, "firstEntryIndex").get(); + + ledger.close(); + + // Reopen and test + @Cleanup("shutdown") + ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory1.open(ledgerName, managedLedgerConfig); + + // Verify property was removed + String firstLedgerProperty = ledger.asyncGetLedgerProperty(firstLedgerId, "firstEntryIndex").get(); + assertNull(firstLedgerProperty); + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 29); + + + // Test that position finding still works correctly after removing first ledger's firstEntryIndex + for (long offset = 0; offset <= ledger.getManagedLedgerInterceptor().getIndex(); offset++) { + position = ledger.asyncFindPosition(offset, new IndexSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / mockBatchSize, + "Failed at offset " + offset + " after removing first ledger's firstEntryIndex"); + } + ledger.asyncRemoveLedgerProperty(secondLedgerId, "firstEntryIndex").get(); + + ledger.close(); + + + // Reopen and test + @Cleanup("shutdown") + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory2.open(ledgerName, managedLedgerConfig); + + // Verify property was removed + String secondLedgerProperty = ledger.asyncGetLedgerProperty(secondLedgerId, "firstEntryIndex").get(); + assertNull(secondLedgerProperty); + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 29); + + // Test that position finding still works correctly after removing second ledger's firstEntryIndex + for (long offset = 0; offset <= ledger.getManagedLedgerInterceptor().getIndex(); offset++) { + position = ledger.asyncFindPosition(offset, new IndexSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / mockBatchSize, + "Failed at offset " + offset + " after removing second ledger's firstEntryIndex" + + " Excepted: " + (offset % maxSequenceIdPerLedger) / mockBatchSize + + ", but got: " + position); + } + ledger.asyncRemoveLedgerProperty(thirdLedgerId, "firstEntryIndex").get(); + + ledger.close(); + + // Reopen and test + @Cleanup("shutdown") + ManagedLedgerFactoryImpl factory3 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory3.open(ledgerName, managedLedgerConfig); + + // Verify property was removed + String thirdLedgerProperty = ledger.asyncGetLedgerProperty(thirdLedgerId, "firstEntryIndex").get(); + assertNull(thirdLedgerProperty); + + // Verify all properties are now missing + firstLedgerProperty = ledger.asyncGetLedgerProperty(firstLedgerId, "firstEntryIndex").get(); + assertNull(firstLedgerProperty); + secondLedgerProperty = ledger.asyncGetLedgerProperty(secondLedgerId, "firstEntryIndex").get(); + assertNull(secondLedgerProperty); + + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 29); + + // Test that position finding still works correctly even with all firstEntryIndex properties removed + for (long offset = 0; offset <= ledger.getManagedLedgerInterceptor().getIndex(); offset++) { + position = ledger.asyncFindPosition(offset, new IndexSearchPredicate(offset)).get(); + assertEquals(position.getEntryId(), (offset % maxSequenceIdPerLedger) / mockBatchSize, + "Failed at offset " + offset + " after removing all ledgers' firstEntryIndex"); + } + + // Test edge cases with all properties missing + // Test offset 0 (should still find first position) + position = ledger.asyncFindPosition(0, new IndexSearchPredicate(0)).get(); + assertEquals(position.getEntryId(), 0); + + // Test middle offset + position = ledger.asyncFindPosition(15, new IndexSearchPredicate(15)).get(); + assertEquals(position.getEntryId(), (15 % maxSequenceIdPerLedger) / mockBatchSize); + + // Test last offset + position = ledger.asyncFindPosition(29, new IndexSearchPredicate(29)).get(); + assertEquals(position.getEntryId(), (29 % maxSequenceIdPerLedger) / mockBatchSize); + + cursor.close(); + ledger.close(); + } + + @Test + public void testSetFirstEntryIndex() throws Exception { + final int mockBatchSize = 2; + final int maxEntriesPerLedger = 5; + int maxSequenceIdPerLedger = mockBatchSize * maxEntriesPerLedger; + ManagedLedgerInterceptor interceptor = + new ManagedLedgerInterceptorImpl(getBrokerEntryMetadataInterceptors(), null); + + + ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); + managedLedgerConfig.setManagedLedgerInterceptor(interceptor); + managedLedgerConfig.setMaxEntriesPerLedger(5); + + ManagedLedger ledger = factory.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig); + ManagedCursor cursor = ledger.openCursor("c1"); + + long firstLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + firstLedgerId = + ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), mockBatchSize).getLedgerId(); + } + + assertEquals(((ManagedLedgerInterceptorImpl) ledger.getManagedLedgerInterceptor()).getIndex(), 9); + MLDataFormats.ManagedLedgerInfo.LedgerInfo firstLedgerInfo = + ledger.getLedgersInfo().get(firstLedgerId); + Assert.assertEquals(firstLedgerInfo.getPropertiesCount(), 1); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(firstLedgerId, "firstEntryIndex").get()), 0); + + // roll over ledger + long secondLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + secondLedgerId = + ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), mockBatchSize).getLedgerId(); + } + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 19); + assertNotEquals(firstLedgerId, secondLedgerId); + MLDataFormats.ManagedLedgerInfo.LedgerInfo secondLedgerInfo = + ledger.getLedgersInfo().get(secondLedgerId); + Assert.assertEquals(secondLedgerInfo.getPropertiesCount(), 1); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(secondLedgerId, "firstEntryIndex").get()), 10); + + // reopen ledger + ledger.close(); + // / Reopen the same managed-ledger + @Cleanup("shutdown") + ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc); + ledger = factory2.open("my_ml_broker_entry_metadata_test_ledger", managedLedgerConfig); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(firstLedgerId, "firstEntryIndex").get()), 0); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(secondLedgerId, "firstEntryIndex").get()), 10); + + long thirdLedgerId = -1; + for (int i = 0; i < maxEntriesPerLedger; i++) { + thirdLedgerId = + ledger.addEntry("dummy-entry".getBytes(StandardCharsets.UTF_8), mockBatchSize).getLedgerId(); + } + assertEquals(ledger.getManagedLedgerInterceptor().getIndex(), 29); + assertNotEquals(secondLedgerId, thirdLedgerId); + MLDataFormats.ManagedLedgerInfo.LedgerInfo thirdLedgerInfo = + ledger.getLedgersInfo().get(thirdLedgerId); + Assert.assertEquals(thirdLedgerInfo.getPropertiesCount(), 1); + Assert.assertEquals(Long.parseLong(ledger.asyncGetLedgerProperty(thirdLedgerId, "firstEntryIndex").get()), 20); + + cursor.close(); + ledger.close(); + } + } diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java index a6605046eeff6..3792d90b33dfd 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionSequenceIdGenerator.java @@ -90,4 +90,9 @@ long generateSequenceId() { long getCurrentSequenceId() { return sequenceId.get(); } + + @Override + public long getIndex() { + return -1L; + } } diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java index d4f07ed9f9018..e250d0dc084e1 100644 --- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java +++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java @@ -352,6 +352,11 @@ public CompletableFuture asyncFindPosition(Predicate predicate) return CompletableFuture.completedFuture(null); } + @Override + public CompletableFuture asyncFindPosition(long offset, Predicate predicate) { + return CompletableFuture.completedFuture(null); + } + @Override public ManagedLedgerInterceptor getManagedLedgerInterceptor() { return null;