10 changes: 10 additions & 0 deletions apisix/plugins/kafka-logger.lua
@@ -129,6 +129,15 @@ local schema = {
producer_max_buffering = {type = "integer", minimum = 1, default = 50000},
producer_time_linger = {type = "integer", minimum = 1, default = 1},
meta_refresh_interval = {type = "integer", minimum = 1, default = 30},
-- Kafka Produce API version. Defaults to 0 for backward compatibility.
-- Kafka 4.x removes support for the v0/v1 message formats (magic 0/1), so use 2 for Kafka 4.x.
api_version = {
type = "integer",
default = 0,
minimum = 0,
maximum = 2,
description = "Produce API version. Use 2 for Kafka 4.x compatibility.",
},
},
oneOf = {
{ required = {"broker_list", "kafka_topic"},},
@@ -285,6 +294,7 @@ function _M.log(conf, ctx)
broker_config["request_timeout"] = conf.timeout * 1000
broker_config["producer_type"] = conf.producer_type
broker_config["required_acks"] = conf.required_acks
broker_config["api_version"] = conf.api_version
broker_config["batch_num"] = conf.producer_batch_num
broker_config["batch_size"] = conf.producer_batch_size
broker_config["max_buffering"] = conf.producer_max_buffering
9 changes: 9 additions & 0 deletions ci/init-plugin-test-service.sh
@@ -64,6 +64,15 @@ after() {
# configure clickhouse
echo 'CREATE TABLE default.test (`host` String, `client_ip` String, `route_id` String, `service_id` String, `@timestamp` String, PRIMARY KEY(`@timestamp`)) ENGINE = MergeTree()' | curl 'http://localhost:8123/' --data-binary @-
echo 'CREATE TABLE default.test (`host` String, `client_ip` String, `route_id` String, `service_id` String, `@timestamp` String, PRIMARY KEY(`@timestamp`)) ENGINE = MergeTree()' | curl 'http://localhost:8124/' --data-binary @-

# Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper)
# Placed at the end so failures don't block other service initialization
for i in {1..10}; do
sleep 3
docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/bitnami/kafka/bin/kafka-topics.sh \
--create --topic test-kafka4 --bootstrap-server localhost:9092 \
--partitions 1 --replication-factor 1 2>/dev/null && break || true
done
}
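The loop above retries topic creation because the freshly started broker may not be ready to accept requests yet. The same wait-and-retry pattern, extracted into a standalone helper, looks like this (a generic sketch for illustration; the `retry` function is not part of this patch):

```shell
#!/usr/bin/env bash
# Generic retry helper mirroring the CI loop: run a command up to
# $attempts times, sleeping $delay seconds between tries, and stop
# as soon as it succeeds.
retry() {
  local attempts=$1 delay=$2 i
  shift 2
  for ((i = 1; i <= attempts; i++)); do
    "$@" && return 0
    sleep "$delay"
  done
  return 1
}

# Example: `true` succeeds on the first try, so this prints "ready".
retry 10 0 true && echo "ready"
```

In the CI script the command being retried is the `kafka-topics.sh --create` invocation, and the trailing `|| true` keeps a persistent failure from aborting the rest of the setup.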

before() {
25 changes: 25 additions & 0 deletions ci/pod/docker-compose.plugin.yml
@@ -123,6 +123,31 @@ services:
volumes:
- ./ci/pod/kafka/kafka-server/kafka_scram_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro

## Kafka 4.x for api_version=2 verification (KRaft mode, same family as kafka-server1/2/3)
kafka-server4-kafka4:
image: bitnamilegacy/kafka:4.0.0
hostname: kafka-server4-kafka4
restart: unless-stopped
ports:
- "39092:9092"
environment:
KAFKA_ENABLE_KRAFT: "yes"
KAFKA_KRAFT_CLUSTER_ID: "LelM2dIFQkiUFvXCEcqRWA"
ALLOW_PLAINTEXT_LISTENER: "yes"
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:39092
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-server4-kafka4:9093
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1
networks:
kafka_net:

## SkyWalking
skywalking:
image: apache/skywalking-oap-server:8.7.0-es6
1 change: 1 addition & 0 deletions docs/en/latest/plugins/kafka-logger.md
@@ -65,6 +65,7 @@ It might take some time to receive the log data. It will be automatically sent a
| producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. |
| producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. |
| meta_refresh_interval | integer | optional | 30 | [1,...] | `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the time to auto refresh the metadata, in seconds. |
| api_version | integer | optional | 0 | [0, 1, 2] | Produce API version in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Default `0` for backward compatibility. Use `2` for Apache Kafka 4.x (Kafka 4.x drops support for magic0 and magic1). |

This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration.
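To show the new attribute in context, a minimal route enabling the plugin against the Kafka 4.x test broker could look like this (a sketch, not part of this change: the Admin API address, `$ADMIN_KEY`, route id, and upstream node are placeholder assumptions; port `39092` matches the host mapping of the `kafka-server4-kafka4` service in the CI compose file):

```shell
# Sketch: point kafka-logger at a Kafka 4.x broker using the v2 Produce API.
# Assumes the Admin API listens on 127.0.0.1:9180 and $ADMIN_KEY is set.
curl http://127.0.0.1:9180/apisix/admin/routes/1 \
  -H "X-API-KEY: ${ADMIN_KEY}" -X PUT -d '
{
  "uri": "/hello",
  "plugins": {
    "kafka-logger": {
      "broker_list": { "127.0.0.1": 39092 },
      "kafka_topic": "test-kafka4",
      "api_version": 2,
      "batch_max_size": 1
    }
  },
  "upstream": {
    "type": "roundrobin",
    "nodes": { "127.0.0.1:1980": 1 }
  }
}'
```

Leaving `api_version` unset keeps the current default of `0`, so existing routes are unaffected by this change.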

1 change: 1 addition & 0 deletions docs/zh/latest/plugins/kafka-logger.md
@@ -63,6 +63,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作
| producer_max_buffering | integer | 否 | 50000 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `max_buffering` 参数,表示最大缓冲区,单位为条。 |
| producer_time_linger | integer | 否 | 1 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `flush_time` 参数,单位为秒。|
| meta_refresh_interval | integer | 否 | 30 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 |
| api_version | integer | 否 | 0 | [0, 1, 2] | [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 Produce API 版本。默认 `0` 以保持向后兼容。使用 Apache Kafka 4.x 时需设置为 `2`(Kafka 4.x 弃用了 magic0 和 magic1)。 |

该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 `5` 秒钟或队列中的数据达到 `1000` 条时提交数据,如需了解批处理器相关参数设置,请参考 [Batch-Processor](../batch-processor.md#配置) 配置部分。
