Skip to content

Commit b297f1f

Browse files
TakaHiR07fanjianye
andauthored
[fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (#24898)
Co-authored-by: fanjianye <fanjianye@bigo.sg>
1 parent fbc50b0 commit b297f1f

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4140,6 +4140,12 @@ public CompletableFuture<Position> getLastDispatchablePosition() {
41404140
if (lastDispatchablePosition != null) {
41414141
return CompletableFuture.completedFuture(lastDispatchablePosition);
41424142
}
4143+
Position lastPosition;
4144+
if (transactionBuffer instanceof TransactionBufferDisable) {
4145+
lastPosition = getLastPosition();
4146+
} else {
4147+
lastPosition = getMaxReadPosition();
4148+
}
41434149
return ledger.getLastDispatchablePosition(entry -> {
41444150
MessageMetadata md = entry.getMessageMetadata();
41454151
if (md == null) {
@@ -4154,7 +4160,7 @@ public CompletableFuture<Position> getLastDispatchablePosition() {
41544160
return !isTxnAborted(txnID, entry.getPosition());
41554161
}
41564162
return true;
4157-
}, getMaxReadPosition()).thenApply(position -> {
4163+
}, lastPosition).thenApply(position -> {
41584164
// Update lastDispatchablePosition to the given position
41594165
updateLastDispatchablePosition(position);
41604166
return position;

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.CompletableFuture;
2424
import lombok.extern.slf4j.Slf4j;
2525
import org.apache.bookkeeper.mledger.Position;
26+
import org.apache.bookkeeper.mledger.PositionFactory;
2627
import org.apache.pulsar.broker.service.BrokerServiceException;
2728
import org.apache.pulsar.broker.service.Topic;
2829
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -108,7 +109,7 @@ public void syncMaxReadPositionForNormalPublish(Position position, boolean isMar
108109

109110
@Override
110111
public Position getMaxReadPosition() {
111-
return topic.getLastPosition();
112+
return PositionFactory.LATEST;
112113
}
113114

114115
@Override

0 commit comments

Comments
 (0)