Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,13 @@
import java.util.function.Supplier;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.api.proto.MessageMetadata;

/**
* The assigner to assign entries to the proper {@link Consumer} in the shared subscription.
*/
@Slf4j
@RequiredArgsConstructor
public class SharedConsumerAssignor {

Expand All @@ -50,6 +52,8 @@ public class SharedConsumerAssignor {
// Process the unassigned messages, e.g. adding them to the replay queue
private final java.util.function.Consumer<EntryAndMetadata> unassignedMessageProcessor;

private final Subscription subscription;

public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata> entryAndMetadataList,
final int numConsumers) {
assert numConsumers >= 0;
Expand All @@ -58,7 +62,11 @@ public Map<Consumer, List<EntryAndMetadata>> assign(final List<EntryAndMetadata>

Consumer consumer = getConsumer(numConsumers);
if (consumer == null) {
entryAndMetadataList.forEach(EntryAndMetadata::release);
if (subscription != null) {
log.info("No consumer found to assign in topic:{}, subscription:{}, redelivering {} messages.",
subscription.getTopic().getName(), subscription.getName(), entryAndMetadataList.size());
}
entryAndMetadataList.forEach(unassignedMessageProcessor);
return consumerToEntries;
}
// The actual available permits might change, here we use the permits at the moment to assign entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public PersistentDispatcherMultipleConsumers(PersistentTopic topic, ManagedCurso
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay);
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addEntryToReplay, subscription);
ServiceConfiguration serviceConfiguration = topic.getBrokerService().pulsar().getConfiguration();
this.readFailureBackoff = new Backoff(
serviceConfiguration.getDispatcherReadFailureBackoffInitialTimeInMs(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ public PersistentDispatcherMultipleConsumersClassic(PersistentTopic topic, Manag
: RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.initializeDispatchRateLimiterIfNeeded();
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay);
this.assignor = new SharedConsumerAssignor(this::getNextConsumer, this::addMessageToReplay, subscription);
this.readFailureBackoff = new Backoff(
topic.getBrokerService().pulsar().getConfiguration().getDispatcherReadFailureBackoffInitialTimeInMs(),
TimeUnit.MILLISECONDS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void prepareData() {
roundRobinConsumerSelector.clear();
entryAndMetadataList.clear();
replayQueue.clear();
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add);
assignor = new SharedConsumerAssignor(roundRobinConsumerSelector, replayQueue::add, null);
final AtomicLong entryId = new AtomicLong(0L);
final MockProducer producerA = new MockProducer("A", entryId, entryAndMetadataList);
final MockProducer producerB = new MockProducer("B", entryId, entryAndMetadataList);
Expand Down Expand Up @@ -238,4 +238,60 @@ private static MessageMetadata createMetadata(final String producerName,
}
return metadata;
}

/**
* When there are no consumers online, chunk messages will not be directly lost
*/
@Test
public void testChunkMessagesNotBeLostNoConsumer() {
// 1. No consumer initially
Map<Consumer, List<EntryAndMetadata>> result = assignor.assign(entryAndMetadataList, 1);
assertTrue(result.isEmpty());
assertEquals(replayQueue.size(), entryAndMetadataList.size());
assertEquals(toString(replayQueue), toString(entryAndMetadataList));

// 2. Two Consumers come online
final Consumer consumerA = new Consumer("A", 100);
final Consumer consumerB = new Consumer("B", 100);
roundRobinConsumerSelector.addConsumers(consumerA, consumerB);

// 3. Retry messages from replay queue
List<EntryAndMetadata> retryList = new ArrayList<>(replayQueue);
replayQueue.clear();

// Use a larger batch size to ensure we can process enough messages
result = assignor.assign(retryList, 10);

// 4. Verify consumer receives all messages
int totalReceived = result.values().stream().mapToInt(List::size).sum();
assertEquals(totalReceived, retryList.size());

// Verify that chunks are assigned to the same consumer
List<String> entriesA = toString(result.getOrDefault(consumerA, Collections.emptyList()));
List<String> entriesB = toString(result.getOrDefault(consumerB, Collections.emptyList()));

// Check A-1 chunks (0:1, 0:2, 0:5)
boolean a1InA = entriesA.stream().anyMatch(s -> s.contains("A-1"));
if (a1InA) {
assertTrue(entriesA.containsAll(Arrays.asList("0:1@A-1-0-3", "0:2@A-1-1-3", "0:5@A-1-2-3")));
assertTrue(entriesB.stream().noneMatch(s -> s.contains("A-1")));
} else {
assertTrue(entriesB.containsAll(Arrays.asList("0:1@A-1-0-3", "0:2@A-1-1-3", "0:5@A-1-2-3")));
assertTrue(entriesA.stream().noneMatch(s -> s.contains("A-1")));
}

// Check B-1 chunks (0:4, 0:6)
boolean b1InA = entriesA.stream().anyMatch(s -> s.contains("B-1"));
if (b1InA) {
assertTrue(entriesA.containsAll(Arrays.asList("0:4@B-1-0-2", "0:6@B-1-1-2")));
assertTrue(entriesB.stream().noneMatch(s -> s.contains("B-1")));
} else {
assertTrue(entriesB.containsAll(Arrays.asList("0:4@B-1-0-2", "0:6@B-1-1-2")));
assertTrue(entriesA.stream().noneMatch(s -> s.contains("B-1")));
}

// 5. Verify internal state is clean (since all chunks are completed)
assertTrue(assignor.getUuidToConsumer().isEmpty());
}

}