From 6235c2ed0d91fe293dea9b17af9b0395799918a6 Mon Sep 17 00:00:00 2001 From: kris20030907 <3185633428@qq.com> Date: Tue, 14 Oct 2025 22:57:12 +0800 Subject: [PATCH] feat: add push consumer configs --- .../annotation/RocketMQMessageListener.java | 22 +++++++++ .../DefaultRocketMQListenerContainer.java | 46 +++++++++++++++++++ .../DefaultRocketMQListenerContainerTest.java | 5 ++ 3 files changed, 73 insertions(+) diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java index 0c1638c7..eab700f9 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/annotation/RocketMQMessageListener.java @@ -18,6 +18,8 @@ package org.apache.rocketmq.spring.annotation; import org.apache.rocketmq.client.impl.consumer.ConsumeMessageService; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; + import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; @@ -173,4 +175,24 @@ * The property of "instanceName". */ String instanceName() default "DEFAULT"; + + /** + * Message pull Interval. + */ + long pullInterval() default 0; + + /** + * Batch pull size. + */ + int pullBatchSize() default 32; + + /** + * Batch consumption size. + */ + int consumeMessageBatchMaxSize() default 1; + + /** + * Consuming point on consumer booting. + */ + ConsumeFromWhere consumeFromWhere() default ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; } diff --git a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java index c3bf81b2..dcd6bbd1 100644 --- a/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java +++ b/rocketmq-spring-boot/src/main/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainer.java @@ -39,6 +39,7 @@ import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.utils.MessageUtil; +import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.remoting.RPCHook; import org.apache.rocketmq.remoting.exception.RemotingException; @@ -137,6 +138,10 @@ public class DefaultRocketMQListenerContainer implements InitializingBean, private String namespace; private String namespaceV2; private long awaitTerminationMillisWhenShutdown; + private long pullInterval; + private int pullBatchSize; + private int consumeMessageBatchMaxSize; + private ConsumeFromWhere consumeFromWhere; private String instanceName; @@ -252,6 +257,10 @@ public void setRocketMQMessageListener(RocketMQMessageListener anno) { this.suspendCurrentQueueTimeMillis = anno.suspendCurrentQueueTimeMillis(); this.awaitTerminationMillisWhenShutdown = Math.max(0, anno.awaitTerminationMillisWhenShutdown()); this.instanceName = anno.instanceName(); + this.pullInterval = anno.pullInterval(); + this.pullBatchSize = anno.pullBatchSize(); + this.consumeMessageBatchMaxSize = anno.consumeMessageBatchMaxSize(); + this.consumeFromWhere = anno.consumeFromWhere(); } public ConsumeMode getConsumeMode() { @@ -318,6 +327,38 @@ public void setInstanceName(String instanceName) { this.instanceName = instanceName; } + public ConsumeFromWhere getConsumeFromWhere() { + return consumeFromWhere; + } + + public void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere) { + this.consumeFromWhere = consumeFromWhere; + } + + public long getPullInterval() { + return pullInterval; + } + + public void setPullInterval(long pullInterval) { + this.pullInterval = pullInterval; + } + + public int getPullBatchSize() { + return pullBatchSize; + } + + public void setPullBatchSize(int pullBatchSize) { + this.pullBatchSize = pullBatchSize; + } + + public int getConsumeMessageBatchMaxSize() { + return consumeMessageBatchMaxSize; + } + + public void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize) { + this.consumeMessageBatchMaxSize = consumeMessageBatchMaxSize; + } + public DefaultRocketMQListenerContainer setAwaitTerminationMillisWhenShutdown(long awaitTerminationMillisWhenShutdown) { this.awaitTerminationMillisWhenShutdown = awaitTerminationMillisWhenShutdown; return this; @@ -662,6 +703,11 @@ private void initRocketMQPushConsumer() throws MQClientException { consumer.setMaxReconsumeTimes(maxReconsumeTimes); consumer.setAwaitTerminationMillisWhenShutdown(awaitTerminationMillisWhenShutdown); consumer.setInstanceName(instanceName); + consumer.setPullInterval(pullInterval); + consumer.setPullBatchSize(pullBatchSize); + consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); + consumer.setConsumeFromWhere(consumeFromWhere); + switch (messageModel) { case BROADCASTING: consumer.setMessageModel(org.apache.rocketmq.remoting.protocol.heartbeat.MessageModel.BROADCASTING); diff --git a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java index 182d2fa3..be1a0b88 100644 --- a/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java +++ b/rocketmq-spring-boot/src/test/java/org/apache/rocketmq/spring/support/DefaultRocketMQListenerContainerTest.java @@ -255,6 +255,11 @@ public void testSetRocketMQMessageListener() { assertEquals(anno.delayLevelWhenNextConsume(), container.getDelayLevelWhenNextConsume()); assertEquals(anno.suspendCurrentQueueTimeMillis(), container.getSuspendCurrentQueueTimeMillis()); assertEquals(anno.instanceName(), container.getInstanceName()); + assertEquals(anno.pullInterval(), container.getPullInterval()); + assertEquals(anno.pullBatchSize(), container.getPullBatchSize()); + assertEquals(anno.consumeMessageBatchMaxSize(), container.getConsumeMessageBatchMaxSize()); + assertEquals(anno.awaitTerminationMillisWhenShutdown(), container.getAwaitTerminationMillisWhenShutdown()); + assertEquals(anno.consumeFromWhere(), container.getConsumeFromWhere()); } @RocketMQMessageListener(consumerGroup = "abc1", topic = "test",