Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,7 @@ private void internalAnalyzeSubscriptionBacklogForNonPartitionedTopic(AsyncRespo

result.setEntries(rawResult.getEntries());
result.setMessages(rawResult.getMessages());
result.setMarkerMessages(rawResult.getMarkerMessages());

result.setFilterAcceptedEntries(rawResult.getFilterAcceptedEntries());
result.setFilterRejectedEntries(rawResult.getFilterRejectedEntries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public final class AnalyzeBacklogResult {

private long entries;
private long messages;
private long markerMessages;

private long filterRejectedEntries;
private long filterAcceptedEntries;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,7 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>
AtomicLong rejected = new AtomicLong();
AtomicLong rescheduled = new AtomicLong();
AtomicLong messages = new AtomicLong();
AtomicLong markerMessages = new AtomicLong();
AtomicLong acceptedMessages = new AtomicLong();
AtomicLong rejectedMessages = new AtomicLong();
AtomicLong rescheduledMessages = new AtomicLong();
Expand Down Expand Up @@ -667,6 +668,10 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>
} else {
messageMetadata = Commands.peekMessageMetadata(metadataAndPayload, "", -1);
}
if (messageMetadata.hasMarkerType()) {
markerMessages.incrementAndGet();
return true;
}
int numMessages = 1;
if (messageMetadata.hasNumMessagesInBatch()) {
numMessages = messageMetadata.getNumMessagesInBatch();
Expand Down Expand Up @@ -716,6 +721,7 @@ public CompletableFuture<AnalyzeBacklogResult> analyzeBacklog(Optional<Position>
result.setLastPosition(lastPosition.get());
result.setEntries(entries.get());
result.setMessages(messages.get());
result.setMarkerMessages(markerMessages.get());
result.setFilterAcceptedEntries(accepted.get());
result.setFilterAcceptedMessages(acceptedMessages.get());
result.setFilterRejectedEntries(rejected.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ private void verifyBacklog(String topic, String subscription, int numEntries, in
assertEquals(analyzeSubscriptionBacklogResult.getFilterRescheduledEntries(), 0);

assertEquals(analyzeSubscriptionBacklogResult.getMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getMarkerMessages(), 0);
assertEquals(analyzeSubscriptionBacklogResult.getFilterAcceptedMessages(), numMessages);
assertEquals(analyzeSubscriptionBacklogResult.getFilterRejectedMessages(), 0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -79,6 +80,7 @@
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TransactionPendingAckInternalStats;
import org.apache.pulsar.common.policies.data.TransactionPendingAckStats;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
Expand Down Expand Up @@ -1053,6 +1055,63 @@ public void testPeekMessageForShowAllMessages() throws Exception {
}
}

@Test
public void testAnalyzeSubscriptionBacklogWithTransactionMarker() throws Exception {
initTransaction(1);
final String topic = BrokerTestUtil.newUniqueName("persistent://public/default/analyze-subscription-backlog");
String transactionSubName = "analyze-subscription-backlog-topic-sub";

// Init subscription and then close the consumer. If consumer is connected and has available permits,
// AbstractBaseDispatcher#filterEntriesForConsumer will auto ack marker messages
pulsarClient.newConsumer(Schema.STRING).topic(topic).subscriptionName(transactionSubName).subscribe().close();
@Cleanup Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();

int numMessages = 10;
List<MessageId> committedMsgIds = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
MessageId messageId = producer.newMessage(txn).value("commited-msg" + i).send();
committedMsgIds.add(messageId);
txn.commit().get();
}

AnalyzeSubscriptionBacklogResult backlogResult =
admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty());
assertEquals(backlogResult.getMessages(), numMessages);
assertEquals(backlogResult.getMarkerMessages(), numMessages);

MessageId committedMiddleMsgId = committedMsgIds.get(numMessages / 2);
backlogResult =
admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(committedMiddleMsgId));
assertEquals(backlogResult.getMessages(), numMessages / 2);
assertEquals(backlogResult.getMarkerMessages(), numMessages / 2);

List<MessageId> abortedMsgIds = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
Transaction txn = pulsarClient.newTransaction().build().get();
MessageId messageId = producer.newMessage(txn).value("aborted-msg" + i).send();
abortedMsgIds.add(messageId);
txn.abort();
}
backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty());
assertEquals(backlogResult.getMessages(), numMessages * 2);
assertEquals(backlogResult.getMarkerMessages(), numMessages * 2);

MessageId abortedMiddleMsgId = abortedMsgIds.get(numMessages / 2);
backlogResult =
admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.of(abortedMiddleMsgId));
assertEquals(backlogResult.getMessages(), numMessages / 2);
assertEquals(backlogResult.getMarkerMessages(), numMessages / 2);

Transaction txn = pulsarClient.newTransaction().build().get();
for (int i = 0; i < numMessages; i++) {
producer.newMessage(txn).value("uncommitted-msg-" + i).send();
}
backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, transactionSubName, Optional.empty());
assertEquals(backlogResult.getMessages(), numMessages * 3);
assertEquals(backlogResult.getMarkerMessages(), numMessages * 2);
}

