diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index ec4f694f61ec..8b900b8ce136 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -129,6 +129,15 @@ local schema = { producer_max_buffering = {type = "integer", minimum = 1, default = 50000}, producer_time_linger = {type = "integer", minimum = 1, default = 1}, meta_refresh_interval = {type = "integer", minimum = 1, default = 30}, + -- Kafka Produce API version. Default 0 for backward compatibility. + -- Kafka 4.x drops support for magic0 and magic1. Use 2 for Kafka 4.x. + api_version = { + type = "integer", + default = 0, + minimum = 0, + maximum = 2, + description = "Produce API version. Use 2 for Kafka 4.x compatibility.", + }, }, oneOf = { { required = {"broker_list", "kafka_topic"},}, @@ -285,6 +294,7 @@ function _M.log(conf, ctx) broker_config["request_timeout"] = conf.timeout * 1000 broker_config["producer_type"] = conf.producer_type broker_config["required_acks"] = conf.required_acks + broker_config["api_version"] = conf.api_version broker_config["batch_num"] = conf.producer_batch_num broker_config["batch_size"] = conf.producer_batch_size broker_config["max_buffering"] = conf.producer_max_buffering diff --git a/ci/init-plugin-test-service.sh b/ci/init-plugin-test-service.sh index 7b60607f5851..75a8066ce99e 100755 --- a/ci/init-plugin-test-service.sh +++ b/ci/init-plugin-test-service.sh @@ -64,6 +64,15 @@ after() { # configure clickhouse echo 'CREATE TABLE default.test (`host` String, `client_ip` String, `route_id` String, `service_id` String, `@timestamp` String, PRIMARY KEY(`@timestamp`)) ENGINE = MergeTree()' | curl 'http://localhost:8123/' --data-binary @- echo 'CREATE TABLE default.test (`host` String, `client_ip` String, `route_id` String, `service_id` String, `@timestamp` String, PRIMARY KEY(`@timestamp`)) ENGINE = MergeTree()' | curl 'http://localhost:8124/' --data-binary @- + + # Kafka 4.x topic for api_version=2 verification (uses 
bootstrap-server, not zookeeper) + # Placed at the end so failures don't block other service initialization + for i in {1..10}; do + sleep 3 + docker exec -i apache-apisix-kafka-server4-kafka4-1 /opt/bitnami/kafka/bin/kafka-topics.sh \ + --create --topic test-kafka4 --bootstrap-server localhost:9092 \ + --partitions 1 --replication-factor 1 2>/dev/null && break || true + done } before() { diff --git a/ci/pod/docker-compose.plugin.yml b/ci/pod/docker-compose.plugin.yml index a0923fd67733..2d13233e3f41 100644 --- a/ci/pod/docker-compose.plugin.yml +++ b/ci/pod/docker-compose.plugin.yml @@ -123,6 +123,31 @@ services: volumes: - ./ci/pod/kafka/kafka-server/kafka_scram_jaas.conf:/opt/bitnami/kafka/config/kafka_jaas.conf:ro + ## Kafka 4.x for api_version=2 verification (KRaft mode, same family as kafka-server1/2/3) + kafka-server4-kafka4: + image: bitnamilegacy/kafka:4.0.0 + hostname: kafka-server4-kafka4 + restart: unless-stopped + ports: + - "39092:9092" + environment: + KAFKA_ENABLE_KRAFT: "yes" + KAFKA_KRAFT_CLUSTER_ID: "LelM2dIFQkiUFvXCEcqRWA" + ALLOW_PLAINTEXT_LISTENER: "yes" + KAFKA_CFG_NODE_ID: 0 + KAFKA_CFG_PROCESS_ROLES: controller,broker + KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:39092 + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@kafka-server4-kafka4:9093 + KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR: 1 + networks: + kafka_net: + ## SkyWalking skywalking: image: apache/skywalking-oap-server:8.7.0-es6 diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index eeaf3f2f0a6e..20e11850d7d5 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ 
b/docs/en/latest/plugins/kafka-logger.md @@ -65,6 +65,7 @@ It might take some time to receive the log data. It will be automatically sent a | producer_max_buffering | integer | optional | 50000 | [1,...] | `max_buffering` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) representing maximum buffer size. Unit is message count. | | producer_time_linger | integer | optional | 1 | [1,...] | `flush_time` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) in seconds. | | meta_refresh_interval | integer | optional | 30 | [1,...] | `refresh_interval` parameter in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) specifies the time to auto refresh the metadata, in seconds. | +| api_version | integer | optional | 0 | [0, 1, 2] | Produce API version in [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka). Default `0` for backward compatibility. Use `2` for Apache Kafka 4.x (Kafka 4.x drops support for magic0 and magic1). | This Plugin supports using batch processors to aggregate and process entries (logs/data) in a batch. This avoids the need for frequently submitting the data. The batch processor submits data every `5` seconds or when the data in the queue reaches `1000`. See [Batch Processor](../batch-processor.md#configuration) for more information or setting your custom configuration. diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index bd606f180340..89dc33202c17 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -63,6 +63,7 @@ description: API 网关 Apache APISIX 的 kafka-logger 插件用于将日志作 | producer_max_buffering | integer | 否 | 50000 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `max_buffering` 参数,表示最大缓冲区,单位为条。 | | producer_time_linger | integer | 否 | 1 | [1,...] 
| 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `flush_time` 参数,单位为秒。| | meta_refresh_interval | integer | 否 | 30 | [1,...] | 对应 [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 `refresh_interval` 参数,用于指定自动刷新 metadata 的间隔时长,单位为秒。 | +| api_version | integer | 否 | 0 | [0, 1, 2] | [lua-resty-kafka](https://github.com/doujiang24/lua-resty-kafka) 中的 Produce API 版本。默认 `0` 以保持向后兼容。使用 Apache Kafka 4.x 时需设置为 `2`(Kafka 4.x 弃用了 magic0 和 magic1)。 | 该插件支持使用批处理器来聚合并批量处理条目(日志/数据)。这样可以避免插件频繁地提交数据,默认设置情况下批处理器会每 `5` 秒钟或队列中的数据达到 `1000` 条时提交数据,如需了解批处理器相关参数设置,请参考 [Batch-Processor](../batch-processor.md#配置) 配置部分。 diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 90003e20c8d0..9527d06e0440 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -98,7 +98,73 @@ done -=== TEST 4: set route(id: 1) +=== TEST 4: api_version 2 for Kafka 4.x +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + brokers = {{host = "127.0.0.1", port = 9092}}, + api_version = 2, + }) + if not ok then + ngx.say("err: ", err) + return + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 5: api_version 0 for Kafka < 0.10.0.0 +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + brokers = {{host = "127.0.0.1", port = 9092}}, + api_version = 0, + }) + if not ok then + ngx.say("err: ", err) + return + end + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 6: invalid api_version +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + brokers = {{host = "127.0.0.1", port = 9092}}, + api_version = 3, + }) + if not ok then + ngx.say("err: ", err) + return + 
end + ngx.say("done") + } + } +--- response_body +err: property "api_version" validation failed: expected 3 to be at most 2 + + + +=== TEST 7: set route(id: 1) --- config location /t { content_by_lua_block { @@ -138,7 +204,7 @@ passed -=== TEST 5: access +=== TEST 8: access --- request GET /hello --- response_body @@ -148,7 +214,7 @@ hello world -=== TEST 6: error log +=== TEST 9: error log --- config location /t { content_by_lua_block { @@ -195,7 +261,7 @@ failed to send data to Kafka topic -=== TEST 7: set route(meta_format = origin, include_req_body = true) +=== TEST 10: set route(meta_format = origin, include_req_body = true) --- config location /t { content_by_lua_block { @@ -236,7 +302,7 @@ passed -=== TEST 8: hit route, report log to kafka +=== TEST 11: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -253,7 +319,7 @@ abcdef -=== TEST 9: set route(meta_format = origin, include_req_body = false) +=== TEST 12: set route(meta_format = origin, include_req_body = false) --- config location /t { content_by_lua_block { @@ -294,7 +360,7 @@ passed -=== TEST 10: hit route, report log to kafka +=== TEST 13: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -309,7 +375,7 @@ connection: close -=== TEST 11: set route(meta_format = default) +=== TEST 14: set route(meta_format = default) --- config location /t { content_by_lua_block { @@ -349,7 +415,7 @@ passed -=== TEST 12: hit route, report log to kafka +=== TEST 15: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -361,7 +427,7 @@ qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/ -=== TEST 13: set route(id: 1), missing key field +=== TEST 16: set route(id: 1), missing key field --- config location /t { content_by_lua_block { @@ -400,7 +466,7 @@ passed -=== TEST 14: access, test key field is optional +=== TEST 17: access, test key field is optional --- request GET /hello --- response_body @@ -409,7 +475,7 @@ hello world -=== TEST 15: set route(meta_format 
= default), missing key field +=== TEST 18: set route(meta_format = default), missing key field --- config location /t { content_by_lua_block { @@ -448,7 +514,7 @@ passed -=== TEST 16: hit route, report log to kafka +=== TEST 19: hit route, report log to kafka --- request GET /hello?ab=cd abcdef @@ -460,7 +526,7 @@ qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/ -=== TEST 17: use the topic with 3 partitions +=== TEST 20: use the topic with 3 partitions --- config location /t { content_by_lua_block { @@ -499,7 +565,7 @@ passed -=== TEST 18: report log to kafka by different partitions +=== TEST 21: report log to kafka by different partitions --- config location /t { content_by_lua_block { @@ -546,7 +612,7 @@ qr/partition_id: 2/] -=== TEST 19: report log to kafka by different partitions in async mode +=== TEST 22: report log to kafka by different partitions in async mode --- config location /t { content_by_lua_block { @@ -592,7 +658,7 @@ qr/partition_id: 2/] -=== TEST 20: set route with incorrect sasl_config +=== TEST 23: set route with incorrect sasl_config --- config location /t { content_by_lua_block { @@ -638,7 +704,7 @@ passed -=== TEST 21: hit route, failed to send data to kafka +=== TEST 24: hit route, failed to send data to kafka --- request GET /hello --- response_body @@ -649,7 +715,7 @@ failed to do PLAIN auth with 127.0.0.1:19094: Authentication failed: Invalid use -=== TEST 22: set route with correct sasl_config +=== TEST 25: set route with correct sasl_config --- config location /t { content_by_lua_block { @@ -697,7 +763,7 @@ passed -=== TEST 23: hit route, send data to kafka successfully +=== TEST 26: hit route, send data to kafka successfully --- request POST /hello?name=qwerty abcdef @@ -711,7 +777,56 @@ qr/send data to kafka: \{.*"body":"abcdef"/ -=== TEST 24: set route(batch_max_size = 2), check if prometheus is initialized properly +=== TEST 27: Kafka 4.x with api_version=2 (verify compatibility) +--- config + location /t { + 
content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "kafka-logger": { + "brokers": [{"host": "127.0.0.1", "port": 39092}], + "kafka_topic": "test-kafka4", + "api_version": 2, + "key": "key1", + "timeout": 3, + "batch_max_size": 1, + "producer_type": "sync" + } + }, + "upstream": {"nodes": {"127.0.0.1:1980": 1}, "type": "roundrobin"}, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + ngx.say(body) + return + end + ngx.say("passed") + } + } +--- response_body +passed + + + +=== TEST 28: hit route, send data to Kafka 4.x successfully +--- request +GET /hello?kafka4=yes +--- response_body +hello world +--- error_log eval +qr/send data to kafka: \{.*"upstream":"127.0.0.1:1980"/ +--- no_error_log +[error] +--- wait: 3 + + + +=== TEST 29: set route(batch_max_size = 2), check if prometheus is initialized properly --- config location /t { content_by_lua_block { @@ -751,7 +866,7 @@ passed -=== TEST 25: access +=== TEST 30: access --- extra_yaml_config plugins: - kafka-logger @@ -763,7 +878,7 @@ hello world -=== TEST 26: create a service with kafka-logger and three routes bound to it +=== TEST 31: create a service with kafka-logger and three routes bound to it --- config location /t { content_by_lua_block { @@ -817,7 +932,7 @@ passed -=== TEST 27: hit three routes, should create batch processor only once +=== TEST 32: hit three routes, should create batch processor only once --- log_level: debug --- config location /t {