Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,46 @@ public interface ConsumerBuilder<T> extends Cloneable {
*/
ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize);

/**
* Sets whether to enable {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} config or not.
*
* <p>By default, this config is false, then every partition consumer's receiverQueueSize is equal to
* multi-topics consumer's receiverQueueSize.
*
* <p>If set to true, then every partition consumer's receiverQueueSize in multi-topics consumer is set to
* {@link #multiTopicsSinglePartitionReceiverQueueSize(int)}
*
* <p>Attention: Partitioned-topic's receiverQueueSize calculate logic is a little bit different from
* non-partitioned topic, see {@link #maxTotalReceiverQueueSizeAcrossPartitions(int)}
*
* @param multiTopicsSinglePartitionReceiverQueueSizeEnable
* enable multiTopicsSinglePartitionReceiverQueueSize or not
* @return the consumer builder instance
*/
ConsumerBuilder<T> enableMultiTopicsSinglePartitionReceiverQueueSize(
boolean multiTopicsSinglePartitionReceiverQueueSizeEnable);

/**
* Sets single partition consumer's receiverQueueSize in multi-topics consumer.
*
* <p>Attention: This config only takes effect when
* {@link #enableMultiTopicsSinglePartitionReceiverQueueSize(boolean)} is set to true.
*
* <p>For partitioned-topic with n partitions, Pulsar creates n independent ConsumerImpl instances for each
* partition. For non-partitioned topic, Pulsar creates one ConsumerImpl instance.
*
* <p>This config just set each ConsumerImpl's receiverQueueSize to multiTopicsSingleConsumerReceiverQueueSize in
* multi-topics consumer.
*
* <p>For partitioned-topic, each partition consumer's receiverQueueSize is the min value of receiverQueueSize and
* (maxTotalReceiverQueueSizeAcrossPartitions / numPartitions)
*
* @param multiTopicsSinglePartitionReceiverQueueSize
* the receiverQueueSize of single partition consumer in multi-topics consumer
* @return the consumer builder instance
*/
ConsumerBuilder<T> multiTopicsSinglePartitionReceiverQueueSize(int multiTopicsSinglePartitionReceiverQueueSize);

/**
* Sets amount of time for group consumer acknowledgments.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,29 @@ public interface ReaderBuilder<T> extends Cloneable {
*/
ReaderBuilder<T> receiverQueueSize(int receiverQueueSize);

/**
* Sets whether to enable {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} config or not.
*
* <p>See also {@link ConsumerBuilder#enableMultiTopicsSinglePartitionReceiverQueueSize(boolean)}
*
* @param multiTopicsSinglePartitionReceiverQueueSizeEnable
* enable multiTopicsSinglePartitionReceiverQueueSize or not
* @return the reader builder instance
*/
ReaderBuilder<T> enableMultiTopicsSinglePartitionReceiverQueueSize(
boolean multiTopicsSinglePartitionReceiverQueueSizeEnable);

/**
* Sets single partition consumer's receiverQueueSize in multi-topics consumer.
*
* <p>See also {@link ConsumerBuilder#multiTopicsSinglePartitionReceiverQueueSize(int)}
*
* @param multiTopicsSinglePartitionReceiverQueueSize
* the receiverQueueSize of single partition consumer in multi-topics consumer
* @return the reader builder instance
*/
ReaderBuilder<T> multiTopicsSinglePartitionReceiverQueueSize(int multiTopicsSinglePartitionReceiverQueueSize);

