From d088585ee9ceb1955b01def238d3eb1eadc26a3d Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 17 Oct 2025 13:47:22 +0800 Subject: [PATCH 1/5] [improve][client] Add independent multiTopicsSingleConsumerReceiverQueueSize config for single consumer in MultiTopicsConsumerImpl to reduce memory consumption --- .../client/impl/MultiTopicsConsumerImpl.java | 6 ++++-- .../client/impl/MultiTopicsReaderImpl.java | 2 ++ .../impl/conf/ConsumerConfigurationData.java | 18 ++++++++++++++++++ .../impl/conf/ReaderConfigurationData.java | 18 ++++++++++++++++++ 4 files changed, 42 insertions(+), 2 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 998dc52951ad3..37fb9f4c04060 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1110,7 +1110,7 @@ private void doSubscribeTopicPartitions(Schema schema, } allTopicPartitionsNumber.addAndGet(numPartitions); - int receiverQueueSize = Math.min(conf.getReceiverQueueSize(), + int receiverQueueSize = Math.min(conf.getMultiTopicsSingleConsumerReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setReceiverQueueSize(receiverQueueSize); @@ -1171,6 +1171,8 @@ private void doSubscribeTopicPartitions(Schema schema, return existingValue; } else { internalConfig.setStartPaused(paused); + ConsumerConfigurationData configurationData = getInternalConsumerConfig(); + configurationData.setReceiverQueueSize(conf.getMultiTopicsSingleConsumerReceiverQueueSize()); ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, -1, subscribeFuture, createIfDoesNotExist, schema); if (paused) { @@ -1217,7 +1219,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf int partitionIndex, CompletableFuture> subFuture, boolean createIfDoesNotExist, Schema schema) { BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder() - .maxNumMessages(Math.max(configurationData.getReceiverQueueSize() / 2, 1)) + .maxNumMessages(Math.max(configurationData.getMultiTopicsSingleConsumerReceiverQueueSize() / 2, 1)) .maxNumBytes(-1) .timeout(1, TimeUnit.MILLISECONDS) .build(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index 18c44122050ab..903a427070512 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -71,6 +71,8 @@ public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable); consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize()); + consumerConfiguration.setMultiTopicsSingleConsumerReceiverQueueSize( + readerConfiguration.getMultiTopicsSingleConsumerReceiverQueueSize()); consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted()); consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 42fc2666573fe..766c85d1a19f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -132,6 +132,24 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { ) private int receiverQueueSize = 1000; + @ApiModelProperty( + name = "multiTopicsSingleConsumerReceiverQueueSize", + value = "Size of a single consumer's receiver queue in multi-topics consumer.\n" + + "\n" + + "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value" + + " to 200, then each non-partitioned topic consumer's receiverQueueSize 200, and " + + "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + " + + "200 * 10 = 3000.\n" + + "\n" + + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 1000 " + + "non-partitioned topics, set this to 50, then each non-partitioned topic consumer's " + + "receiverQueueSize 200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the " + + "max messages in memory is 1000 + 50 * 200 = 11000.\n" + + "\n" + + "For backward compatibility, the default value is set to 1000, the same as receiverQueueSize." + ) + private int multiTopicsSingleConsumerReceiverQueueSize = 1000; + @ApiModelProperty( name = "acknowledgementsGroupTimeMicros", value = "Group a consumer acknowledgment for a specified time.\n" diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index 1fe39e6329c22..4d36d02c8bd4d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -68,6 +68,24 @@ public class ReaderConfigurationData implements Serializable, Cloneable { ) private int receiverQueueSize = 1000; + @ApiModelProperty( + name = "multiTopicsSingleConsumerReceiverQueueSize", + value = "Size of a single consumer's receiver queue in multi-topics consumer.\n" + + "\n" + + "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value" + + " to 200, then each non-partitioned topic consumer's receiverQueueSize 200, and " + + "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + " + + "200 * 10 = 3000.\n" + + "\n" + + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 1000 " + + "non-partitioned topics, set this to 50, then each non-partitioned topic consumer's " + + "receiverQueueSize 200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the " + + "max messages in memory is 1000 + 50 * 200 = 11000.\n" + + "\n" + + "For backward compatibility, the default value is set to 1000, the same as receiverQueueSize." + ) + private int multiTopicsSingleConsumerReceiverQueueSize = 1000; + @ApiModelProperty( name = "readerListener", value = "A listener that is called for message received." From 754e7abb19e3dce0959111e09b60dafc31b9f759 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 17 Oct 2025 14:42:19 +0800 Subject: [PATCH 2/5] [improve][client] Add multiTopicsSingleConsumerReceiverQueueSizeEnabled switch --- .../pulsar/client/impl/MultiTopicsConsumerImpl.java | 12 +++++++++--- .../pulsar/client/impl/MultiTopicsReaderImpl.java | 2 ++ .../client/impl/conf/ConsumerConfigurationData.java | 9 ++++++++- .../client/impl/conf/ReaderConfigurationData.java | 9 ++++++++- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 37fb9f4c04060..1db16f38390ff 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1110,7 +1110,7 @@ private void doSubscribeTopicPartitions(Schema schema, } allTopicPartitionsNumber.addAndGet(numPartitions); - int receiverQueueSize = Math.min(conf.getMultiTopicsSingleConsumerReceiverQueueSize(), + int receiverQueueSize = Math.min(getSingleConsumerReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setReceiverQueueSize(receiverQueueSize); @@ -1172,7 +1172,7 @@ private void doSubscribeTopicPartitions(Schema schema, } else { internalConfig.setStartPaused(paused); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); - configurationData.setReceiverQueueSize(conf.getMultiTopicsSingleConsumerReceiverQueueSize()); + configurationData.setReceiverQueueSize(getSingleConsumerReceiverQueueSize()); ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, -1, subscribeFuture, createIfDoesNotExist, schema); if (paused) { @@ -1219,7 +1219,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf int partitionIndex, CompletableFuture> subFuture, boolean createIfDoesNotExist, Schema schema) { BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder() - .maxNumMessages(Math.max(configurationData.getMultiTopicsSingleConsumerReceiverQueueSize() / 2, 1)) + .maxNumMessages(Math.max(getSingleConsumerReceiverQueueSize() / 2, 1)) .maxNumBytes(-1) .timeout(1, TimeUnit.MILLISECONDS) .build(); @@ -1639,6 +1639,12 @@ private CompletableFuture> getExistsPartitions(String topic) { }); } + private int getSingleConsumerReceiverQueueSize() { + return conf.isMultiTopicsSingleConsumerReceiverQueueSizeEnabled() ? + conf.getMultiTopicsSingleConsumerReceiverQueueSize() : + conf.getReceiverQueueSize(); + } + private ConsumerInterceptors getInternalConsumerInterceptors(ConsumerInterceptors multiTopicInterceptors) { return new ConsumerInterceptors(new ArrayList<>()) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index 903a427070512..48fc7c18e5f65 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -71,6 +71,8 @@ public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable); consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize()); + consumerConfiguration.setMultiTopicsSingleConsumerReceiverQueueSizeEnabled( + readerConfiguration.isMultiTopicsSingleConsumerReceiverQueueSizeEnabled()); consumerConfiguration.setMultiTopicsSingleConsumerReceiverQueueSize( readerConfiguration.getMultiTopicsSingleConsumerReceiverQueueSize()); consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 766c85d1a19f4..1942f4c2ecfaa 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -132,6 +132,14 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { ) private int receiverQueueSize = 1000; + @ApiModelProperty( + name = "multiTopicsSingleConsumerReceiverQueueSizeEnabled", + value = "Determine whether multiTopicsSingleConsumerReceiverQueueSize is effective.\n" + + "\n" + + "For backward compatibility, the default value is set to false." + ) + private boolean multiTopicsSingleConsumerReceiverQueueSizeEnabled = false; + @ApiModelProperty( name = "multiTopicsSingleConsumerReceiverQueueSize", value = "Size of a single consumer's receiver queue in multi-topics consumer.\n" @@ -146,7 +154,6 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { + "receiverQueueSize 200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the " + "max messages in memory is 1000 + 50 * 200 = 11000.\n" + "\n" - + "For backward compatibility, the default value is set to 1000, the same as receiverQueueSize." ) private int multiTopicsSingleConsumerReceiverQueueSize = 1000; diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index 4d36d02c8bd4d..df49fa9e8247c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -68,6 +68,14 @@ public class ReaderConfigurationData implements Serializable, Cloneable { ) private int receiverQueueSize = 1000; + @ApiModelProperty( + name = "multiTopicsSingleConsumerReceiverQueueSizeEnabled", + value = "Determine whether multiTopicsSingleConsumerReceiverQueueSize is effective.\n" + + "\n" + + "For backward compatibility, the default value is set to false." + ) + private boolean multiTopicsSingleConsumerReceiverQueueSizeEnabled = false; + @ApiModelProperty( name = "multiTopicsSingleConsumerReceiverQueueSize", value = "Size of a single consumer's receiver queue in multi-topics consumer.\n" @@ -82,7 +90,6 @@ public class ReaderConfigurationData implements Serializable, Cloneable { + "receiverQueueSize 200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the " + "max messages in memory is 1000 + 50 * 200 = 11000.\n" + "\n" - + "For backward compatibility, the default value is set to 1000, the same as receiverQueueSize." ) private int multiTopicsSingleConsumerReceiverQueueSize = 1000; From 5f33993388332a73e58b80099ba63c7e286ec096 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 17 Oct 2025 16:39:09 +0800 Subject: [PATCH 3/5] [improve][client] Add method in ConsumerBuilder and ReaderBuilder --- .../pulsar/client/api/ConsumerBuilder.java | 40 +++++++++++++++++++ .../pulsar/client/api/ReaderBuilder.java | 23 +++++++++++ .../client/impl/ConsumerBuilderImpl.java | 16 ++++++++ .../client/impl/MultiTopicsConsumerImpl.java | 13 +++--- .../client/impl/MultiTopicsReaderImpl.java | 8 ++-- .../pulsar/client/impl/ReaderBuilderImpl.java | 16 ++++++++ .../impl/conf/ConsumerConfigurationData.java | 23 +++++------ .../impl/conf/ReaderConfigurationData.java | 23 +++++------ 8 files changed, 127 insertions(+), 35 deletions(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index 843d5a18b654f..e392bb2e64031 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -422,6 +422,46 @@ public interface ConsumerBuilder extends Cloneable { */ ConsumerBuilder receiverQueueSize(int receiverQueueSize); + /** + * Sets whether to enable {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} config or not. + * + *

By default, this config is false, then every partition consumer's receiverQueueSize is equal to + * multi-topics consumer's receiverQueueSize. + * + *

If set to true, then every partition consumer's receiverQueueSize in multi-topics consumer is set to + * {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} + * + *

Attention: Partitioned-topic's receiverQueueSize calculate logic is a little bit different from + * non-partitioned topic, see {@link #maxTotalReceiverQueueSizeAcrossPartitions(int)} + * + * @param multiTopicsSinglePartitionReceiverQueueSizeEnable + * enable multiTopicsSinglePartitionReceiverQueueSize or not + * @return the consumer builder instance + */ + ConsumerBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable); + + /** + * Sets single partition consumer's receiverQueueSize in multi-topics consumer. + * + *

Attention: This config only takes effect when + * {@link #enableMultiTopicsSinglePartitionReceiverQueueSize(boolean)} is set to true. + * + *

For partitioned-topic with n partitions, Pulsar creates n independent ConsumerImpl instances for each + * partition. For non-partitioned topic, Pulsar creates one ConsumerImpl instance. + * + *

This config just set each ConsumerImpl's receiverQueueSize to multiTopicsSingleConsumerReceiverQueueSize in + * multi-topics consumer. + * + *

For partitioned-topic, each partition consumer's receiverQueueSize is the min value of receiverQueueSize and + * (maxTotalReceiverQueueSizeAcrossPartitions() / numPartitions) + * + * @param multiTopicsSinglePartitionReceiverQueueSize + * the receiverQueueSize of single partition consumer in multi-topics consumer + * @return the consumer builder instance + */ + ConsumerBuilder multiTopicsSinglePartitionReceiverQueueSize(int multiTopicsSinglePartitionReceiverQueueSize); + /** * Sets amount of time for group consumer acknowledgments. * diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java index ead36f13ea8ce..a090ba9a76402 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ReaderBuilder.java @@ -248,6 +248,29 @@ public interface ReaderBuilder extends Cloneable { */ ReaderBuilder receiverQueueSize(int receiverQueueSize); + /** + * Sets whether to enable {@link #multiTopicsSinglePartitionReceiverQueueSize(int)} config or not. + * + *

See also {@link ConsumerBuilder#enableMultiTopicsSinglePartitionReceiverQueueSize(boolean)} + * + * @param multiTopicsSinglePartitionReceiverQueueSizeEnable + * enable multiTopicsSinglePartitionReceiverQueueSize or not + * @return the reader builder instance + */ + ReaderBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable); + + /** + * Sets single partition consumer's receiverQueueSize in multi-topics consumer. + * + *

See also {@link ConsumerBuilder#multiTopicsSinglePartitionReceiverQueueSize(int)} + * + * @param multiTopicsSinglePartitionReceiverQueueSize + * the receiverQueueSize of single partition consumer in multi-topics consumer + * @return the reader builder instance + */ + ReaderBuilder multiTopicsSinglePartitionReceiverQueueSize(int multiTopicsSinglePartitionReceiverQueueSize); + /** * Specify a reader name. * diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index dc2363c279f7e..a0079d13fbbe0 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -377,6 +377,22 @@ public ConsumerBuilder receiverQueueSize(int receiverQueueSize) { return this; } + @Override + public ConsumerBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable) { + conf.setMultiTopicsSinglePartitionReceiverQueueSizeEnable(multiTopicsSinglePartitionReceiverQueueSizeEnable); + return this; + } + + @Override + public ConsumerBuilder multiTopicsSinglePartitionReceiverQueueSize( + int multiTopicsSinglePartitionReceiverQueueSize) { + checkArgument(multiTopicsSinglePartitionReceiverQueueSize >= 0, + "multiTopicsSinglePartitionReceiverQueueSize needs to be >= 0"); + conf.setMultiTopicsSinglePartitionReceiverQueueSize(multiTopicsSinglePartitionReceiverQueueSize); + return this; + } + @Override public ConsumerBuilder acknowledgmentGroupTime(long delay, TimeUnit unit) { checkArgument(delay >= 0, "acknowledgmentGroupTime needs to be >= 0"); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 1db16f38390ff..d987550e9e7a4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1110,7 +1110,7 @@ private void doSubscribeTopicPartitions(Schema schema, } allTopicPartitionsNumber.addAndGet(numPartitions); - int receiverQueueSize = Math.min(getSingleConsumerReceiverQueueSize(), + int receiverQueueSize = Math.min(getSinglePartitionReceiverQueueSize(), conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setReceiverQueueSize(receiverQueueSize); @@ -1172,7 +1172,7 @@ private void doSubscribeTopicPartitions(Schema schema, } else { internalConfig.setStartPaused(paused); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); - configurationData.setReceiverQueueSize(getSingleConsumerReceiverQueueSize()); + configurationData.setReceiverQueueSize(getSinglePartitionReceiverQueueSize()); ConsumerImpl newConsumer = createInternalConsumer(internalConfig, topicName, -1, subscribeFuture, createIfDoesNotExist, schema); if (paused) { @@ -1219,7 +1219,7 @@ private ConsumerImpl createInternalConsumer(ConsumerConfigurationData conf int partitionIndex, CompletableFuture> subFuture, boolean createIfDoesNotExist, Schema schema) { BatchReceivePolicy internalBatchReceivePolicy = BatchReceivePolicy.builder() - .maxNumMessages(Math.max(getSingleConsumerReceiverQueueSize() / 2, 1)) + .maxNumMessages(Math.max(getSinglePartitionReceiverQueueSize() / 2, 1)) .maxNumBytes(-1) .timeout(1, TimeUnit.MILLISECONDS) .build(); @@ -1639,10 +1639,9 @@ private CompletableFuture> getExistsPartitions(String topic) { }); } - private int getSingleConsumerReceiverQueueSize() { - return conf.isMultiTopicsSingleConsumerReceiverQueueSizeEnabled() ? - conf.getMultiTopicsSingleConsumerReceiverQueueSize() : - conf.getReceiverQueueSize(); + private int getSinglePartitionReceiverQueueSize() { + return conf.isMultiTopicsSinglePartitionReceiverQueueSizeEnable() + ? conf.getMultiTopicsSinglePartitionReceiverQueueSize() : conf.getReceiverQueueSize(); } private ConsumerInterceptors getInternalConsumerInterceptors(ConsumerInterceptors multiTopicInterceptors) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java index 48fc7c18e5f65..2db1de3c312f4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java @@ -71,10 +71,10 @@ public MultiTopicsReaderImpl(PulsarClientImpl client, ReaderConfigurationData consumerConfiguration.setSubscriptionType(SubscriptionType.Exclusive); consumerConfiguration.setSubscriptionMode(SubscriptionMode.NonDurable); consumerConfiguration.setReceiverQueueSize(readerConfiguration.getReceiverQueueSize()); - consumerConfiguration.setMultiTopicsSingleConsumerReceiverQueueSizeEnabled( - readerConfiguration.isMultiTopicsSingleConsumerReceiverQueueSizeEnabled()); - consumerConfiguration.setMultiTopicsSingleConsumerReceiverQueueSize( - readerConfiguration.getMultiTopicsSingleConsumerReceiverQueueSize()); + consumerConfiguration.setMultiTopicsSinglePartitionReceiverQueueSizeEnable( + readerConfiguration.isMultiTopicsSinglePartitionReceiverQueueSizeEnable()); + consumerConfiguration.setMultiTopicsSinglePartitionReceiverQueueSize( + readerConfiguration.getMultiTopicsSinglePartitionReceiverQueueSize()); consumerConfiguration.setReadCompacted(readerConfiguration.isReadCompacted()); consumerConfiguration.setPoolMessages(readerConfiguration.isPoolMessages()); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java index ed1169ce4db9a..5d441768bff5c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderBuilderImpl.java @@ -206,6 +206,22 @@ public ReaderBuilder receiverQueueSize(int receiverQueueSize) { return this; } + @Override + public ReaderBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( + boolean multiTopicsSinglePartitionReceiverQueueSizeEnable) { + conf.setMultiTopicsSinglePartitionReceiverQueueSizeEnable(multiTopicsSinglePartitionReceiverQueueSizeEnable); + return this; + } + + @Override + public ReaderBuilder multiTopicsSinglePartitionReceiverQueueSize( + int multiTopicsSinglePartitionReceiverQueueSize) { + checkArgument(multiTopicsSinglePartitionReceiverQueueSize >= 0, + "multiTopicsSinglePartitionReceiverQueueSize needs to be >= 0"); + conf.setMultiTopicsSinglePartitionReceiverQueueSize(multiTopicsSinglePartitionReceiverQueueSize); + return this; + } + @Override public ReaderBuilder readerName(String readerName) { conf.setReaderName(readerName); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 1942f4c2ecfaa..df9656cc841c1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -133,29 +133,28 @@ public class ConsumerConfigurationData implements Serializable, Cloneable { private int receiverQueueSize = 1000; @ApiModelProperty( - name = "multiTopicsSingleConsumerReceiverQueueSizeEnabled", - value = "Determine whether multiTopicsSingleConsumerReceiverQueueSize is effective.\n" + name = "multiTopicsSinglePartitionReceiverQueueSizeEnable", + value = "Determine whether multiTopicsSinglePartitionReceiverQueueSize is effective.\n" + "\n" + "For backward compatibility, the default value is set to false." ) - private boolean multiTopicsSingleConsumerReceiverQueueSizeEnabled = false; + private boolean multiTopicsSinglePartitionReceiverQueueSizeEnable = false; @ApiModelProperty( - name = "multiTopicsSingleConsumerReceiverQueueSize", - value = "Size of a single consumer's receiver queue in multi-topics consumer.\n" - + "\n" + name = "multiTopicsSinglePartitionReceiverQueueSize", + value = "Size of a single partition consumer's receiver queue in multi-topics consumer.\n" + "\n" + "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value" - + " to 200, then each non-partitioned topic consumer's receiverQueueSize 200, and " + + " to 200, then each partition consumer's receiverQueueSize is 200, and " + "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + " + "200 * 10 = 3000.\n" + "\n" - + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 1000 " - + "non-partitioned topics, set this to 50, then each non-partitioned topic consumer's " - + "receiverQueueSize 200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the " - + "max messages in memory is 1000 + 50 * 200 = 11000.\n" + + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 600 topics" + + " with 1000 partitions, set this to 50, then each partition consumer's receiverQueueSize is " + + "200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in " + + "memory is 1000 + 50 * 200 = 11000.\n" + "\n" ) - private int multiTopicsSingleConsumerReceiverQueueSize = 1000; + private int multiTopicsSinglePartitionReceiverQueueSize = 1000; @ApiModelProperty( name = "acknowledgementsGroupTimeMicros", diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java index df49fa9e8247c..727e7f7aed629 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ReaderConfigurationData.java @@ -69,29 +69,28 @@ public class ReaderConfigurationData implements Serializable, Cloneable { private int receiverQueueSize = 1000; @ApiModelProperty( - name = "multiTopicsSingleConsumerReceiverQueueSizeEnabled", - value = "Determine whether multiTopicsSingleConsumerReceiverQueueSize is effective.\n" + name = "multiTopicsSinglePartitionReceiverQueueSizeEnable", + value = "Determine whether multiTopicsSinglePartitionReceiverQueueSize is effective.\n" + "\n" + "For backward compatibility, the default value is set to false." ) - private boolean multiTopicsSingleConsumerReceiverQueueSizeEnabled = false; + private boolean multiTopicsSinglePartitionReceiverQueueSizeEnable = false; @ApiModelProperty( - name = "multiTopicsSingleConsumerReceiverQueueSize", - value = "Size of a single consumer's receiver queue in multi-topics consumer.\n" - + "\n" + name = "multiTopicsSinglePartitionReceiverQueueSize", + value = "Size of a single partition consumer's receiver queue in multi-topics consumer.\n" + "\n" + "In MultiTopicsConsumerImpl, if you have a partitioned-topic with 10 partitions, set this value" - + " to 200, then each non-partitioned topic consumer's receiverQueueSize 200, and " + + " to 200, then each partition consumer's receiverQueueSize is 200, and " + "MultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in memory is 1000 + " + "200 * 10 = 3000.\n" + "\n" - + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 1000 " - + "non-partitioned topics, set this to 50, then each non-partitioned topic consumer's " - + "receiverQueueSize 200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the " - + "max messages in memory is 1000 + 50 * 200 = 11000.\n" + + "In PatternMultiTopicsConsumerImpl, if you subscribe to a regex pattern that matches 600 topics" + + " with 1000 partitions, set this to 50, then each partition consumer's receiverQueueSize is " + + "200, and PatternMultiTopicsConsumerImpl's receiverQueueSize is 1000, so the max messages in " + + "memory is 1000 + 50 * 200 = 11000.\n" + "\n" ) - private int multiTopicsSingleConsumerReceiverQueueSize = 1000; + private int multiTopicsSinglePartitionReceiverQueueSize = 1000; @ApiModelProperty( name = "readerListener", From bacc6b86affe39897dd643a4dc694cabd7f7f802 Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 17 Oct 2025 17:00:47 +0800 Subject: [PATCH 4/5] [improve][client] Handle partitioned-topic onTopicsExtended event --- .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index d987550e9e7a4..d02bbd843ac20 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -1450,6 +1450,10 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa CompletableFuture> subFuture = new CompletableFuture<>(); ConsumerConfigurationData configurationData = getInternalConsumerConfig(); configurationData.setStartPaused(paused); + // It is kind of difficult to get previous configured receiverQueueSize. + // So we just use getSinglePartitionReceiverQueueSize() to set new partition consumer's + // receiverQueueSize, it is developer's duty to set this to a reasonable value + configurationData.setReceiverQueueSize(getSinglePartitionReceiverQueueSize()); ConsumerImpl newConsumer = createInternalConsumer(configurationData, partitionName, partitionIndex, subFuture, true, schema); synchronized (pauseMutex) { @@ -1639,7 +1643,7 @@ private CompletableFuture> getExistsPartitions(String topic) { }); } - private int getSinglePartitionReceiverQueueSize() { + protected int getSinglePartitionReceiverQueueSize() { return conf.isMultiTopicsSinglePartitionReceiverQueueSizeEnable() ? conf.getMultiTopicsSinglePartitionReceiverQueueSize() : conf.getReceiverQueueSize(); } From b567962148c3fe82f9b4621f50d959cbe5160b5a Mon Sep 17 00:00:00 2001 From: oneby-wang Date: Fri, 17 Oct 2025 21:38:15 +0800 Subject: [PATCH 5/5] [improve][client] Modify ConsumerBuilder java doc --- .../main/java/org/apache/pulsar/client/api/ConsumerBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index e392bb2e64031..e16adaee4a160 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -454,7 +454,7 @@ ConsumerBuilder enableMultiTopicsSinglePartitionReceiverQueueSize( * multi-topics consumer. * *

For partitioned-topic, each partition consumer's receiverQueueSize is the min value of receiverQueueSize and - * (maxTotalReceiverQueueSizeAcrossPartitions() / numPartitions) + * (maxTotalReceiverQueueSizeAcrossPartitions / numPartitions) * * @param multiTopicsSinglePartitionReceiverQueueSize * the receiverQueueSize of single partition consumer in multi-topics consumer