Skip to content

Commit 90431bd

Browse files
committed
[fix][broker] Support large number of unack message store for cursor recovery
Fix test
1 parent 95bd1d1 commit 90431bd

9 files changed

Lines changed: 248 additions & 14 deletions

File tree

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -505,8 +505,10 @@ public int getMaxUnackedRangesToPersistInMetadataStore() {
505505
return maxUnackedRangesToPersistInMetadataStore;
506506
}
507507

508-
public void setMaxUnackedRangesToPersistInMetadataStore(int maxUnackedRangesToPersistInMetadataStore) {
508+
public ManagedLedgerConfig setMaxUnackedRangesToPersistInMetadataStore(
509+
int maxUnackedRangesToPersistInMetadataStore) {
509510
this.maxUnackedRangesToPersistInMetadataStore = maxUnackedRangesToPersistInMetadataStore;
511+
return this;
510512
}
511513

512514
/**

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
import java.util.concurrent.locks.ReentrantReadWriteLock;
6060
import java.util.function.Function;
6161
import java.util.function.Predicate;
62+
import java.util.stream.Collectors;
6263
import java.util.stream.LongStream;
6364
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
6465
import org.apache.bookkeeper.client.AsyncCallback.OpenCallback;
@@ -91,12 +92,15 @@
9192
import org.apache.bookkeeper.mledger.ScanOutcome;
9293
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
9394
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
95+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap;
9496
import org.apache.bookkeeper.mledger.proto.MLDataFormats.LongProperty;
9597
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedCursorInfo;
9698
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
9799
import org.apache.bookkeeper.mledger.proto.MLDataFormats.MessageRange;
98100
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo;
101+
import org.apache.bookkeeper.mledger.proto.MLDataFormats.PositionInfo.Builder;
99102
import org.apache.bookkeeper.mledger.proto.MLDataFormats.StringProperty;
103+
import org.apache.commons.lang3.mutable.MutableInt;
100104
import org.apache.commons.lang3.tuple.Pair;
101105
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
102106
import org.apache.pulsar.common.util.DateFormatter;
@@ -606,9 +610,7 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
606610
}
607611

608612
Position position = PositionFactory.create(positionInfo.getLedgerId(), positionInfo.getEntryId());
609-
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
610-
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
611-
}
613+
recoverIndividualDeletedMessages(positionInfo);
612614
if (getConfig().isDeletionAtBatchIndexLevelEnabled()
613615
&& positionInfo.getBatchedEntryDeletionIndexInfoCount() > 0) {
614616
recoverBatchDeletedIndexes(positionInfo.getBatchedEntryDeletionIndexInfoList());
@@ -627,6 +629,45 @@ protected void recoverFromLedger(final ManagedCursorInfo info, final VoidCallbac
627629
}
628630
}
629631

632+
public void recoverIndividualDeletedMessages(PositionInfo positionInfo) {
633+
if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
634+
recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
635+
} else if (positionInfo.getIndividualDeletedMessageRangesCount() > 0) {
636+
List<LongListMap> rangeList = positionInfo.getIndividualDeletedMessageRangesList();
637+
try {
638+
Map<Long, long[]> rangeMap = rangeList.stream().collect(Collectors.toMap(LongListMap::getKey,
639+
list -> list.getValuesList().stream().mapToLong(i -> i).toArray()));
640+
individualDeletedMessages.build(rangeMap);
641+
} catch (Exception e) {
642+
log.warn("[{}]-{} Failed to recover individualDeletedMessages from serialized data", ledger.getName(),
643+
name, e);
644+
}
645+
}
646+
}
647+
648+
private List<LongListMap> buildLongPropertiesMap(Map<Long, long[]> properties) {
649+
if (properties.isEmpty()) {
650+
return Collections.emptyList();
651+
}
652+
List<LongListMap> longListMap = new ArrayList<>();
653+
MutableInt serializedSize = new MutableInt();
654+
properties.forEach((id, ranges) -> {
655+
if (ranges == null || ranges.length <= 0) {
656+
return;
657+
}
658+
org.apache.bookkeeper.mledger.proto.MLDataFormats.LongListMap.Builder lmBuilder = LongListMap.newBuilder()
659+
.setKey(id);
660+
for (long range : ranges) {
661+
lmBuilder.addValues(range);
662+
}
663+
LongListMap lm = lmBuilder.build();
664+
longListMap.add(lm);
665+
serializedSize.add(lm.getSerializedSize());
666+
});
667+
individualDeletedMessagesSerializedSize = serializedSize.toInteger();
668+
return longListMap;
669+
}
670+
630671
private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
631672
lock.writeLock().lock();
632673
try {
@@ -3125,12 +3166,23 @@ private List<MLDataFormats.BatchedEntryDeletionIndexInfo> buildBatchEntryDeletio
31253166

31263167
void persistPositionToLedger(final LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
31273168
Position position = mdEntry.newPosition;
3128-
PositionInfo pi = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
3169+
Builder piBuilder = PositionInfo.newBuilder().setLedgerId(position.getLedgerId())
31293170
.setEntryId(position.getEntryId())
3130-
.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges())
31313171
.addAllBatchedEntryDeletionIndexInfo(buildBatchEntryDeletionIndexInfoList())
3132-
.addAllProperties(buildPropertiesMap(mdEntry.properties)).build();
3172+
.addAllProperties(buildPropertiesMap(mdEntry.properties));
31333173

3174+
Map<Long, long[]> internalRanges = null;
3175+
try {
3176+
internalRanges = individualDeletedMessages.toRanges(getConfig().getMaxUnackedRangesToPersist());
3177+
} catch (Exception e) {
3178+
log.warn("[{}]-{} Failed to serialize individualDeletedMessages", ledger.getName(), name, e);
3179+
}
3180+
if (internalRanges != null && !internalRanges.isEmpty()) {
3181+
piBuilder.addAllIndividualDeletedMessageRanges(buildLongPropertiesMap(internalRanges));
3182+
} else {
3183+
piBuilder.addAllIndividualDeletedMessages(buildIndividualDeletedMessageRanges());
3184+
}
3185+
PositionInfo pi = piBuilder.build();
31343186

31353187
if (log.isDebugEnabled()) {
31363188
log.debug("[{}] Cursor {} Appending to ledger={} position={}", ledger.getName(), name, lh.getId(),

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/RangeSetWrapper.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import java.util.ArrayList;
2525
import java.util.Collection;
2626
import java.util.List;
27+
import java.util.Map;
2728
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
2829
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
2930
import org.apache.pulsar.common.util.collections.OpenLongPairRangeSet;
@@ -142,6 +143,16 @@ public Range<T> lastRange() {
142143
return rangeSet.lastRange();
143144
}
144145

146+
@Override
147+
public Map<Long, long[]> toRanges(int maxRanges) {
148+
return rangeSet.toRanges(maxRanges);
149+
}
150+
151+
@Override
152+
public void build(Map<Long, long[]> internalRange) {
153+
rangeSet.build(internalRange);
154+
}
155+
145156
@Override
146157
public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
147158
return rangeSet.cardinality(lowerKey, lowerValue, upperKey, upperValue);
@@ -176,4 +187,22 @@ public boolean isDirtyLedgers(long ledgerId) {
176187
public String toString() {
177188
return rangeSet.toString();
178189
}
190+
191+
@Override
192+
public int hashCode() {
193+
return rangeSet.hashCode();
194+
}
195+
196+
@Override
197+
public boolean equals(Object obj) {
198+
if (!(obj instanceof RangeSetWrapper)) {
199+
return false;
200+
}
201+
if (this == obj) {
202+
return true;
203+
}
204+
@SuppressWarnings("rawtypes")
205+
RangeSetWrapper set = (RangeSetWrapper) obj;
206+
return this.rangeSet.equals(set.rangeSet);
207+
}
179208
}

managed-ledger/src/main/proto/MLDataFormats.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,13 +82,19 @@ message PositionInfo {
8282

8383
// Store which index in the batch message has been deleted
8484
repeated BatchedEntryDeletionIndexInfo batchedEntryDeletionIndexInfo = 5;
85+
repeated LongListMap individualDeletedMessageRanges = 6;
8586
}
8687

8788
message NestedPositionInfo {
8889
required int64 ledgerId = 1;
8990
required int64 entryId = 2;
9091
}
9192

93+
message LongListMap {
94+
required int64 key = 1;
95+
repeated int64 values = 2;
96+
}
97+
9298
message MessageRange {
9399
required NestedPositionInfo lowerEndpoint = 1;
94100
required NestedPositionInfo upperEndpoint = 2;

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3223,7 +3223,7 @@ public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exceptio
32233223
managedLedgerConfig.setMaxUnackedRangesToPersistInMetadataStore(10);
32243224
ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open(ledgerName, managedLedgerConfig);
32253225

3226-
ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
3226+
final ManagedCursorImpl c1 = (ManagedCursorImpl) ledger.openCursor(cursorName);
32273227

32283228
List<Position> addedPositions = new ArrayList<>();
32293229
for (int i = 0; i < totalAddEntries; i++) {
@@ -3269,7 +3269,8 @@ public void operationFailed(MetaStoreException e) {
32693269
LedgerEntry entry = seq.nextElement();
32703270
PositionInfo positionInfo;
32713271
positionInfo = PositionInfo.parseFrom(entry.getEntry());
3272-
individualDeletedMessagesCount.set(positionInfo.getIndividualDeletedMessagesCount());
3272+
c1.recoverIndividualDeletedMessages(positionInfo);
3273+
individualDeletedMessagesCount.set(c1.getIndividuallyDeletedMessagesSet().asRanges().size());
32733274
} catch (Exception e) {
32743275
}
32753276
latch.countDown();
@@ -3286,12 +3287,12 @@ public void operationFailed(MetaStoreException e) {
32863287
@Cleanup("shutdown")
32873288
ManagedLedgerFactory factory2 = new ManagedLedgerFactoryImpl(metadataStore, bkc);
32883289
ledger = (ManagedLedgerImpl) factory2.open(ledgerName, managedLedgerConfig);
3289-
c1 = (ManagedCursorImpl) ledger.openCursor("c1");
3290+
ManagedCursorImpl reopenCursor = (ManagedCursorImpl) ledger.openCursor("c1");
32903291
// verify cursor has been recovered
3291-
assertEquals(c1.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
3292+
assertEquals(reopenCursor.getNumberOfEntriesInBacklog(false), totalAddEntries / 2);
32923293

32933294
// try to read entries which should only read non-deleted positions
3294-
List<Entry> entries = c1.readEntries(totalAddEntries);
3295+
List<Entry> entries = reopenCursor.readEntries(totalAddEntries);
32953296
assertEquals(entries.size(), totalAddEntries / 2);
32963297
}
32973298

managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerBkTest.java

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.testng.Assert.assertNotNull;
2424
import static org.testng.Assert.assertTrue;
2525
import static org.testng.Assert.fail;
26-
import io.netty.buffer.ByteBuf;
2726

2827
import java.nio.charset.StandardCharsets;
2928
import java.util.ArrayList;
@@ -34,7 +33,7 @@
3433
import java.util.concurrent.TimeUnit;
3534
import java.util.concurrent.atomic.AtomicBoolean;
3635
import java.util.concurrent.atomic.AtomicReference;
37-
import lombok.Cleanup;
36+
3837
import org.apache.bookkeeper.client.BookKeeper;
3938
import org.apache.bookkeeper.client.BookKeeperTestClient;
4039
import org.apache.bookkeeper.client.api.DigestType;
@@ -53,9 +52,13 @@
5352
import org.apache.bookkeeper.mledger.util.ThrowableToStringUtil;
5453
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
5554
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
55+
import org.apache.pulsar.common.util.collections.LongPairRangeSet;
5656
import org.awaitility.Awaitility;
5757
import org.testng.annotations.Test;
5858

59+
import io.netty.buffer.ByteBuf;
60+
import lombok.Cleanup;
61+
5962
public class ManagedLedgerBkTest extends BookKeeperClusterTestCase {
6063

6164
public ManagedLedgerBkTest() {
@@ -587,4 +590,44 @@ public void testPeriodicRollover() throws Exception {
587590
Awaitility.await().until(() -> cursorImpl.getCursorLedger() != currentLedgerId);
588591
}
589592

593+
/**
594+
* This test validates that cursor serializes and deserializes individual-ack list from the bk-ledger.
595+
*
596+
* @throws Exception
597+
*/
598+
@Test
599+
public void testUnackmessagesAndRecovery() throws Exception {
600+
ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
601+
factoryConf.setMaxCacheSize(0);
602+
603+
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc, factoryConf);
604+
605+
ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1)
606+
.setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1)
607+
.setMaxUnackedRangesToPersistInMetadataStore(1).setMaxEntriesPerLedger(5).setMetadataAckQuorumSize(1);
608+
ManagedLedger ledger = factory.open("my_test_unack_messages", config);
609+
ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("c1");
610+
611+
int totalEntries = 100;
612+
for (int i = 0; i < totalEntries; i++) {
613+
Position p = ledger.addEntry("entry".getBytes());
614+
if (i % 2 == 0) {
615+
cursor.delete(p);
616+
}
617+
}
618+
619+
LongPairRangeSet<Position> unackMessagesBefore = cursor.getIndividuallyDeletedMessagesSet();
620+
621+
ledger.close();
622+
623+
// open and recover cursor
624+
ledger = factory.open("my_test_unack_messages", config);
625+
cursor = (ManagedCursorImpl) ledger.openCursor("c1");
626+
627+
LongPairRangeSet<Position> unackMessagesAfter = cursor.getIndividuallyDeletedMessagesSet();
628+
assertTrue(unackMessagesBefore.equals(unackMessagesAfter));
629+
630+
ledger.close();
631+
factory.shutdown();
632+
}
590633
}

pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,21 @@
1818
*/
1919
package org.apache.pulsar.common.util.collections;
2020

21+
import static java.util.BitSet.valueOf;
2122
import static java.util.Objects.requireNonNull;
2223
import com.google.common.collect.BoundType;
2324
import com.google.common.collect.Range;
2425
import java.util.ArrayList;
2526
import java.util.BitSet;
27+
import java.util.HashMap;
2628
import java.util.List;
29+
import java.util.Map;
2730
import java.util.Map.Entry;
2831
import java.util.NavigableMap;
32+
import java.util.Objects;
2933
import java.util.concurrent.ConcurrentSkipListMap;
3034
import java.util.concurrent.atomic.AtomicBoolean;
35+
import java.util.concurrent.atomic.AtomicInteger;
3136
import org.apache.commons.lang.mutable.MutableInt;
3237

3338
/**
@@ -253,6 +258,42 @@ public Range<T> lastRange() {
253258
return Range.openClosed(consumer.apply(lastSet.getKey(), lower), consumer.apply(lastSet.getKey(), upper));
254259
}
255260

261+
@Override
262+
public Map<Long, long[]> toRanges(int maxRanges) {
263+
Map<Long, long[]> internalBitSetMap = new HashMap<>();
264+
AtomicInteger rangeCount = new AtomicInteger();
265+
rangeBitSetMap.forEach((id, bmap) -> {
266+
if (rangeCount.getAndAdd(bmap.cardinality()) > maxRanges) {
267+
return;
268+
}
269+
internalBitSetMap.put(id, bmap.toLongArray());
270+
});
271+
return internalBitSetMap;
272+
}
273+
274+
@Override
275+
public void build(Map<Long, long[]> internalRange) {
276+
internalRange.forEach((id, ranges) -> rangeBitSetMap.put(id, valueOf(ranges)));
277+
}
278+
279+
@Override
280+
public int hashCode() {
281+
return Objects.hashCode(rangeBitSetMap);
282+
}
283+
284+
@Override
285+
public boolean equals(Object obj) {
286+
if (!(obj instanceof ConcurrentOpenLongPairRangeSet)) {
287+
return false;
288+
}
289+
if (this == obj) {
290+
return true;
291+
}
292+
@SuppressWarnings("rawtypes")
293+
ConcurrentOpenLongPairRangeSet set = (ConcurrentOpenLongPairRangeSet) obj;
294+
return this.rangeBitSetMap.equals(set.rangeBitSetMap);
295+
}
296+
256297
@Override
257298
public int cardinality(long lowerKey, long lowerValue, long upperKey, long upperValue) {
258299
NavigableMap<Long, BitSet> subMap = rangeBitSetMap.subMap(lowerKey, true, upperKey, true);

pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/LongPairRangeSet.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Collection;
2626
import java.util.Iterator;
2727
import java.util.List;
28+
import java.util.Map;
2829
import java.util.NoSuchElementException;
2930
import java.util.Set;
3031
import lombok.EqualsAndHashCode;
@@ -136,6 +137,19 @@ public interface LongPairRangeSet<T extends Comparable<T>> {
136137
*/
137138
Range<T> lastRange();
138139

140+
default Map<Long, long[]> toRanges(int maxRanges) {
141+
throw new UnsupportedOperationException();
142+
}
143+
144+
/**
145+
* Build {@link LongPairRangeSet} using internal ranges returned by {@link #toRanges(int)} .
146+
*
147+
* @param ranges
148+
*/
149+
default void build(Map<Long, long[]> ranges) {
150+
throw new UnsupportedOperationException();
151+
}
152+
139153
/**
140154
* Return the number bit sets to true from lower (inclusive) to upper (inclusive).
141155
*/

0 commit comments

Comments
 (0)