From 68df1fe762e3cc6e0239292b3a750e26a5f36496 Mon Sep 17 00:00:00 2001 From: elizax Date: Wed, 4 Feb 2026 11:34:46 +0000 Subject: [PATCH 1/2] fix(ai-proxy-multi):health check not work --- apisix/plugins/ai-proxy-multi.lua | 134 ++++++++++++++++++++---------- test-nginx | 1 + 2 files changed, 93 insertions(+), 42 deletions(-) create mode 160000 test-nginx diff --git a/apisix/plugins/ai-proxy-multi.lua b/apisix/plugins/ai-proxy-multi.lua index 74a84d0e3534..7e64203b4be5 100644 --- a/apisix/plugins/ai-proxy-multi.lua +++ b/apisix/plugins/ai-proxy-multi.lua @@ -35,7 +35,7 @@ local endpoint_regex = "^(https?)://([^:/]+):?(%d*)/?.*$" local pickers = {} local lrucache_server_picker = core.lrucache.new({ - ttl = 300, count = 256 + ttl = 10, count = 256 }) local plugin_name = "ai-proxy-multi" @@ -107,13 +107,6 @@ function _M.check_schema(conf) core.log.warn("fail to require ai provider: ", instance.provider, ", err", err) return false, "ai provider: " .. instance.provider .. " is not supported." end - local sa_json = core.table.try_read_attr(instance, "auth", "gcp", "service_account_json") - if sa_json then - local _, err = core.json.decode(sa_json) - if err then - return false, "invalid gcp service_account_json: " .. err - end - end end local algo = core.table.try_read_attr(conf, "balancer", "algorithm") local hash_on = core.table.try_read_attr(conf, "balancer", "hash_on") @@ -177,7 +170,9 @@ local function parse_domain_for_node(node) end -local function resolve_endpoint(instance_conf) +-- Calculate DNS node from instance config without modifying the input +-- Returns a node table with host, port, scheme fields +local function calculate_dns_node(instance_conf) local scheme, host, port local endpoint = core.table.try_read_attr(instance_conf, "override", "endpoint") if endpoint then @@ -188,22 +183,23 @@ local function resolve_endpoint(instance_conf) port = tonumber(port) else local ai_driver = require("apisix.plugins.ai-drivers." .. instance_conf.provider) - if ai_driver.get_node then - local node = ai_driver.get_node(instance_conf) - host = node.host - port = node.port - else - host = ai_driver.host - port = ai_driver.port - end + -- built-in ai driver always use https scheme = "https" + host = ai_driver.host + port = ai_driver.port end - local new_node = { + local node = { host = host, port = tonumber(port), scheme = scheme, } - parse_domain_for_node(new_node) + parse_domain_for_node(node) + return node +end + + +local function resolve_endpoint(instance_conf) + local new_node = calculate_dns_node(instance_conf) -- Compare with existing node to see if anything changed local old_node = instance_conf._dns_value @@ -229,32 +225,75 @@ local function get_checkers_status_ver(checkers) end +-- Read health status directly from SHM to get consistent state across all workers +-- This bypasses the local cache which can be stale due to async worker events +local function fetch_health_status_from_shm(shm, checker_name, ip, port, hostname, instance_name) + local lookup_hostname = hostname or ip + local state_key = string.format("lua-resty-healthcheck:%s:state:%s:%s:%s", + checker_name, + ip, + port, + lookup_hostname) + + core.log.info("[SHM-DIRECT] instance=", instance_name, " key=", state_key) + local state = shm:get(state_key) + if state then + -- State values: 1=healthy, 2=unhealthy, 3=mostly_healthy, 4=mostly_unhealthy + local ok = (state == 1 or state == 3) + core.log.info("[SHM-DIRECT] instance=", instance_name, " state=", state, " ok=", ok) + return ok, state + end + + -- State not found in SHM (checker not yet created), default to healthy + core.log.warn("[SHM-DIRECT] state not found for instance=", instance_name, ", defaulting to healthy") + return true, nil +end + + +-- Get SHM and checker_name for an instance, returns (shm, checker_name) +local function get_shm_info(checkers, conf, i, ins) + local checker = checkers and checkers[ins.name] + + -- Prefer the instance's own checker + if checker and checker.shm then + core.log.info("[SHM-DEBUG] instance=", ins.name, " using own checker") + return checker.shm, checker.name + end + + -- Fallback: use another checker's SHM and construct the checker_name + local checker_ref = checkers and next(checkers) + if checker_ref and checker_ref.shm then + local checker_name = "upstream#" .. conf._meta.parent.resource_key .. "#plugins['ai-proxy-multi'].instances[" .. (i - 1) .. "]" + core.log.info("[SHM-DEBUG] instance=", ins.name, " using fallback checker_ref, checker_name=", checker_name) + return checker_ref.shm, checker_name + end + + core.log.warn("[SHM-DEBUG] instance=", ins.name, " checkers=", checkers and "exists" or "nil", " checker_ref=", checker_ref and "exists" or "nil") + return nil, nil +end + local function fetch_health_instances(conf, checkers) local instances = conf.instances local new_instances = core.table.new(0, #instances) - if not checkers then - for _, ins in ipairs(conf.instances) do - transform_instances(new_instances, ins) - end - return new_instances - end - for _, ins in ipairs(instances) do - local checker = checkers[ins.name] - if checker then - local host = ins.checks and ins.checks.active and ins.checks.active.host - local port = ins.checks and ins.checks.active and ins.checks.active.port + for i, ins in ipairs(instances) do + local host = ins.checks and ins.checks.active and ins.checks.active.host + local port = ins.checks and ins.checks.active and ins.checks.active.port + local node = ins._dns_value - local node = ins._dns_value - local ok, err = checker:get_target_status(node.host, port or node.port, host) + -- Get SHM and checker_name (works regardless of whether checker exists in this worker) + local shm, checker_name = get_shm_info(checkers, conf, i, ins) + + if shm then + -- Read health status directly from SHM + local ok = fetch_health_status_from_shm(shm, checker_name, + node.host, port or node.port, host, ins.name) if ok then transform_instances(new_instances, ins) - elseif err then - core.log.warn("failed to get health check target status, addr: ", - node.host, ":", port or node.port, ", host: ", host, ", err: ", err) end else + -- No SHM available at all, add instance by default transform_instances(new_instances, ins) end end @@ -302,15 +341,22 @@ end function _M.construct_upstream(instance) local upstream = {} local node = instance._dns_value + + -- If _dns_value doesn't exist (e.g., when called from timer), + -- calculate it from the instance config if not node then - return nil, "failed to resolve endpoint for instance: " .. instance.name + core.log.info("instance._dns_value not found, calculating from config for instance: ", instance.name) + node = calculate_dns_node(instance) + if not node then + return nil, "failed to calculate endpoint for instance: " .. instance.name + end end if not node.host or not node.port then return nil, "invalid upstream node: " .. core.json.encode(node) end - local node = { + local upstream_node = { host = node.host, port = node.port, scheme = node.scheme, @@ -318,7 +364,7 @@ function _M.construct_upstream(instance) priority = instance.priority or 0, name = instance.name, } - upstream.nodes = {node} + upstream.nodes = {upstream_node} upstream.checks = instance.checks upstream._nodes_ver = instance._nodes_ver return upstream @@ -345,24 +391,28 @@ local function pick_target(ctx, conf, ups_tab) instances[i]._dns_value = instance._dns_value instances[i]._nodes_ver = instance._nodes_ver local checker = healthcheck_manager.fetch_checker(resource_path, resource_version) - checkers = checkers or {} - checkers[instance.name] = checker + if checker then + checkers = checkers or {} + checkers[instance.name] = checker + end end end + -- Use status_ver for cache key - ensures immediate refresh when health state changes local version = plugin.conf_version(conf) if checkers then local status_ver = get_checkers_status_ver(checkers) - version = version .. "#" .. status_ver + version = version .. "#s" .. status_ver end + -- Use LRU cache to reduce SHM access - cache refreshes when status_ver changes local server_picker = ctx.server_picker if not server_picker then server_picker = lrucache_server_picker(ctx.matched_route.key, version, create_server_picker, conf, ups_tab, checkers) end if not server_picker then - return nil, nil, "failed to fetch server picker" + return nil, nil, "failed to create server picker" end ctx.server_picker = server_picker diff --git a/test-nginx b/test-nginx new file mode 160000 index 000000000000..ed378bfe1170 --- /dev/null +++ b/test-nginx @@ -0,0 +1 @@ +Subproject commit ed378bfe1170e6b8f6c68cac3a83bbffc5e6a508 From a95c6e75fcfe69ad2342ffe5cd499f5ec58edde7 Mon Sep 17 00:00:00 2001 From: elizax Date: Wed, 4 Feb 2026 11:59:44 +0000 Subject: [PATCH 2/2] fix(ai-proxy-multi): health check not work