message filter for Apache Pulsar, both support server-side and client-side.
pulsar-msg-filter-plugin 是一个基于PIP 105: Support pluggable entry filter in Dispatcher 为 Apache Pulsar 实现的 服务端 消息过滤插件。
pulsar-msg-filter-interceptor 是一个基于 Pulsar ConsumerInterceptor 实现的 客户端 消息过滤拦截器。
- 高性能、小巧
 - 支持常见条件表达式,几乎满足各种业务过滤场景
 
[server-side] pulsar-msg-filter-plugin 使用说明
- 
下载pulsar-msg-filter-plugin-VERSION.nar插件并保存至指定目录,如/app/conf/plugin
 - 
修改pulsar broker.conf配置(version >= 2.10),插件名称
pulsar-msg-filter# Class name of Pluggable entry filter that can decide whether the entry needs to be filtered # You can use this class to decide which entries can be sent to consumers. # Multiple classes need to be separated by commas. entryFilterNames=pulsar-msg-filter # The directory for all the entry filter implementations entryFiltersDirectory=/app/plugin # Location of unpacked NAR file narExtractionDirectory=/app/nar
 - 
重启broker,查看日志,如果看到如下日志:
Successfully loaded entry filter for name`pulsar-msg-filter`则说明配置成功
 - 
验证(option)
- 
发送方构建Producer实例时关闭
batch操作 .enableBatching(false)Producer<String> producer = client.newProducer(Schema.STRING) .topic("test-topic-1") .enableBatching(false) .create(); producer.newMessage() .property("k1","7") .property("k2", "vvvv") .property("k3", "true") .value("hi, this msg from `pulsar-msg-filter-plugin`") .send();
 - 
消费方使用admin配置订阅消费组过滤表达式,其key固定为 pulsar-msg-filter-expression
pulsar-admin topics update-subscription-properties --property --property pulsar-msg-filter-expression="double(k1)<6 || (k2=='vvvv' && k3=='true')" --subscription 订阅组名称 主题 pulsar-admin topics get-subscription-properties --subscription 订阅组名称 主题Consumer consumer = client.newConsumer() .subscriptionName("订阅组名称") .topic("主题") .subscribe();
 
 - 
 
[client-side] pulsar-msg-filter-interceptor 使用说明
- 
添加 pulsar-msg-filter-interceptor 依赖
<dependency> <groupId>io.github.yangl</groupId> <artifactId>pulsar-msg-filter-interceptor</artifactId> <version>VERSION</version> </dependency>
 - 
创建Consumer实例时配置 MsgFilterConsumerInterceptor 过滤器
Consumer<String> consumer = client.newConsumer(Schema.STRING) .subscriptionName("订阅组名称") .topic("主题") .intercept(MsgFilterConsumerInterceptor.<String>builder().build()) .subscribe();
.intercept(MsgFilterConsumerInterceptor.<String>builder().webServiceUrl(YOUR_HTTP_SERVICE_URL).build())
 
注意事项
- 由于pulsar message header的key&value全部为
String类型,在使用表达式的时候注意将其类型转换至目标类型 - AviatorScript的
false判断个人建议直接使用字符串的==true/false比较,AviatorScript只有nil false为false,其他全部为true - 过滤引擎使用AviatorScript (感谢晓丹),其内置函数详见其 函数库列表
 
pulsar-msg-filter is licensed under the AGPLv3 License.
