From 5b8ecebacac8cebe3be1505524cd52cc1f12da66 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Thu, 18 Dec 2025 12:48:39 +0800 Subject: [PATCH 1/9] [improve][admin] Add markerMessages field in analyzeSubscriptionBacklog rest api --- .../apache/pulsar/broker/admin/impl/PersistentTopicsBase.java | 1 + .../apache/pulsar/broker/service/AnalyzeBacklogResult.java | 1 + .../broker/service/persistent/PersistentSubscription.java | 4 ++++ .../pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java | 1 + 4 files changed, 7 insertions(+) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 85387ccc26731..be40132cfbfa7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1693,6 +1693,7 @@ private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncRespo result.setEntries(rawResult.getEntries()); result.setMessages(rawResult.getMessages()); + result.setMarkerMessages(rawResult.getMarkerMessages()); result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries()); result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java index e227acf4e8f62..b9c279f97cec7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java @@ -29,6 +29,7 @@ public final class AnalyzeBacklogResult { private long entries; private long messages; + private long markerMessages; private long filterRejectedEntries; private long filterAcceptedEntries; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 457bae5e69c07..53ee60b67aad0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -635,6 +635,7 @@ public CompletableFuture analyzeBacklog(Optional AtomicLong rejected = new AtomicLong(); AtomicLong rescheduled = new AtomicLong(); AtomicLong messages = new AtomicLong(); + AtomicLong markerMessages = new AtomicLong(); AtomicLong acceptedMessages = new AtomicLong(); AtomicLong rejectedMessages = new AtomicLong(); AtomicLong rescheduledMessages = new AtomicLong(); @@ -693,6 +694,9 @@ public CompletableFuture analyzeBacklog(Optional } long num = entries.incrementAndGet(); messages.addAndGet(numMessages); + if (messageMetadata.hasMarkerType()) { + markerMessages.addAndGet(numMessages); + } if (num % 1000 == 0) { long end = System.currentTimeMillis(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java index 059026b80c575..622d91e4e98a9 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java @@ -26,6 +26,7 @@ public class AnalyzeSubscriptionBacklogResult { private long entries; private long messages; + private long markerMessages; private long filterRejectedEntries; private long filterAcceptedEntries; From 31340e1d2947873d4a6d81965bd1ad65ef6c4b03 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Thu, 18 Dec 2025 16:46:02 +0800 Subject: [PATCH 2/9] [improve][admin] Adapt original analyzeSubscriptionBacklog tests --- .../broker/admin/AnalyzeBacklogSubscriptionTest.java | 1 + .../pulsar/broker/service/plugin/FilterEntryTest.java | 11 ++++++----- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index dbc632c6cf3ad..acea913204999 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -161,6 +161,7 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0); assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages); + assertEquals(analyzeSubscriptionBacklogResult.getMarkerMessages(), 0); assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages); assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 89b409ae581e9..a70c2f3cf8d64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -253,7 +253,7 @@ public void testFilter() throws Exception { producer.send("test"); } - verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0); + verifyBacklog(topic, subName, 10, 10, 0, 10, 10, 0, 0, 0, 0); int counter = 0; while (true) { @@ -268,7 +268,7 @@ public void testFilter() throws Exception { // All normal messages can be received assertEquals(10, counter); - verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0); + verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0); // stop the consumer consumer.close(); @@ -280,7 +280,7 @@ public void testFilter() throws Exception { // analyze the subscription and predict that // 10 messages will be rejected by the filter - verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0); + verifyBacklog(topic, subName, 10, 10, 0, 0, 0, 10, 10, 0, 0); consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) @@ -304,7 +304,7 @@ public void testFilter() throws Exception { // now the Filter acknoledged the messages on behalf of the Consumer // backlog is now zero again - verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0); + verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0); // All messages should be acked, check the MarkDeletedPosition assertNotNull(lastMsgId); @@ -545,7 +545,7 @@ public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscriptio private void verifyBacklog(String topic, String subscription, - int numEntries, int numMessages, + int numEntries, int numMessages, int numMarkerMessages, int numEntriesAccepted, int numMessagesAccepted, int numEntriesRejected, int numMessagesRejected, int numEntriesRescheduled, int numMessagesRescheduled @@ -559,6 +559,7 @@ private void verifyBacklog(String topic, String subscription, Assert.assertEquals(numEntriesRescheduled, a1.getFilterRescheduledEntries()); Assert.assertEquals(numMessages, a1.getMessages()); + Assert.assertEquals(numMarkerMessages, a1.getMarkerMessages()); Assert.assertEquals(numMessagesAccepted, a1.getFilterAcceptedMessages()); Assert.assertEquals(numMessagesRejected, a1.getFilterRejectedMessages()); Assert.assertEquals(numMessagesRescheduled, a1.getFilterRescheduledMessages()); From f5cc932e059c4d4c83973bc880ff80eaebfd1821 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 19 Dec 2025 10:52:22 +0800 Subject: [PATCH 3/9] [improve][admin] Add transaction related analyzeSubscriptionBacklog tests --- .../admin/v3/AdminApiTransactionTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index b3c4b804286fe..56c932a473971 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -79,6 +80,7 @@ import org.apache.pulsar.common.policies.data.TransactionMetadata; import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; @@ -1053,6 +1055,55 @@ public void testPeekMessageForShowAllMessages() throws Exception { } } + @Test + public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Exception { + initTransaction(1); + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all"); + String transactionSubName = "transaction-topic-sub"; + + @Cleanup Consumer consumer = + pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + int numMessages = 10; + String committedMsgPrefix = "commited-msg"; + List committedMsgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + MessageId messageId = producer.newMessage(txn).value(committedMsgPrefix + i).send(); + committedMsgIds.add(messageId); + txn.commit().get(); + } + + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages * 2); + assertEquals(backlogResult.getMarkerMessages(), numMessages); + + MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2); + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(committedMiddleMsgId)); + assertEquals(backlogResult.getMessages(), (numMessages / 2) + numMessages); + assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); + + List abortedMsgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + MessageId messageId = producer.newMessage(txn).value("aborted-msg" + i).send(); + abortedMsgIds.add(messageId); + txn.abort(); + } + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages * 4); + assertEquals(backlogResult.getMarkerMessages(), numMessages * 2); + + MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2); + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId)); + assertEquals(backlogResult.getMessages(), (numMessages / 2) + numMessages); + assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); + } + private static void verifyCoordinatorStats(String state, long sequenceId, long lowWaterMark) { assertEquals(state, "Ready"); From 5dc8d21a64e3caa4ec15ec7167ed7400110aa156 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 19 Dec 2025 13:28:47 +0800 Subject: [PATCH 4/9] [improve][admin] Fix transaction analyzeSubscriptionBacklog test, add replicator analyzeSubscriptionBacklog test --- .../pulsar/broker/admin/v3/AdminApiTransactionTest.java | 7 +++---- .../pulsar/broker/service/ReplicatedSubscriptionTest.java | 8 ++++++++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 56c932a473971..a61fb8a9dbdb1 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -1058,11 +1058,10 @@ public void testPeekMessageForShowAllMessages() throws Exception { @Test public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Exception { initTransaction(1); - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/peek_all"); - String transactionSubName = "transaction-topic-sub"; + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/analyze_subscription_backlog"); + String transactionSubName = "analyze_subscription_backlog-topic-sub"; - @Cleanup Consumer consumer = - pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe(); + pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe().close(); @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); int numMessages = 10; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index 74dbc5e52916b..20e095f6106f7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -39,6 +39,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -70,6 +71,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1090,6 +1092,12 @@ public void testReplicatedSubscriptionOneWay() throws Exception { } Assert.assertEquals(numSnapshotRequest, 1); + // Assert analyze backlog total messages and marker messages. + AnalyzeSubscriptionBacklogResult backlogResult = + admin4.topics().analyzeSubscriptionBacklog(topicName, subscriptionName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages + numSnapshotRequest); + assertEquals(backlogResult.getMarkerMessages(), numSnapshotRequest); + // Wait pending snapshot timeout Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000); numSnapshotRequest = 0; From 0fda5ba30eb0a9acad78f5bf05263349dc39c0bd Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 19 Dec 2025 14:11:32 +0800 Subject: [PATCH 5/9] [improve][admin] Set markerMessages in analyzeBacklog method --- .../broker/service/persistent/PersistentSubscription.java | 1 + .../apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 53ee60b67aad0..4df1863862da9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -720,6 +720,7 @@ public CompletableFuture analyzeBacklog(Optional result.setLastPosition(lastPosition.get()); result.setEntries(entries.get()); result.setMessages(messages.get()); + result.setMarkerMessages(markerMessages.get()); result.setFilterAcceptedEntries(accepted.get()); result.setFilterAcceptedMessages(acceptedMessages.get()); result.setFilterRejectedEntries(rejected.get()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index a61fb8a9dbdb1..945bd7f5c7e1f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -1065,11 +1065,10 @@ public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Excepti @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); int numMessages = 10; - String committedMsgPrefix = "commited-msg"; List committedMsgIds = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { Transaction txn = pulsarClient.newTransaction().build().get(); - MessageId messageId = producer.newMessage(txn).value(committedMsgPrefix + i).send(); + MessageId messageId = producer.newMessage(txn).value("commited-msg" + i).send(); committedMsgIds.add(messageId); txn.commit().get(); } From 72a459c2d7299cdccc299ef258fb9f0bd28f9cbd Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 19 Dec 2025 15:26:35 +0800 Subject: [PATCH 6/9] [improve][admin] Fix testAnalyzeSubscriptionBacklogWithTransactionMarker test --- .../pulsar/broker/admin/v3/AdminApiTransactionTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 945bd7f5c7e1f..530da50498b37 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -1081,7 +1081,7 @@ public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Excepti MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2); backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(committedMiddleMsgId)); - assertEquals(backlogResult.getMessages(), (numMessages / 2) + numMessages); + assertEquals(backlogResult.getMessages(), numMessages); assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); List abortedMsgIds = new ArrayList<>(); @@ -1098,7 +1098,7 @@ public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Excepti MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2); backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId)); - assertEquals(backlogResult.getMessages(), (numMessages / 2) + numMessages); + assertEquals(backlogResult.getMessages(), numMessages); assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); } From 555dc90afa1ab94b4bba2f1e542ff6584538d029 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Fri, 19 Dec 2025 16:44:24 +0800 Subject: [PATCH 7/9] [improve][admin] Add comments --- .../pulsar/broker/admin/v3/AdminApiTransactionTest.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 530da50498b37..af0fcfed022ab 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -1058,9 +1058,11 @@ public void testPeekMessageForShowAllMessages() throws Exception { @Test public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Exception { initTransaction(1); - final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/analyze_subscription_backlog"); - String transactionSubName = "analyze_subscription_backlog-topic-sub"; + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/analyze-subscription-backlog"); + String transactionSubName = "analyze-subscription-backlog-topic-sub"; + // Init subscription and then close the consumer. If consumer is connected and has available permits, + // AbstractBaseDispatcher#filterEntriesForConsumer will auto ack marker messages pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe().close(); @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); From c4f684a924c2741ff8aa726efa3bb5e39d7540b5 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 22 Dec 2025 17:16:23 +0800 Subject: [PATCH 8/9] [improve][admin] Only increment markerMessages when message is a marker message --- .../broker/service/persistent/PersistentSubscription.java | 7 ++++--- .../pulsar/broker/admin/v3/AdminApiTransactionTest.java | 8 ++++---- .../pulsar/broker/service/ReplicatedSubscriptionTest.java | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 4df1863862da9..786b902b3baca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -668,6 +668,10 @@ public CompletableFuture analyzeBacklog(Optional } else { messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1); } + if (messageMetadata.hasMarkerType()) { + markerMessages.incrementAndGet(); + return true; + } int numMessages = 1; if (messageMetadata.hasNumMessagesInBatch()) { numMessages = messageMetadata.getNumMessagesInBatch(); @@ -694,9 +698,6 @@ public CompletableFuture analyzeBacklog(Optional } long num = entries.incrementAndGet(); messages.addAndGet(numMessages); - if (messageMetadata.hasMarkerType()) { - markerMessages.addAndGet(numMessages); - } if (num % 1000 == 0) { long end = System.currentTimeMillis(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index af0fcfed022ab..6634a2e857464 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -1077,13 +1077,13 @@ public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Excepti AnalyzeSubscriptionBacklogResult backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); - assertEquals(backlogResult.getMessages(), numMessages * 2); + assertEquals(backlogResult.getMessages(), numMessages); assertEquals(backlogResult.getMarkerMessages(), numMessages); MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2); backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(committedMiddleMsgId)); - assertEquals(backlogResult.getMessages(), numMessages); + assertEquals(backlogResult.getMessages(), numMessages / 2); assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); List abortedMsgIds = new ArrayList<>(); @@ -1094,13 +1094,13 @@ public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Excepti txn.abort(); } backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); - assertEquals(backlogResult.getMessages(), numMessages * 4); + assertEquals(backlogResult.getMessages(), numMessages * 2); assertEquals(backlogResult.getMarkerMessages(), numMessages * 2); MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2); backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId)); - assertEquals(backlogResult.getMessages(), numMessages); + assertEquals(backlogResult.getMessages(), numMessages / 2); assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index 20e095f6106f7..c538207584d5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -1095,7 +1095,7 @@ public void testReplicatedSubscriptionOneWay() throws Exception { // Assert analyze backlog total messages and marker messages. AnalyzeSubscriptionBacklogResult backlogResult = admin4.topics().analyzeSubscriptionBacklog(topicName, subscriptionName, Optional.empty()); - assertEquals(backlogResult.getMessages(), numMessages + numSnapshotRequest); + assertEquals(backlogResult.getMessages(), numMessages); assertEquals(backlogResult.getMarkerMessages(), numSnapshotRequest); // Wait pending snapshot timeout From bf0ed12367f646bdc246abd2d2b2f58ba65f33d2 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 23 Dec 2025 08:49:10 +0800 Subject: [PATCH 9/9] [improve][admin] Add uncommitted messages backlog test --- .../pulsar/broker/admin/v3/AdminApiTransactionTest.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index 6634a2e857464..11ef18b8fd0a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -1102,6 +1102,14 @@ public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Excepti admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId)); assertEquals(backlogResult.getMessages(), numMessages / 2); assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); + + Transaction txn = pulsarClient.newTransaction().build().get(); + for (int i = 0; i < numMessages; i++) { + producer.newMessage(txn).value("uncommitted-msg-" + i).send(); + } + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages * 3); + assertEquals(backlogResult.getMarkerMessages(), numMessages * 2); } private static void verifyCoordinatorStats(String state,