diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 85387ccc26731..be40132cfbfa7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -1693,6 +1693,7 @@ private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncRespo result.setEntries(rawResult.getEntries()); result.setMessages(rawResult.getMessages()); + result.setMarkerMessages(rawResult.getMarkerMessages()); result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries()); result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java index e227acf4e8f62..b9c279f97cec7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AnalyzeBacklogResult.java @@ -29,6 +29,7 @@ public final class AnalyzeBacklogResult { private long entries; private long messages; + private long markerMessages; private long filterRejectedEntries; private long filterAcceptedEntries; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index 457bae5e69c07..786b902b3baca 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -635,6 +635,7 @@ public CompletableFuture analyzeBacklog(Optional AtomicLong rejected = new AtomicLong(); AtomicLong rescheduled = new AtomicLong(); AtomicLong messages = new AtomicLong(); + AtomicLong markerMessages = new AtomicLong(); AtomicLong acceptedMessages = new AtomicLong(); AtomicLong rejectedMessages = new AtomicLong(); AtomicLong rescheduledMessages = new AtomicLong(); @@ -667,6 +668,10 @@ public CompletableFuture analyzeBacklog(Optional } else { messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1); } + if (messageMetadata.hasMarkerType()) { + markerMessages.incrementAndGet(); + return true; + } int numMessages = 1; if (messageMetadata.hasNumMessagesInBatch()) { numMessages = messageMetadata.getNumMessagesInBatch(); @@ -716,6 +721,7 @@ public CompletableFuture analyzeBacklog(Optional result.setLastPosition(lastPosition.get()); result.setEntries(entries.get()); result.setMessages(messages.get()); + result.setMarkerMessages(markerMessages.get()); result.setFilterAcceptedEntries(accepted.get()); result.setFilterAcceptedMessages(acceptedMessages.get()); result.setFilterRejectedEntries(rejected.get()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index dbc632c6cf3ad..acea913204999 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -161,6 +161,7 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0); assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages); + assertEquals(analyzeSubscriptionBacklogResult.getMarkerMessages(), 0); assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages); assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java index b3c4b804286fe..11ef18b8fd0a7 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/v3/AdminApiTransactionTest.java @@ -31,6 +31,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -79,6 +80,7 @@ import org.apache.pulsar.common.policies.data.TransactionMetadata; import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats; import org.apache.pulsar.common.policies.data.TransactionPendingAckStats; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.apache.pulsar.common.stats.PositionInPendingAckStats; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider; @@ -1053,6 +1055,63 @@ public void testPeekMessageForShowAllMessages() throws Exception { } } + @Test + public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Exception { + initTransaction(1); + final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/analyze-subscription-backlog"); + String transactionSubName = "analyze-subscription-backlog-topic-sub"; + + // Init subscription and then close the consumer. If consumer is connected and has available permits, + // AbstractBaseDispatcher#filterEntriesForConsumer will auto ack marker messages + pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe().close(); + @Cleanup Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create(); + + int numMessages = 10; + List committedMsgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + MessageId messageId = producer.newMessage(txn).value("commited-msg" + i).send(); + committedMsgIds.add(messageId); + txn.commit().get(); + } + + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages); + assertEquals(backlogResult.getMarkerMessages(), numMessages); + + MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2); + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(committedMiddleMsgId)); + assertEquals(backlogResult.getMessages(), numMessages / 2); + assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); + + List abortedMsgIds = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + Transaction txn = pulsarClient.newTransaction().build().get(); + MessageId messageId = producer.newMessage(txn).value("aborted-msg" + i).send(); + abortedMsgIds.add(messageId); + txn.abort(); + } + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages * 2); + assertEquals(backlogResult.getMarkerMessages(), numMessages * 2); + + MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2); + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId)); + assertEquals(backlogResult.getMessages(), numMessages / 2); + assertEquals(backlogResult.getMarkerMessages(), numMessages / 2); + + Transaction txn = pulsarClient.newTransaction().build().get(); + for (int i = 0; i < numMessages; i++) { + producer.newMessage(txn).value("uncommitted-msg-" + i).send(); + } + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages * 3); + assertEquals(backlogResult.getMarkerMessages(), numMessages * 2); + } + private static void verifyCoordinatorStats(String state, long sequenceId, long lowWaterMark) { assertEquals(state, "Ready"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java index 74dbc5e52916b..c538207584d5b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatedSubscriptionTest.java @@ -39,6 +39,7 @@ import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -70,6 +71,7 @@ import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TenantInfoImpl; import org.apache.pulsar.common.policies.data.TopicStats; +import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult; import org.awaitility.Awaitility; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -1090,6 +1092,12 @@ public void testReplicatedSubscriptionOneWay() throws Exception { } Assert.assertEquals(numSnapshotRequest, 1); + // Assert analyze backlog total messages and marker messages. + AnalyzeSubscriptionBacklogResult backlogResult = + admin4.topics().analyzeSubscriptionBacklog(topicName, subscriptionName, Optional.empty()); + assertEquals(backlogResult.getMessages(), numMessages); + assertEquals(backlogResult.getMarkerMessages(), numSnapshotRequest); + // Wait pending snapshot timeout Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000); numSnapshotRequest = 0; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java index 89b409ae581e9..a70c2f3cf8d64 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/plugin/FilterEntryTest.java @@ -253,7 +253,7 @@ public void testFilter() throws Exception { producer.send("test"); } - verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0); + verifyBacklog(topic, subName, 10, 10, 0, 10, 10, 0, 0, 0, 0); int counter = 0; while (true) { @@ -268,7 +268,7 @@ public void testFilter() throws Exception { // All normal messages can be received assertEquals(10, counter); - verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0); + verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0); // stop the consumer consumer.close(); @@ -280,7 +280,7 @@ public void testFilter() throws Exception { // analyze the subscription and predict that // 10 messages will be rejected by the filter - verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0); + verifyBacklog(topic, subName, 10, 10, 0, 0, 0, 10, 10, 0, 0); consumer = pulsarClient.newConsumer(Schema.STRING) .topic(topic) @@ -304,7 +304,7 @@ public void testFilter() throws Exception { // now the Filter acknoledged the messages on behalf of the Consumer // backlog is now zero again - verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0); + verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0); // All messages should be acked, check the MarkDeletedPosition assertNotNull(lastMsgId); @@ -545,7 +545,7 @@ public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscriptio private void verifyBacklog(String topic, String subscription, - int numEntries, int numMessages, + int numEntries, int numMessages, int numMarkerMessages, int numEntriesAccepted, int numMessagesAccepted, int numEntriesRejected, int numMessagesRejected, int numEntriesRescheduled, int numMessagesRescheduled @@ -559,6 +559,7 @@ private void verifyBacklog(String topic, String subscription, Assert.assertEquals(numEntriesRescheduled, a1.getFilterRescheduledEntries()); Assert.assertEquals(numMessages, a1.getMessages()); + Assert.assertEquals(numMarkerMessages, a1.getMarkerMessages()); Assert.assertEquals(numMessagesAccepted, a1.getFilterAcceptedMessages()); Assert.assertEquals(numMessagesRejected, a1.getFilterRejectedMessages()); Assert.assertEquals(numMessagesRescheduled, a1.getFilterRescheduledMessages()); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java index 059026b80c575..622d91e4e98a9 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/common/stats/AnalyzeSubscriptionBacklogResult.java @@ -26,6 +26,7 @@ public class AnalyzeSubscriptionBacklogResult { private long entries; private long messages; + private long markerMessages; private long filterRejectedEntries; private long filterAcceptedEntries;