private static void verifyCoordinatorStats(String state,
long sequenceId, long lowWaterMark) {
assertEquals(state, "Ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.stats.AnalyzeSubscriptionBacklogResult;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1090,6 +1092,12 @@ public void testReplicatedSubscriptionOneWay() throws Exception {
}
Assert.assertEquals(numSnapshotRequest, 1);

// Assert analyze backlog total messages and marker messages.
AnalyzeSubscriptionBacklogResult backlogResult =
admin4.topics().analyzeSubscriptionBacklog(topicName, subscriptionName, Optional.empty());
assertEquals(backlogResult.getMessages(), numMessages);
assertEquals(backlogResult.getMarkerMessages(), numSnapshotRequest);

// Wait pending snapshot timeout
Thread.sleep(config1.getReplicatedSubscriptionsSnapshotTimeoutSeconds() * 1000);
numSnapshotRequest = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void testFilter() throws Exception {
producer.send("test");
}

verifyBacklog(topic, subName, 10, 10, 10, 10, 0, 0, 0, 0);
verifyBacklog(topic, subName, 10, 10, 0, 10, 10, 0, 0, 0, 0);

int counter = 0;
while (true) {
Expand All @@ -268,7 +268,7 @@ public void testFilter() throws Exception {
// All normal messages can be received
assertEquals(10, counter);

verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0);

// stop the consumer
consumer.close();
Expand All @@ -280,7 +280,7 @@ public void testFilter() throws Exception {

// analyze the subscription and predict that
// 10 messages will be rejected by the filter
verifyBacklog(topic, subName, 10, 10, 0, 0, 10, 10, 0, 0);
verifyBacklog(topic, subName, 10, 10, 0, 0, 0, 10, 10, 0, 0);

consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topic)
Expand All @@ -304,7 +304,7 @@ public void testFilter() throws Exception {

// now the Filter acknoledged the messages on behalf of the Consumer
// backlog is now zero again
verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0);
verifyBacklog(topic, subName, 0, 0, 0, 0, 0, 0, 0, 0, 0);

// All messages should be acked, check the MarkDeletedPosition
assertNotNull(lastMsgId);
Expand Down Expand Up @@ -545,7 +545,7 @@ public void testEntryFilterRescheduleMessageDependingOnConsumerSharedSubscriptio


private void verifyBacklog(String topic, String subscription,
int numEntries, int numMessages,
int numEntries, int numMessages, int numMarkerMessages,
int numEntriesAccepted, int numMessagesAccepted,
int numEntriesRejected, int numMessagesRejected,
int numEntriesRescheduled, int numMessagesRescheduled
Expand All @@ -559,6 +559,7 @@ private void verifyBacklog(String topic, String subscription,
Assert.assertEquals(numEntriesRescheduled, a1.getFilterRescheduledEntries());

Assert.assertEquals(numMessages, a1.getMessages());
Assert.assertEquals(numMarkerMessages, a1.getMarkerMessages());
Assert.assertEquals(numMessagesAccepted, a1.getFilterAcceptedMessages());
Assert.assertEquals(numMessagesRejected, a1.getFilterRejectedMessages());
Assert.assertEquals(numMessagesRescheduled, a1.getFilterRescheduledMessages());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
public class AnalyzeSubscriptionBacklogResult {
private long entries;
private long messages;
private long markerMessages;

private long filterRejectedEntries;
private long filterAcceptedEntries;
Expand Down
Loading