/**
* Specify a reader name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,22 @@ public ConsumerBuilder<T> receiverQueueSize(int receiverQueueSize) {
return this;
}

@Override
public ConsumerBuilder<T> enableMultiTopicsSinglePartitionReceiverQueueSize(
boolean multiTopicsSinglePartitionReceiverQueueSizeEnable) {
conf.setMultiTopicsSinglePartitionReceiverQueueSizeEnable(multiTopicsSinglePartitionReceiverQueueSizeEnable);
return this;
}

@Override
public ConsumerBuilder<T> multiTopicsSinglePartitionReceiverQueueSize(
int multiTopicsSinglePartitionReceiverQueueSize) {
checkArgument(multiTopicsSinglePartitionReceiverQueueSize >= 0,
"multiTopicsSinglePartitionReceiverQueueSize needs to be >= 0");
conf.setMultiTopicsSinglePartitionReceiverQueueSize(multiTopicsSinglePartitionReceiverQueueSize);
return this;
}

@Override
public ConsumerBuilder<T> acknowledgmentGroupTime(long delay, TimeUnit unit) {
checkArgument(delay >= 0, "acknowledgmentGroupTime needs to be >= 0");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ private void doSubscribeTopicPartitions(Schema<T> schema,
}
allTopicPartitionsNumber.addAndGet(numPartitions);

int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
int receiverQueueSize = Math.min(getSinglePartitionReceiverQueueSize(),
conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
configurationData.setReceiverQueueSize(receiverQueueSize);
Expand Down Expand Up @@ -1171,6 +1171,8 @@ private void doSubscribeTopicPartitions(Schema<T> schema,
return existingValue;
} else {
internalConfig.setStartPaused(paused);
ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
configurationData.setReceiverQueueSize(getSinglePartitionReceiverQueueSize());
ConsumerImpl<T> newConsumer = createInternalConsumer(internalConfig, topicName,
-1, subscribeFuture, createIfDoesNotExist, schema);
if (paused) {
Expand Down Expand Up @@ -1217,7 +1219,7 @@ private ConsumerImpl<T> createInternalConsumer(ConsumerConfigurationData<T> conf
int partitionIndex, CompletableFuture<Consumer<T>> subFuture,
boolean createIfDoesNotExist, Schema<T> schema) {
BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder()
.maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1))
.maxNumMessages(Math.max(getSinglePartitionReceiverQueueSize() / 2, 1))
.maxNumBytes(-1)
.timeout(1, TimeUnit.MILLISECONDS)
.build();
Expand Down Expand Up @@ -1448,6 +1450,10 @@ private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicNa
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
configurationData.setStartPaused(paused);
// It is kind of difficult to get previous configured receiverQueueSize.
// So we just use getSinglePartitionReceiverQueueSize() to set new partition consumer's
// receiverQueueSize, it is developer's duty to set this to a reasonable value
configurationData.setReceiverQueueSize(getSinglePartitionReceiverQueueSize());
ConsumerImpl<T> newConsumer = createInternalConsumer(configurationData, partitionName,
partitionIndex, subFuture, true, schema);
synchronized (pauseMutex) {
Expand Down Expand Up @@ -1637,6 +1643,11 @@ private CompletableFuture<List<Integer>> getExistsPartitions(String topic) {
});
}

protected int getSinglePartitionReceiverQueueSize() {
return conf.isMultiTopicsSinglePartitionReceiverQueueSizeEnable()
? conf.getMultiTopicsSinglePartitionReceiverQueueSize() : conf.getReceiverQueueSize();
}

private ConsumerInterceptors<T> getInternalConsumerInterceptors(ConsumerInterceptors<T> multiTopicInterceptors) {
return new ConsumerInterceptors<T>(new ArrayList<>()) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData<T>
consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive);
consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable);
consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize());
consumerConfiguration.setMultiTopicsSinglePartitionReceiverQueueSizeEnable(
readerConfiguration.isMultiTopicsSinglePartitionReceiverQueueSizeEnable());
consumerConfiguration.setMultiTopicsSinglePartitionReceiverQueueSize(
readerConfiguration.getMultiTopicsSinglePartitionReceiverQueueSize());
consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted());
consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,22 @@ public ReaderBuilder<T> receiverQueueSize(int receiverQueueSize) {
return this;
}

@Override
public ReaderBuilder<T> enableMultiTopicsSinglePartitionReceiverQueueSize(
boolean multiTopicsSinglePartitionReceiverQueueSizeEnable) {
conf.setMultiTopicsSinglePartitionReceiverQueueSizeEnable(multiTopicsSinglePartitionReceiverQueueSizeEnable);
return this;
}

@Override
public ReaderBuilder<T> multiTopicsSinglePartitionReceiverQueueSize(
int multiTopicsSinglePartitionReceiverQueueSize) {
checkArgument(multiTopicsSinglePartitionReceiverQueueSize >= 0,
"multiTopicsSinglePartitionReceiverQueueSize needs to be >= 0");
conf.setMultiTopicsSinglePartitionReceiverQueueSize(multiTopicsSinglePartitionReceiverQueueSize);
return this;
}

@Override
public ReaderBuilder<T> readerName(String readerName) {
conf.setReaderName(readerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,30 @@ public class ConsumerConfigurationData<T> implements Serializable, Cloneable {
)
private int receiverQueueSize = 1000;

@ApiModelProperty(
name = "multiTopicsSinglePartitionReceiverQueueSizeEnable",
value = "Determine whether multiTopicsSinglePartitionReceiverQueueSize is effective.\n"
+ "\n"
+ "For backward compatibility, the default value is set to false."
)
private boolean multiTopicsSinglePartitionReceiverQueueSizeEnable = false;

@ApiModelProperty(
name = "multiTopicsSinglePartitionReceiverQueueSize",
value = "Size of a single partition consumer's receiver queue in multi-topics consumer.\n" + "\n"
+ "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value"
+ " to 200, then each partition consumer's receiverQueueSize is 200, and "
+ "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + "
+ "200 * 10 = 3000.\n"
+ "\n"
+ "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 600 topics"
+ " with 1000 partitions, set this to 50, then each partition consumer's receiverQueueSize is "
+ "200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in "
+ "memory is 1000 + 50 * 200 = 11000.\n"
+ "\n"
)
private int multiTopicsSinglePartitionReceiverQueueSize = 1000;

@ApiModelProperty(
name = "acknowledgementsGroupTimeMicros",
value = "Group a consumer acknowledgment for a specified time.\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
)
private int receiverQueueSize = 1000;

@ApiModelProperty(
name = "multiTopicsSinglePartitionReceiverQueueSizeEnable",
value = "Determine whether multiTopicsSinglePartitionReceiverQueueSize is effective.\n"
+ "\n"
+ "For backward compatibility, the default value is set to false."
)
private boolean multiTopicsSinglePartitionReceiverQueueSizeEnable = false;

@ApiModelProperty(
name = "multiTopicsSinglePartitionReceiverQueueSize",
value = "Size of a single partition consumer's receiver queue in multi-topics consumer.\n" + "\n"
+ "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value"
+ " to 200, then each partition consumer's receiverQueueSize is 200, and "
+ "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + "
+ "200 * 10 = 3000.\n"
+ "\n"
+ "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 600 topics"
+ " with 1000 partitions, set this to 50, then each partition consumer's receiverQueueSize is "
+ "200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in "
+ "memory is 1000 + 50 * 200 = 11000.\n"
+ "\n"
)
private int multiTopicsSinglePartitionReceiverQueueSize = 1000;

@ApiModelProperty(
name = "readerListener",
value = "A listener that is called for message received."
Expand Down