From 2e66e70ae9fcc67bd005f8355522d13f19f16c64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Fri, 27 Feb 2026 11:51:00 +0800 Subject: [PATCH 01/12] feat(kafka-logger): add configurable api_version for Apache Kafka 4.x support - Set default api_version to 2 (Kafka 4.x drops magic0/magic1) - Make api_version configurable (0, 1, 2) for Kafka < 0.10.0.0 compatibility - Add api_version to broker_config when creating producer fix #12984 Made-with: Cursor --- CHANGELOG.md | 1 + apisix/plugins/kafka-logger.lua | 10 ++++ docs/en/latest/plugins/kafka-logger.md | 1 + docs/zh/latest/plugins/kafka-logger.md | 1 + t/plugin/kafka-logger.t | 66 ++++++++++++++++++++++++++ 5 files changed, 79 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21f8e0256783..05b28ec4d2f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -121,6 +121,7 @@ title: Changelog - feat(file-logger): add path properties to file-logger plugin metadata [#12825](https://github.com/apache/apisix/pull/12825) - feat(log): add nested log format support for logger plugins [#12697](https://github.com/apache/apisix/pull/12697) - feat: add max pending entries to all logger plugins [#12709](https://github.com/apache/apisix/pull/12709) +- feat(kafka-logger): add support for Apache Kafka 4.x with configurable `api_version` [#12984](https://github.com/apache/apisix/issues/12984) - feat(kafka-logger): add support for scram for authentication [#12693](https://github.com/apache/apisix/pull/12693) - fix(limit-conn): implement configurable redis key expiry [#12872](https://github.com/apache/apisix/pull/12872) - fix(skywalking): start timer when route is hit [#12855](https://github.com/apache/apisix/pull/12855) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index ec4f694f61ec..18185167542c 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -129,6 +129,15 @@ local schema = { producer_max_buffering = {type = "integer", minimum = 
1, default = 50000}, producer_time_linger = {type = "integer", minimum = 1, default = 1}, meta_refresh_interval = {type = "integer", minimum = 1, default = 30}, + -- Kafka Produce API version. Default 2 for Kafka 4.x compatibility. + -- Kafka 4.x drops support for magic0 and magic1. Use 0 for Kafka < 0.10.0.0. + api_version = { + type = "integer", + default = 2, + minimum = 0, + maximum = 2, + description = "Produce API version in lua-resty-kafka. Default 2 for Kafka 4.x compatibility.", + }, }, oneOf = { { required = {"broker_list", "kafka_topic"},}, @@ -285,6 +294,7 @@ function _M.log(conf, ctx) broker_config["request_timeout"] = conf.timeout * 1000 broker_config["producer_type"] = conf.producer_type broker_config["required_acks"] = conf.required_acks + broker_config["api_version"] = conf.api_version or 2 broker_config["batch_num"] = conf.producer_batch_num broker_config["batch_size"] = conf.producer_batch_size broker_config["max_buffering"] = conf.producer_max_buffering diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index eeaf3f2f0a6e..3d03abeabc83 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -65,6 +65,7 @@ It might take some time to receive the log data. It will be automatically sent a | producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. | | producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. | | meta_refresh_interval | integer | optional | 30 | [1,...] | `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the time to auto refresh the metadata, in seconds. 
| +| api_version | integer | optional | 2 | [0, 1, 2] | Produce API version in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Default `2` for Apache Kafka 4.x compatibility (Kafka 4.x drops support for magic0 and magic1). Use `0` for Kafka < 0.10.0.0. | This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration. diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index bd606f180340..1b7bce2a91bc 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -63,6 +63,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | producer_max_buffering | integer | 否 | 50000 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `max_buffering` 参数,表示最大缓冲区,单位为条。 | | producer_time_linger | integer | 否 | 1 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `flush_time` 参数,单位为秒。| | meta_refresh_interval | integer | 否 | 30 | [1,...] 
| 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 | +| api_version | integer | 否 | 2 | [0, 1, 2] | [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 Produce API 版本。默认 `2` 以兼容 Apache Kafka 4.x(Kafka 4.x 弃用了 magic0 和 magic1)。Kafka < 0.10.0.0 时使用 `0`。 | 该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 `5` 秒钟或队列中的数据达到 `1000` 条时提交数据,如需了解批处理器相关参数设置,请参考 [Batch-Processor](../batch-processor.md#配置) 配置部分。 diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 90003e20c8d0..8c3dede39cd0 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -98,6 +98,72 @@ done +=== TEST 3.1: api_version configurable (default is 2) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + brokers = {{host = "127.0.0.1", port = 9092}}, + api_version = 2, + }) + if not ok then + ngx.say("err: ", err) + return + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 3.2: api_version 0 for Kafka < 0.10.0.0 +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + brokers = {{host = "127.0.0.1", port = 9092}}, + api_version = 0, + }) + if not ok then + ngx.say("err: ", err) + return + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 3.3: invalid api_version +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + brokers = {{host = "127.0.0.1", port = 9092}}, + api_version = 3, + }) + if not ok then + ngx.say("err: ", err) + return + end + ngx.say("done") + } + } +--- response_body +err: property "api_version" validation failed: expected 3 to be at most 2 + + + === TEST 4: set 
route(id: 1) --- config location /t { From 9e3214e9d75402253a104c8830a32ab2c33b0562 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Fri, 27 Feb 2026 16:03:49 +0800 Subject: [PATCH 02/12] refactor(kafka-logger): address PR review feedback - Revert CHANGELOG.md (maintainers update it during release) - Set api_version default to 0 for backward compatibility - Use conf.api_version directly without fallback Made-with: Cursor --- CHANGELOG.md | 1 - apisix/plugins/kafka-logger.lua | 10 +++++----- docs/en/latest/plugins/kafka-logger.md | 2 +- docs/zh/latest/plugins/kafka-logger.md | 2 +- t/plugin/kafka-logger.t | 2 +- 5 files changed, 8 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 05b28ec4d2f3..21f8e0256783 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -121,7 +121,6 @@ title: Changelog - feat(file-logger): add path properties to file-logger plugin metadata [#12825](https://github.com/apache/apisix/pull/12825) - feat(log): add nested log format support for logger plugins [#12697](https://github.com/apache/apisix/pull/12697) - feat: add max pending entries to all logger plugins [#12709](https://github.com/apache/apisix/pull/12709) -- feat(kafka-logger): add support for Apache Kafka 4.x with configurable `api_version` [#12984](https://github.com/apache/apisix/issues/12984) - feat(kafka-logger): add support for scram for authentication [#12693](https://github.com/apache/apisix/pull/12693) - fix(limit-conn): implement configurable redis key expiry [#12872](https://github.com/apache/apisix/pull/12872) - fix(skywalking): start timer when route is hit [#12855](https://github.com/apache/apisix/pull/12855) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 18185167542c..888870604507 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -129,14 +129,14 @@ local schema = { producer_max_buffering = {type = "integer", minimum = 1, default = 50000}, 
producer_time_linger = {type = "integer", minimum = 1, default = 1}, meta_refresh_interval = {type = "integer", minimum = 1, default = 30}, - -- Kafka Produce API version. Default 2 for Kafka 4.x compatibility. - -- Kafka 4.x drops support for magic0 and magic1. Use 0 for Kafka < 0.10.0.0. + -- Kafka Produce API version. Default 0 for backward compatibility. + -- Kafka 4.x drops support for magic0 and magic1. Use 2 for Kafka 4.x. api_version = { type = "integer", - default = 2, + default = 0, minimum = 0, maximum = 2, - description = "Produce API version in lua-resty-kafka. Default 2 for Kafka 4.x compatibility.", + description = "Produce API version in lua-resty-kafka. Use 2 for Kafka 4.x compatibility.", }, }, oneOf = { @@ -294,7 +294,7 @@ function _M.log(conf, ctx) broker_config["request_timeout"] = conf.timeout * 1000 broker_config["producer_type"] = conf.producer_type broker_config["required_acks"] = conf.required_acks - broker_config["api_version"] = conf.api_version or 2 + broker_config["api_version"] = conf.api_version broker_config["batch_num"] = conf.producer_batch_num broker_config["batch_size"] = conf.producer_batch_size broker_config["max_buffering"] = conf.producer_max_buffering diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 3d03abeabc83..20e11850d7d5 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -65,7 +65,7 @@ It might take some time to receive the log data. It will be automatically sent a | producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. | | producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. | | meta_refresh_interval | integer | optional | 30 | [1,...] 
| `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the time to auto refresh the metadata, in seconds. | -| api_version | integer | optional | 2 | [0, 1, 2] | Produce API version in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Default `2` for Apache Kafka 4.x compatibility (Kafka 4.x drops support for magic0 and magic1). Use `0` for Kafka < 0.10.0.0. | +| api_version | integer | optional | 0 | [0, 1, 2] | Produce API version in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Default `0` for backward compatibility. Use `2` for Apache Kafka 4.x (Kafka 4.x drops support for magic0 and magic1). | This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration. diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index 1b7bce2a91bc..89dc33202c17 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -63,7 +63,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | producer_max_buffering | integer | 否 | 50000 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `max_buffering` 参数,表示最大缓冲区,单位为条。 | | producer_time_linger | integer | 否 | 1 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `flush_time` 参数,单位为秒。| | meta_refresh_interval | integer | 否 | 30 | [1,...] 
| 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 | -| api_version | integer | 否 | 2 | [0, 1, 2] | [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 Produce API 版本。默认 `2` 以兼容 Apache Kafka 4.x(Kafka 4.x 弃用了 magic0 和 magic1)。Kafka < 0.10.0.0 时使用 `0`。 | +| api_version | integer | 否 | 0 | [0, 1, 2] | [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 Produce API 版本。默认 `0` 以保持向后兼容。使用 Apache Kafka 4.x 时需设置为 `2`(Kafka 4.x 弃用了 magic0 和 magic1)。 | 该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 `5` 秒钟或队列中的数据达到 `1000` 条时提交数据,如需了解批处理器相关参数设置,请参考 [Batch-Processor](../batch-processor.md#配置) 配置部分。 diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 8c3dede39cd0..dfbad93521e0 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -98,7 +98,7 @@ done -=== TEST 3.1: api_version configurable (default is 2) +=== TEST 3.1: api_version 2 for Kafka 4.x --- config location /t { content_by_lua_block { From 5fd8c929c292a719784628f7551af219d0691979 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Fri, 27 Feb 2026 17:39:36 +0800 Subject: [PATCH 03/12] fix(kafka-logger): only pass api_version when explicitly set Avoid passing api_version=nil to producer config when not specified, let lua-resty-kafka use its default (0) for backward compatibility. 
Made-with: Cursor --- apisix/plugins/kafka-logger.lua | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 888870604507..dc952ef1d5e7 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -294,7 +294,9 @@ function _M.log(conf, ctx) broker_config["request_timeout"] = conf.timeout * 1000 broker_config["producer_type"] = conf.producer_type broker_config["required_acks"] = conf.required_acks - broker_config["api_version"] = conf.api_version + if conf.api_version ~= nil then + broker_config["api_version"] = conf.api_version + end broker_config["batch_num"] = conf.producer_batch_num broker_config["batch_size"] = conf.producer_batch_size broker_config["max_buffering"] = conf.producer_max_buffering From ef16f5e04aa1edcf3ba7282af1404a5874cc7f68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Fri, 27 Feb 2026 17:41:00 +0800 Subject: [PATCH 04/12] fix(kafka-logger): shorten api_version description to comply with max line length (100) Made-with: Cursor --- apisix/plugins/kafka-logger.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index dc952ef1d5e7..d6ca59b87e14 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -136,7 +136,7 @@ local schema = { default = 0, minimum = 0, maximum = 2, - description = "Produce API version in lua-resty-kafka. Use 2 for Kafka 4.x compatibility.", + description = "Produce API version. 
Use 2 for Kafka 4.x compatibility.", }, }, oneOf = { From d77745ad614a367f6b0ff726cecf802d9d489a5f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Sat, 28 Feb 2026 10:15:15 +0800 Subject: [PATCH 05/12] ci: add Kafka 4.x integration test for kafka-logger api_version Made-with: Cursor --- ci/init-plugin-test-service.sh | 5 ++++ ci/pod/docker-compose.plugin.yml | 12 ++++++++ t/plugin/kafka-logger.t | 49 ++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+) diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index 7b60607f5851..1f8e4ecbb98c 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -18,6 +18,11 @@ after() { docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2 + # Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper) + for i in 1 2 3 4 5 6 7 8 9 10; do + sleep 3 + docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/kafka/bin/kafka-topics.sh --create --topic test-kafka4 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 2>/dev/null && break || true + done docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3 docker exec -i apache-apisix-kafka-server2-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4 docker exec -i apache-apisix-kafka-server3-scram-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic test-scram-256 diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index a0923fd67733..65a844baa989 100644 --- a/ci/pod/docker-compose.plugin.yml +++ 
b/ci/pod/docker-compose.plugin.yml @@ -123,6 +123,18 @@ services: volumes: - ./ci/pod/kafka/kafka-server/kafka_scram_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro + ## Kafka 4.x for api_version=2 verification (KRaft mode, no Zookeeper) + kafka-server4-kafka4: + image: apache/kafka:4.0.1 + restart: unless-stopped + ports: + - "39092:9092" + environment: + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:39092 + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + networks: + kafka_net: + ## SkyWalking skywalking: image: apache/skywalking-oap-server:8.7.0-es6 diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index dfbad93521e0..e9a36608fae9 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -777,6 +777,55 @@ qr/send data to kafka: \{.*"body":"abcdef"/ +=== TEST 23.1: Kafka 4.x with api_version=2 (verify compatibility) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "brokers": [{"host": "127.0.0.1", "port": 39092}], + "kafka_topic": "test-kafka4", + "api_version": 2, + "key": "key1", + "timeout": 3, + "batch_max_size": 1, + "producer_type": "sync" + } + }, + "upstream": {"nodes": {"127.0.0.1:1980": 1}, "type": "roundrobin"}, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.say("passed") + } + } +--- response_body +passed + + + +=== TEST 23.2: hit route, send data to Kafka 4.x successfully +--- request +GET /hello?kafka4=yes +--- response_body +hello world +--- error_log_like eval +qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/ +--- no_error_log +[error] +--- wait: 3 + + + === TEST 24: set route(batch_max_size = 2), check if prometheus is initialized properly --- config location /t { From 6e9f7d6edd26afdf0c0beab109c357b550f5bf22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Sat, 28 Feb 
2026 13:13:01 +0800 Subject: [PATCH 06/12] ci: fix Kafka 4.x KRaft config and improve topic creation reliability Made-with: Cursor --- ci/init-plugin-test-service.sh | 3 ++- ci/pod/docker-compose.plugin.yml | 16 ++++++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index 1f8e4ecbb98c..85c1482319c7 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -19,7 +19,8 @@ after() { docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2 # Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper) - for i in 1 2 3 4 5 6 7 8 9 10; do + # Wait for Kafka 4.x KRaft to be ready (can take ~30s), then create topic + for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do sleep 3 docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/kafka/bin/kafka-topics.sh --create --topic test-kafka4 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 2>/dev/null && break || true done diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index 65a844baa989..a00219d53601 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -126,12 +126,24 @@ services: ## Kafka 4.x for api_version=2 verification (KRaft mode, no Zookeeper) kafka-server4-kafka4: image: apache/kafka:4.0.1 + hostname: kafka-server4-kafka4 restart: unless-stopped ports: - "39092:9092" environment: - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:39092 - KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092 + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://127.0.0.1:39092,PLAINTEXT://kafka-server4-kafka4:19092 + KAFKA_PROCESS_ROLES: broker,controller + 
KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-server4-kafka4:29093 + KAFKA_LISTENERS: CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092 + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs networks: kafka_net: From f3ebb9560f39235e5e5b9c0b6de97706fab09a61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Sat, 28 Feb 2026 13:20:54 +0800 Subject: [PATCH 07/12] refactor(kafka-logger): remove redundant api_version nil check per schema guarantee Made-with: Cursor --- apisix/plugins/kafka-logger.lua | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index d6ca59b87e14..8b900b8ce136 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -294,9 +294,7 @@ function _M.log(conf, ctx) broker_config["request_timeout"] = conf.timeout * 1000 broker_config["producer_type"] = conf.producer_type broker_config["required_acks"] = conf.required_acks - if conf.api_version ~= nil then - broker_config["api_version"] = conf.api_version - end + broker_config["api_version"] = conf.api_version broker_config["batch_num"] = conf.producer_batch_num broker_config["batch_size"] = conf.producer_batch_size broker_config["max_buffering"] = conf.producer_max_buffering From 1812f510c7e8c78e0ca014ddc9d9468739162b4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Sat, 28 Feb 2026 21:05:47 +0800 Subject: [PATCH 08/12] ci: use bitnamilegacy/kafka:4.0.0 with full KRaft config and fix test numbering - Switch Kafka 4.x image from apache/kafka to bitnamilegacy/kafka:4.0.0 - Add single-node replication factor settings for KRaft - Simplify topic creation retry loop to {1..20} - 
Run reindex to fix kafka-logger.t test numbering for lint Made-with: Cursor --- ci/init-plugin-test-service.sh | 4 +-- ci/pod/docker-compose.plugin.yml | 28 +++++++-------- t/plugin/kafka-logger.t | 58 ++++++++++++++++---------------- 3 files changed, 44 insertions(+), 46 deletions(-) diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index 85c1482319c7..cd71c4a07c53 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -20,9 +20,9 @@ after() { docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2 # Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper) # Wait for Kafka 4.x KRaft to be ready (can take ~30s), then create topic - for i in 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20; do + for i in {1..20}; do sleep 3 - docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/kafka/bin/kafka-topics.sh --create --topic test-kafka4 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 2>/dev/null && break || true + docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test-kafka4 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 2>/dev/null && break || true done docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3 docker exec -i apache-apisix-kafka-server2-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4 diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index a00219d53601..b22bb2b66dca 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -123,27 +123,25 @@ services: volumes: - 
./ci/pod/kafka/kafka-server/kafka_scram_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro - ## Kafka 4.x for api_version=2 verification (KRaft mode, no Zookeeper) + ## Kafka 4.x for api_version=2 verification (KRaft mode, same family as kafka-server1/2/3) kafka-server4-kafka4: - image: apache/kafka:4.0.1 + image: bitnamilegacy/kafka:4.0.0 hostname: kafka-server4-kafka4 restart: unless-stopped ports: - "39092:9092" environment: - KAFKA_NODE_ID: 1 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_HOST://127.0.0.1:39092,PLAINTEXT://kafka-server4-kafka4:19092 - KAFKA_PROCESS_ROLES: broker,controller - KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-server4-kafka4:29093 - KAFKA_LISTENERS: CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092 - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 - KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 - KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:39092 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-server4-kafka4:9093 + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1 networks: kafka_net: diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index e9a36608fae9..9527d06e0440 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -98,7 +98,7 @@ done -=== TEST 3.1: api_version 2 for Kafka 
4.x +=== TEST 4: api_version 2 for Kafka 4.x --- config location /t { content_by_lua_block { @@ -120,7 +120,7 @@ done -=== TEST 3.2: api_version 0 for Kafka < 0.10.0.0 +=== TEST 5: api_version 0 for Kafka < 0.10.0.0 --- config location /t { content_by_lua_block { @@ -142,7 +142,7 @@ done -=== TEST 3.3: invalid api_version +=== TEST 6: invalid api_version --- config location /t { content_by_lua_block { @@ -164,7 +164,7 @@ err: property "api_version" validation failed: expected 3 to be at most 2 -=== TEST 4: set route(id: 1) +=== TEST 7: set route(id: 1) --- config location /t { content_by_lua_block { @@ -204,7 +204,7 @@ passed -=== TEST 5: access +=== TEST 8: access --- request GET /hello --- response_body @@ -214,7 +214,7 @@ hello world -=== TEST 6: error log +=== TEST 9: error log --- config location /t { content_by_lua_block { @@ -261,7 +261,7 @@ failed to send data to Kafka topic -=== TEST 7: set route(meta_format = origin, include_req_body = true) +=== TEST 10: set route(meta_format = origin, include_req_body = true) --- config location /t { content_by_lua_block { @@ -302,7 +302,7 @@ passed -=== TEST 8: hit route, report log to kafka +=== TEST 11: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -319,7 +319,7 @@ abcdef -=== TEST 9: set route(meta_format = origin, include_req_body = false) +=== TEST 12: set route(meta_format = origin, include_req_body = false) --- config location /t { content_by_lua_block { @@ -360,7 +360,7 @@ passed -=== TEST 10: hit route, report log to kafka +=== TEST 13: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -375,7 +375,7 @@ connection: close -=== TEST 11: set route(meta_format = default) +=== TEST 14: set route(meta_format = default) --- config location /t { content_by_lua_block { @@ -415,7 +415,7 @@ passed -=== TEST 12: hit route, report log to kafka +=== TEST 15: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -427,7 +427,7 @@ qr/send data to kafka: 
\{.*"upstream":"127.0.0.1:1980"/ -=== TEST 13: set route(id: 1), missing key field +=== TEST 16: set route(id: 1), missing key field --- config location /t { content_by_lua_block { @@ -466,7 +466,7 @@ passed -=== TEST 14: access, test key field is optional +=== TEST 17: access, test key field is optional --- request GET /hello --- response_body @@ -475,7 +475,7 @@ hello world -=== TEST 15: set route(meta_format = default), missing key field +=== TEST 18: set route(meta_format = default), missing key field --- config location /t { content_by_lua_block { @@ -514,7 +514,7 @@ passed -=== TEST 16: hit route, report log to kafka +=== TEST 19: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -526,7 +526,7 @@ qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/ -=== TEST 17: use the topic with 3 partitions +=== TEST 20: use the topic with 3 partitions --- config location /t { content_by_lua_block { @@ -565,7 +565,7 @@ passed -=== TEST 18: report log to kafka by different partitions +=== TEST 21: report log to kafka by different partitions --- config location /t { content_by_lua_block { @@ -612,7 +612,7 @@ qr/partition_id: 2/] -=== TEST 19: report log to kafka by different partitions in async mode +=== TEST 22: report log to kafka by different partitions in async mode --- config location /t { content_by_lua_block { @@ -658,7 +658,7 @@ qr/partition_id: 2/] -=== TEST 20: set route with incorrect sasl_config +=== TEST 23: set route with incorrect sasl_config --- config location /t { content_by_lua_block { @@ -704,7 +704,7 @@ passed -=== TEST 21: hit route, failed to send data to kafka +=== TEST 24: hit route, failed to send data to kafka --- request GET /hello --- response_body @@ -715,7 +715,7 @@ failed to do PLAIN auth with 127.0.0.1:19094: Authentication failed: Invalid use -=== TEST 22: set route with correct sasl_config +=== TEST 25: set route with correct sasl_config --- config location /t { content_by_lua_block { @@ -763,7 +763,7 @@ passed -=== 
TEST 23: hit route, send data to kafka successfully +=== TEST 26: hit route, send data to kafka successfully --- request POST /hello?name=qwerty abcdef @@ -777,7 +777,7 @@ qr/send data to kafka: \{.*"body":"abcdef"/ -=== TEST 23.1: Kafka 4.x with api_version=2 (verify compatibility) +=== TEST 27: Kafka 4.x with api_version=2 (verify compatibility) --- config location /t { content_by_lua_block { @@ -813,7 +813,7 @@ passed -=== TEST 23.2: hit route, send data to Kafka 4.x successfully +=== TEST 28: hit route, send data to Kafka 4.x successfully --- request GET /hello?kafka4=yes --- response_body @@ -826,7 +826,7 @@ qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/ -=== TEST 24: set route(batch_max_size = 2), check if prometheus is initialized properly +=== TEST 29: set route(batch_max_size = 2), check if prometheus is initialized properly --- config location /t { content_by_lua_block { @@ -866,7 +866,7 @@ passed -=== TEST 25: access +=== TEST 30: access --- extra_yaml_config plugins: - kafka-logger @@ -878,7 +878,7 @@ hello world -=== TEST 26: create a service with kafka-logger and three routes bound to it +=== TEST 31: create a service with kafka-logger and three routes bound to it --- config location /t { content_by_lua_block { @@ -932,7 +932,7 @@ passed -=== TEST 27: hit three routes, should create batch processor only once +=== TEST 32: hit three routes, should create batch processor only once --- log_level: debug --- config location /t { From ce6a645901efe74454550fd18129eef10fc29218 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Mon, 2 Mar 2026 21:17:39 +0800 Subject: [PATCH 09/12] fix(ci): add missing KRaft env vars and move Kafka 4.x init to end MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The bitnamilegacy/kafka:4.0.0 KRaft broker was not starting because KAFKA_ENABLE_KRAFT and KAFKA_KRAFT_CLUSTER_ID were missing. 
This caused the topic creation retry loop (20 × 60s timeout) to block all other service init for ~22 minutes, making the CI exceed its time limit. - Add KAFKA_ENABLE_KRAFT, KAFKA_KRAFT_CLUSTER_ID, ALLOW_PLAINTEXT_LISTENER - Move Kafka 4.x topic creation to end of init script - Reduce retry count from 20 to 10 Made-with: Cursor --- ci/init-plugin-test-service.sh | 15 +++++++++------ ci/pod/docker-compose.plugin.yml | 3 +++ 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index cd71c4a07c53..75a8066ce99e 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -18,12 +18,6 @@ after() { docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 1 --topic test2 - # Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper) - # Wait for Kafka 4.x KRaft to be ready (can take ~30s), then create topic - for i in {1..20}; do - sleep 3 - docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test-kafka4 --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1 2>/dev/null && break || true - done docker exec -i apache-apisix-kafka-server1-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server1:2181 --replication-factor 1 --partitions 3 --topic test3 docker exec -i apache-apisix-kafka-server2-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server2:2181 --replication-factor 1 --partitions 1 --topic test4 docker exec -i apache-apisix-kafka-server3-scram-1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server3:2181 --replication-factor 1 --partitions 1 --topic test-scram-256 @@ -70,6 +64,15 @@ after() { # configure clickhouse echo 'CREATE TABLE default.test (`host` String, `client_ip` String, `route_id` String, 
`service_id` String, `@timestamp` String, PRIMARY KEY(`@timestamp`)) ENGINE = MergeTree()' | curl 'http://localhost:8123/' --data-binary @- echo 'CREATE TABLE default.test (`host` String, `client_ip` String, `route_id` String, `service_id` String, `@timestamp` String, PRIMARY KEY(`@timestamp`)) ENGINE = MergeTree()' | curl 'http://localhost:8124/' --data-binary @- + + # Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper) + # Placed at the end so failures don't block other service initialization + for i in {1..10}; do + sleep 3 + docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/bitnami/kafka/bin/kafka-topics.sh \ + --create --topic test-kafka4 --bootstrap-server localhost:9092 \ + --partitions 1 --replication-factor 1 2>/dev/null && break || true + done } before() { diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index b22bb2b66dca..2d13233e3f41 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -131,6 +131,9 @@ services: ports: - "39092:9092" environment: + KAFKA_ENABLE_KRAFT: "yes" + KAFKA_KRAFT_CLUSTER_ID: "LelM2dIFQkiUFvXCEcqRWA" + ALLOW_PLAINTEXT_LISTENER: "yes" KAFKA_CFG_NODE_ID: 0 KAFKA_CFG_PROCESS_ROLES: controller,broker KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 From d7b3b274c2448e686dd0e5ede4d1d21fe8c9b92f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Wed, 4 Mar 2026 18:10:48 +0800 Subject: [PATCH 10/12] chore(kafka-logger,ci): use apache kafka image and align api_version default - Switch Kafka 4.x test broker to official apache/kafka:4.0.0 image with KRaft configuration based on upstream docker examples - Call kafka-topics.sh via PATH inside the container for Kafka 4.x topic creation - Set api_version schema default to 1 and update comments/docs so that the default matches lua-resty-kafka while still documenting that 2 is required for Kafka 4.x Made-with: Cursor --- apisix/plugins/kafka-logger.lua | 
4 ++-- ci/init-plugin-test-service.sh | 2 +- ci/pod/docker-compose.plugin.yml | 32 ++++++++++++++------------ docs/en/latest/plugins/kafka-logger.md | 2 +- docs/zh/latest/plugins/kafka-logger.md | 2 +- 5 files changed, 22 insertions(+), 20 deletions(-) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index 8b900b8ce136..ed904dd14f4d 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -129,11 +129,11 @@ local schema = { producer_max_buffering = {type = "integer", minimum = 1, default = 50000}, producer_time_linger = {type = "integer", minimum = 1, default = 1}, meta_refresh_interval = {type = "integer", minimum = 1, default = 30}, - -- Kafka Produce API version. Default 0 for backward compatibility. + -- Kafka Produce API version. Default 1 to match lua-resty-kafka. -- Kafka 4.x drops support for magic0 and magic1. Use 2 for Kafka 4.x. api_version = { type = "integer", - default = 0, + default = 1, minimum = 0, maximum = 2, description = "Produce API version. 
Use 2 for Kafka 4.x compatibility.", diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index 75a8066ce99e..e1a3bc866c15 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -69,7 +69,7 @@ after() { # Placed at the end so failures don't block other service initialization for i in {1..10}; do sleep 3 - docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/bitnami/kafka/bin/kafka-topics.sh \ + docker exec -i apache-apisix-kafka-server4-kafka4-1 kafka-topics.sh \ --create --topic test-kafka4 --bootstrap-server localhost:9092 \ --partitions 1 --replication-factor 1 2>/dev/null && break || true done diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index 2d13233e3f41..f9379f2b9579 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -125,26 +125,28 @@ services: ## Kafka 4.x for api_version=2 verification (KRaft mode, same family as kafka-server1/2/3) kafka-server4-kafka4: - image: bitnamilegacy/kafka:4.0.0 + image: apache/kafka:4.0.0 hostname: kafka-server4-kafka4 restart: unless-stopped ports: - "39092:9092" environment: - KAFKA_ENABLE_KRAFT: "yes" - KAFKA_KRAFT_CLUSTER_ID: "LelM2dIFQkiUFvXCEcqRWA" - ALLOW_PLAINTEXT_LISTENER: "yes" - KAFKA_CFG_NODE_ID: 0 - KAFKA_CFG_PROCESS_ROLES: controller,broker - KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:39092 - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT - KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-server4-kafka4:9093 - KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 - KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 - KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_NODE_ID: 1 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT" + 
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:39092,PLAINTEXT://kafka-server4-kafka4:19092" + KAFKA_PROCESS_ROLES: "broker,controller" + KAFKA_CONTROLLER_QUORUM_VOTERS: "1@kafka-server4-kafka4:29093" + KAFKA_LISTENERS: "CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092" + KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT" + KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER" + CLUSTER_ID: "4L6g3nShT-eMCtK--X86sw" + KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_SHARE_COORDINATOR_STATE_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_SHARE_COORDINATOR_STATE_TOPIC_MIN_ISR: 1 + KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs" networks: kafka_net: diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 20e11850d7d5..03a8f6b3fe8b 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -65,7 +65,7 @@ It might take some time to receive the log data. It will be automatically sent a | producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. | | producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. | | meta_refresh_interval | integer | optional | 30 | [1,...] | `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the time to auto refresh the metadata, in seconds. | -| api_version | integer | optional | 0 | [0, 1, 2] | Produce API version in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Default `0` for backward compatibility. Use `2` for Apache Kafka 4.x (Kafka 4.x drops support for magic0 and magic1). 
| +| api_version | integer | optional | 1 | [0, 1, 2] | Produce API version in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Default `1` to match the upstream library. Use `2` for Apache Kafka 4.x (Kafka 4.x drops support for magic0 and magic1). | This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration. diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index 89dc33202c17..ca1ea2267629 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -63,7 +63,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | producer_max_buffering | integer | 否 | 50000 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `max_buffering` 参数,表示最大缓冲区,单位为条。 | | producer_time_linger | integer | 否 | 1 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `flush_time` 参数,单位为秒。| | meta_refresh_interval | integer | 否 | 30 | [1,...] 
| 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 | -| api_version | integer | 否 | 0 | [0, 1, 2] | [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 Produce API 版本。默认 `0` 以保持向后兼容。使用 Apache Kafka 4.x 时需设置为 `2`(Kafka 4.x 弃用了 magic0 和 magic1)。 | +| api_version | integer | 否 | 1 | [0, 1, 2] | [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 Produce API 版本。默认 `1`,与上游库保持一致。使用 Apache Kafka 4.x 时需设置为 `2`(Kafka 4.x 弃用了 magic0 和 magic1)。 | 该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 `5` 秒钟或队列中的数据达到 `1000` 条时提交数据,如需了解批处理器相关参数设置,请参考 [Batch-Processor](../batch-processor.md#配置) 配置部分。 From 699e41e69d33d36214bebaf7407af941320198f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Wed, 4 Mar 2026 19:31:15 +0800 Subject: [PATCH 11/12] chore(ci): cap kafka 4.x topic creation time with timeout Made-with: Cursor --- ci/init-plugin-test-service.sh | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index e1a3bc866c15..d3f085c81bab 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -66,10 +66,11 @@ after() { echo 'CREATE TABLE default.test (`host` String, `client_ip` String, `route_id` String, `service_id` String, `@timestamp` String, PRIMARY KEY(`@timestamp`)) ENGINE = MergeTree()' | curl 'http://localhost:8124/' --data-binary @- # Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper) - # Placed at the end so failures don't block other service initialization + # Placed at the end so failures don't block other service initialization. + # Use a bounded timeout per attempt to avoid long hangs if the broker is unhealthy. 
for i in {1..10}; do sleep 3 - docker exec -i apache-apisix-kafka-server4-kafka4-1 kafka-topics.sh \ + timeout 30s docker exec -i apache-apisix-kafka-server4-kafka4-1 kafka-topics.sh \ --create --topic test-kafka4 --bootstrap-server localhost:9092 \ --partitions 1 --replication-factor 1 2>/dev/null && break || true done From 82fef14c7535b18194c77734ab73f8f46acc9da9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=AE=B5=E6=99=93=E9=9B=84?= Date: Wed, 4 Mar 2026 19:36:29 +0800 Subject: [PATCH 12/12] chore(ci): make kafka 4.x topic creation more robust Made-with: Cursor --- ci/init-plugin-test-service.sh | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index d3f085c81bab..c6397f6c3c13 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -68,12 +68,16 @@ after() { # Kafka 4.x topic for api_version=2 verification (uses bootstrap-server, not zookeeper) # Placed at the end so failures don't block other service initialization. # Use a bounded timeout per attempt to avoid long hangs if the broker is unhealthy. - for i in {1..10}; do - sleep 3 + for i in {1..20}; do + sleep 5 timeout 30s docker exec -i apache-apisix-kafka-server4-kafka4-1 kafka-topics.sh \ --create --topic test-kafka4 --bootstrap-server localhost:9092 \ --partitions 1 --replication-factor 1 2>/dev/null && break || true done + + if ! docker exec -i apache-apisix-kafka-server4-kafka4-1 kafka-topics.sh --list --bootstrap-server localhost:9092 2>/dev/null | grep -q '^test-kafka4$'; then + echo "[warn] kafka-logger: failed to create test-kafka4 topic on Kafka 4.x after multiple retries, Kafka 4.x tests may be skipped." >&2 + fi } before() {