diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 843d5a18b654f..e16adaee4a160 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -422,6 +422,46 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder receiverQueueSize(int receiverQueueSize); + /** + * Sets whether to enable {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} config or not. + * + *

By default, this config is false, then every partition consumer's receiverQueueSize is equal to + * multi-topics consumer's receiverQueueSize. + * + *

If set to true, then every partition consumer's receiverQueueSize in multi-topics consumer is set to + * {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} + * + *

Attention: Partitioned-topic's receiverQueueSize calculate logic is a little bit different from + * non-partitioned topic, see {@link #maxTotalReceiverQueueSizeAcrossPartitions(int)} + * + * @param multiTopicsSinglePartitionReceiverQueueSizeEnable + * enable multiTopicsSinglePartitionReceiverQueueSize or not + * @return the consumer builder instance + */ + ConsumerBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable); + + /** + * Sets single partition consumer's receiverQueueSize in multi-topics consumer. + * + *

Attention: This config only takes effect when + * {@link #enableMultiTopicsSinglePartitionReceiverQueueSize(boolean)} is set to true. + * + *

For partitioned-topic with n partitions, Pulsar creates n independent ConsumerImpl instances for each + * partition. For non-partitioned topic, Pulsar creates one ConsumerImpl instance. + * + *

This config just set each ConsumerImpl's receiverQueueSize to multiTopicsSingleConsumerReceiverQueueSize in + * multi-topics consumer. + * + *

For partitioned-topic, each partition consumer's receiverQueueSize is the min value of receiverQueueSize and + * (maxTotalReceiverQueueSizeAcrossPartitions / numPartitions) + * + * @param multiTopicsSinglePartitionReceiverQueueSize + * the receiverQueueSize of single partition consumer in multi-topics consumer + * @return the consumer builder instance + */ + ConsumerBuilder multiTopicsSinglePartitionReceiverQueueSize(int multiTopicsSinglePartitionReceiverQueueSize); + /** * Sets amount of time for group consumer acknowledgments. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index ead36f13ea8ce..a090ba9a76402 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -248,6 +248,29 @@ public interface ReaderBuilder extends Cloneable { */ ReaderBuilder receiverQueueSize(int receiverQueueSize); + /** + * Sets whether to enable {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} config or not. + * + *

See also {@link ConsumerBuilder#enableMultiTopicsSinglePartitionReceiverQueueSize(boolean)} + * + * @param multiTopicsSinglePartitionReceiverQueueSizeEnable + * enable multiTopicsSinglePartitionReceiverQueueSize or not + * @return the reader builder instance + */ + ReaderBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable); + + /** + * Sets single partition consumer's receiverQueueSize in multi-topics consumer. + * + *

See also {@link ConsumerBuilder#multiTopicsSinglePartitionReceiverQueueSize(int)} + * + * @param multiTopicsSinglePartitionReceiverQueueSize + * the receiverQueueSize of single partition consumer in multi-topics consumer + * @return the reader builder instance + */ + ReaderBuilder multiTopicsSinglePartitionReceiverQueueSize(int multiTopicsSinglePartitionReceiverQueueSize); + /** * Specify a reader name. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index dc2363c279f7e..a0079d13fbbe0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -377,6 +377,22 @@ public ConsumerBuilder receiverQueueSize(int receiverQueueSize) { return this; } + @Override + public ConsumerBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable) { + conf.setMultiTopicsSinglePartitionReceiverQueueSizeEnable(multiTopicsSinglePartitionReceiverQueueSizeEnable); + return this; + } + + @Override + public ConsumerBuilder multiTopicsSinglePartitionReceiverQueueSize( + int multiTopicsSinglePartitionReceiverQueueSize) { + checkArgument(multiTopicsSinglePartitionReceiverQueueSize >= 0, + "multiTopicsSinglePartitionReceiverQueueSize needs to be >= 0"); + conf.setMultiTopicsSinglePartitionReceiverQueueSize(multiTopicsSinglePartitionReceiverQueueSize); + return this; + } + @Override public ConsumerBuilder acknowledgmentGroupTime(long delay, TimeUnit unit) { checkArgument(delay >= 0, "acknowledgmentGroupTime needs to be >= 0"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 998dc52951ad3..d02bbd843ac20 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1110,7 +1110,7 @@ private void doSubscribeTopicPartitions(Schema schema, } allTopicPartitionsNumber.addAndGet(numPartitions); - int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), + int receiverQueueSize = Math.min(getSinglePartitionReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setReceiverQueueSize(receiverQueueSize); @@ -1171,6 +1171,8 @@ private void doSubscribeTopicPartitions(Schema schema, return existingValue; } else { internalConfig.setStartPaused(paused); + ConsumerConfigurationData configurationData = getInternalConsumerConfig(); + configurationData.setReceiverQueueSize(getSinglePartitionReceiverQueueSize()); ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, -1, subscribeFuture, createIfDoesNotExist, schema); if (paused) { @@ -1217,7 +1219,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf int partitionIndex, CompletableFuture> subFuture, boolean createIfDoesNotExist, Schema schema) { BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder() - .maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1)) + .maxNumMessages(Math.max(getSinglePartitionReceiverQueueSize() / 2, 1)) .maxNumBytes(-1) .timeout(1, TimeUnit.MILLISECONDS) .build(); @@ -1448,6 +1450,10 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa CompletableFuture> subFuture = new CompletableFuture<>(); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setStartPaused(paused); + // It is kind of difficult to get previous configured receiverQueueSize. + // So we just use getSinglePartitionReceiverQueueSize() to set new partition consumer's + // receiverQueueSize, it is developer's duty to set this to a reasonable value + configurationData.setReceiverQueueSize(getSinglePartitionReceiverQueueSize()); ConsumerImpl newConsumer = createInternalConsumer(configurationData, partitionName, partitionIndex, subFuture, true, schema); synchronized (pauseMutex) { @@ -1637,6 +1643,11 @@ private CompletableFuture> getExistsPartitions(String topic) { }); } + protected int getSinglePartitionReceiverQueueSize() { + return conf.isMultiTopicsSinglePartitionReceiverQueueSizeEnable() + ? conf.getMultiTopicsSinglePartitionReceiverQueueSize() : conf.getReceiverQueueSize(); + } + private ConsumerInterceptors getInternalConsumerInterceptors(ConsumerInterceptors multiTopicInterceptors) { return new ConsumerInterceptors(new ArrayList<>()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index 18c44122050ab..2db1de3c312f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -71,6 +71,10 @@ public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable); consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize()); + consumerConfiguration.setMultiTopicsSinglePartitionReceiverQueueSizeEnable( + readerConfiguration.isMultiTopicsSinglePartitionReceiverQueueSizeEnable()); + consumerConfiguration.setMultiTopicsSinglePartitionReceiverQueueSize( + readerConfiguration.getMultiTopicsSinglePartitionReceiverQueueSize()); consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted()); consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index ed1169ce4db9a..5d441768bff5c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -206,6 +206,22 @@ public ReaderBuilder receiverQueueSize(int receiverQueueSize) { return this; } + @Override + public ReaderBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable) { + conf.setMultiTopicsSinglePartitionReceiverQueueSizeEnable(multiTopicsSinglePartitionReceiverQueueSizeEnable); + return this; + } + + @Override + public ReaderBuilder multiTopicsSinglePartitionReceiverQueueSize( + int multiTopicsSinglePartitionReceiverQueueSize) { + checkArgument(multiTopicsSinglePartitionReceiverQueueSize >= 0, + "multiTopicsSinglePartitionReceiverQueueSize needs to be >= 0"); + conf.setMultiTopicsSinglePartitionReceiverQueueSize(multiTopicsSinglePartitionReceiverQueueSize); + return this; + } + @Override public ReaderBuilder readerName(String readerName) { conf.setReaderName(readerName); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 42fc2666573fe..df9656cc841c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -132,6 +132,30 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { ) private int receiverQueueSize = 1000; + @ApiModelProperty( + name = "multiTopicsSinglePartitionReceiverQueueSizeEnable", + value = "Determine whether multiTopicsSinglePartitionReceiverQueueSize is effective.\n" + + "\n" + + "For backward compatibility, the default value is set to false." + ) + private boolean multiTopicsSinglePartitionReceiverQueueSizeEnable = false; + + @ApiModelProperty( + name = "multiTopicsSinglePartitionReceiverQueueSize", + value = "Size of a single partition consumer's receiver queue in multi-topics consumer.\n" + "\n" + + "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value" + + " to 200, then each partition consumer's receiverQueueSize is 200, and " + + "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + " + + "200 * 10 = 3000.\n" + + "\n" + + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 600 topics" + + " with 1000 partitions, set this to 50, then each partition consumer's receiverQueueSize is " + + "200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in " + + "memory is 1000 + 50 * 200 = 11000.\n" + + "\n" + ) + private int multiTopicsSinglePartitionReceiverQueueSize = 1000; + @ApiModelProperty( name = "acknowledgementsGroupTimeMicros", value = "Group a consumer acknowledgment for a specified time.\n" diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index 1fe39e6329c22..727e7f7aed629 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -68,6 +68,30 @@ public class ReaderConfigurationData implements Serializable, Cloneable { ) private int receiverQueueSize = 1000; + @ApiModelProperty( + name = "multiTopicsSinglePartitionReceiverQueueSizeEnable", + value = "Determine whether multiTopicsSinglePartitionReceiverQueueSize is effective.\n" + + "\n" + + "For backward compatibility, the default value is set to false." + ) + private boolean multiTopicsSinglePartitionReceiverQueueSizeEnable = false; + + @ApiModelProperty( + name = "multiTopicsSinglePartitionReceiverQueueSize", + value = "Size of a single partition consumer's receiver queue in multi-topics consumer.\n" + "\n" + + "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value" + + " to 200, then each partition consumer's receiverQueueSize is 200, and " + + "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + " + + "200 * 10 = 3000.\n" + + "\n" + + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 600 topics" + + " with 1000 partitions, set this to 50, then each partition consumer's receiverQueueSize is " + + "200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in " + + "memory is 1000 + 50 * 200 = 11000.\n" + + "\n" + ) + private int multiTopicsSinglePartitionReceiverQueueSize = 1000; + @ApiModelProperty( name = "readerListener", value = "A listener that is called for message received."