From a8b7ba413a3ce7a9dcbd6a24c69845fd0636d94a Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Tue, 16 Dec 2025 15:44:32 +0800 Subject: [PATCH 1/3] [fix][broker]Fixed an issue where sending chunk messages would be lost when no consumers were online --- .../pulsar/broker/service/SharedConsumerAssignor.java | 10 +++++++++- .../PersistentDispatcherMultipleConsumers.java | 2 +- .../PersistentDispatcherMultipleConsumersClassic.java | 2 +- .../broker/service/SharedConsumerAssignorTest.java | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index 2161e418dff00..63ffc24ca5515 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -27,11 +27,13 @@ import java.util.function.Supplier; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.common.api.proto.MessageMetadata; /** * The assigner to assign entries to the proper {@link Consumer} in the shared subscription. */ +@Slf4j @RequiredArgsConstructor public class SharedConsumerAssignor { @@ -50,6 +52,8 @@ public class SharedConsumerAssignor { // Process the unassigned messages, e.g. adding them to the replay queue private final java.util.function.Consumer unassignedMessageProcessor; + private final Subscription subscription; + public Map> assign(final List entryAndMetadataList, final int numConsumers) { assert numConsumers >= 0; @@ -58,7 +62,11 @@ public Map> assign(final List Consumer consumer = getConsumer(numConsumers); if (consumer == null) { - entryAndMetadataList.forEach(EntryAndMetadata::release); + if (subscription != null) { + log.info("Not found assign consumer in topic:{},subscription:{}, redelivery {} messages.", + subscription.getTopic(), subscription.getName(), entryAndMetadataList.size()); + } + entryAndMetadataList.forEach(unassignedMessageProcessor); return consumerToEntries; } // The actual available permits might change, here we use the permits at the moment to assign entries diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index 097ab9cd0febf..4f33e3e379bf8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -163,7 +163,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.initializeDispatchRateLimiterIfNeeded(); - this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay); + this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay, subscription); ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration(); this.readFailureBackoff = new Backoff( serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(), diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 0f496e461b85c..0746b7215b167 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -157,7 +157,7 @@ public PersistentDispatcherMultipleConsumersClassic(PersistentTopic topic, Manag : RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED; this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize(); this.initializeDispatchRateLimiterIfNeeded(); - this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay); + this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay, subscription); this.readFailureBackoff = new Backoff( topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(), TimeUnit.MILLISECONDS, diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java index 1b253df0f3772..92ff38ca03f1c 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java @@ -58,7 +58,7 @@ public void prepareData() { roundRobinConsumerSelector.clear(); entryAndMetadataList.clear(); replayQueue.clear(); - assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add); + assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null); final AtomicLong entryId = new AtomicLong(0L); final MockProducer producerA = new MockProducer("A", entryId, entryAndMetadataList); final MockProducer producerB = new MockProducer("B", entryId, entryAndMetadataList); From 36bde3a1dca8aa44c33c3b2d5fd6d858c7fee938 Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Mon, 22 Dec 2025 23:11:07 +0800 Subject: [PATCH 2/3] [fix][broker]Fixed an issue where sending chunk messages would be lost when no consumers were online --- .../service/SharedConsumerAssignorTest.java | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java index 92ff38ca03f1c..b86746e22cc42 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/SharedConsumerAssignorTest.java @@ -238,4 +238,60 @@ private static MessageMetadata createMetadata(final String producerName, } return metadata; } + + /** + * When there are no consumers online, chunk messages will not be directly lost + */ + @Test + public void testChunkMessagesNotBeLostNoConsumer() { + // 1. No consumer initially + Map> result = assignor.assign(entryAndMetadataList, 1); + assertTrue(result.isEmpty()); + assertEquals(replayQueue.size(), entryAndMetadataList.size()); + assertEquals(toString(replayQueue), toString(entryAndMetadataList)); + + // 2. Two Consumers come online + final Consumer consumerA = new Consumer("A", 100); + final Consumer consumerB = new Consumer("B", 100); + roundRobinConsumerSelector.addConsumers(consumerA, consumerB); + + // 3. Retry messages from replay queue + List retryList = new ArrayList<>(replayQueue); + replayQueue.clear(); + + // Use a larger batch size to ensure we can process enough messages + result = assignor.assign(retryList, 10); + + // 4. Verify consumer receives all messages + int totalReceived = result.values().stream().mapToInt(List::size).sum(); + assertEquals(totalReceived, retryList.size()); + + // Verify that chunks are assigned to the same consumer + List entriesA = toString(result.getOrDefault(consumerA, Collections.emptyList())); + List entriesB = toString(result.getOrDefault(consumerB, Collections.emptyList())); + + // Check A-1 chunks (0:1, 0:2, 0:5) + boolean a1InA = entriesA.stream().anyMatch(s -> s.contains("A-1")); + if (a1InA) { + assertTrue(entriesA.containsAll(Arrays.asList("0:1@A-1-0-3", "0:2@A-1-1-3", "0:5@A-1-2-3"))); + assertTrue(entriesB.stream().noneMatch(s -> s.contains("A-1"))); + } else { + assertTrue(entriesB.containsAll(Arrays.asList("0:1@A-1-0-3", "0:2@A-1-1-3", "0:5@A-1-2-3"))); + assertTrue(entriesA.stream().noneMatch(s -> s.contains("A-1"))); + } + + // Check B-1 chunks (0:4, 0:6) + boolean b1InA = entriesA.stream().anyMatch(s -> s.contains("B-1")); + if (b1InA) { + assertTrue(entriesA.containsAll(Arrays.asList("0:4@B-1-0-2", "0:6@B-1-1-2"))); + assertTrue(entriesB.stream().noneMatch(s -> s.contains("B-1"))); + } else { + assertTrue(entriesB.containsAll(Arrays.asList("0:4@B-1-0-2", "0:6@B-1-1-2"))); + assertTrue(entriesA.stream().noneMatch(s -> s.contains("B-1"))); + } + + // 5. Verify internal state is clean (since all chunks are completed) + assertTrue(assignor.getUuidToConsumer().isEmpty()); + } + } From 9b50ca20f3421dc7bb9198caee34e31fa208524d Mon Sep 17 00:00:00 2001 From: zjxxzjwang Date: Mon, 22 Dec 2025 23:17:26 +0800 Subject: [PATCH 3/3] [fix][broker]Fixed an issue where sending chunk messages would be lost when no consumers were online --- .../apache/pulsar/broker/service/SharedConsumerAssignor.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java index 63ffc24ca5515..bbf8dfd2b10fc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SharedConsumerAssignor.java @@ -63,8 +63,8 @@ public Map> assign(final List Consumer consumer = getConsumer(numConsumers); if (consumer == null) { if (subscription != null) { - log.info("Not found assign consumer in topic:{},subscription:{}, redelivery {} messages.", - subscription.getTopic(), subscription.getName(), entryAndMetadataList.size()); + log.info("No consumer found to assign in topic:{}, subscription:{}, redelivering {} messages.", + subscription.getTopic().getName(), subscription.getName(), entryAndMetadataList.size()); } entryAndMetadataList.forEach(unassignedMessageProcessor); return consumerToEntries;