Skip to content

Commit 2f86d55

Browse files
TakaHiR07fanjianye
authored andcommitted
[fix][broker] fix getMaxReadPosition in TransactionBufferDisable should return latest (apache#24898)
Co-authored-by: fanjianye <fanjianye@bigo.sg> (cherry picked from commit b297f1f)
1 parent d5db777 commit 2f86d55

File tree

2 files changed

+8
-2
lines changed

2 files changed

+8
-2
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3731,6 +3731,12 @@ public CompletableFuture<Position> getLastDispatchablePosition() {
37313731
if (lastDispatchablePosition != null) {
37323732
return CompletableFuture.completedFuture(lastDispatchablePosition);
37333733
}
3734+
PositionImpl lastPosition;
3735+
if (transactionBuffer instanceof TransactionBufferDisable) {
3736+
lastPosition = (PositionImpl) getLastPosition();
3737+
} else {
3738+
lastPosition = getMaxReadPosition();
3739+
}
37343740
return ManagedLedgerImplUtils
37353741
.asyncGetLastValidPosition((ManagedLedgerImpl) ledger, entry -> {
37363742
MessageMetadata md = Commands.parseMessageMetadata(entry.getDataBuffer());
@@ -3743,7 +3749,7 @@ public CompletableFuture<Position> getLastDispatchablePosition() {
37433749
return !isTxnAborted(txnID, (PositionImpl) entry.getPosition());
37443750
}
37453751
return true;
3746-
}, getMaxReadPosition())
3752+
}, lastPosition)
37473753
.thenApply(position -> {
37483754
// Update lastDispatchablePosition to the given position
37493755
updateLastDispatchablePosition(position);

pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/buffer/impl/TransactionBufferDisable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public void syncMaxReadPositionForNormalPublish(PositionImpl position, boolean i
107107

108108
@Override
109109
public PositionImpl getMaxReadPosition() {
110-
return (PositionImpl) topic.getLastPosition();
110+
return PositionImpl.LATEST;
111111
}
112112

113113
@Override

0 commit comments

Comments
 (0)