Skip to content

Commit 900b31b

Browse files
TakaHiR07fanjianye
authored andcommitted
[fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (apache#24898)
Co-authored-by: fanjianye <fanjianye@bigo.sg> (cherry picked from commit b297f1f) (cherry picked from commit 0c9af0a)
1 parent 0448a00 commit 900b31b

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4089,6 +4089,12 @@ public CompletableFuture<Position> getLastDispatchablePosition() {
40894089
if (lastDispatchablePosition != null) {
40904090
return CompletableFuture.completedFuture(lastDispatchablePosition);
40914091
}
4092+
Position lastPosition;
4093+
if (transactionBuffer instanceof TransactionBufferDisable) {
4094+
lastPosition = getLastPosition();
4095+
} else {
4096+
lastPosition = getMaxReadPosition();
4097+
}
40924098
return ledger.getLastDispatchablePosition(entry -> {
40934099
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
40944100
// If a messages has marker will filter by AbstractBaseDispatcher.filterEntriesForConsumer
@@ -4100,7 +4106,7 @@ public CompletableFuture<Position> getLastDispatchablePosition() {
41004106
return !isTxnAborted(txnID, entry.getPosition());
41014107
}
41024108
return true;
4103-
}, getMaxReadPosition()).thenApply(position -> {
4109+
}, lastPosition).thenApply(position -> {
41044110
// Update lastDispatchablePosition to the given position
41054111
updateLastDispatchablePosition(position);
41064112
return position;

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.CompletableFuture;
2424
import lombok.extern.slf4j.Slf4j;
2525
import org.apache.bookkeeper.mledger.Position;
26+
import org.apache.bookkeeper.mledger.PositionFactory;
2627
import org.apache.pulsar.broker.service.BrokerServiceException;
2728
import org.apache.pulsar.broker.service.Topic;
2829
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -108,7 +109,7 @@ public void syncMaxReadPositionForNormalPublish(Position position, boolean isMar
108109

109110
@Override
110111
public Position getMaxReadPosition() {
111-
return topic.getLastPosition();
112+
return PositionFactory.LATEST;
112113
}
113114

114115
@Override

0 commit comments

Comments
 (0)