67 changes: 65 additions & 2 deletions apisix/discovery/kubernetes/init.lua
@@ -211,6 +211,11 @@ end


local function post_list(handle)
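-- mark this shared dict as populated so check_ready() can report the
-- kubernetes discovery instance as ready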
local _, err = handle.endpoint_dict:safe_set("discovery_ready",true)
if err then
core.log.error("set discovery_ready flag into discovery DICT failed, ", err)
end

if not handle.existing_keys or not handle.current_keys_hash then
return
end
@@ -436,7 +441,8 @@ local function start_fetch(handle)
ngx.timer.at(0, timer_runner)
end

local function get_endpoint_dict(id)

local function get_endpoint_dict_name(id)
local shm = "kubernetes"

if id and #id > 0 then
@@ -446,8 +452,13 @@ local function get_endpoint_dict(id)
if not is_http then
shm = shm .. "-stream"
end
return shm
end

return ngx.shared[shm]

local function get_endpoint_dict(id)
local dict_name = get_endpoint_dict_name(id)
return ngx.shared[dict_name]
end


@@ -684,6 +695,7 @@ local function dump_endpoints_from_dict(endpoint_dict)
return endpoints
end


function _M.dump_data()
local discovery_conf = local_conf.discovery.kubernetes
local eps = {}
@@ -715,4 +727,55 @@ function _M.dump_data()
end


local function check_ready(id)
local endpoint_dict = get_endpoint_dict(id)
if not endpoint_dict then
core.log.error("failed to get lua_shared_dict:", get_endpoint_dict_name(id),
", please check your APISIX version")
return false, "failed to get lua_shared_dict: " .. get_endpoint_dict_name(id)
.. ", please check your APISIX version"
end
-- check flag
local ready = endpoint_dict:get("discovery_ready")
if not ready then
core.log.warn("kubernetes discovery not ready")
return false, "kubernetes discovery not ready"
end
return true
end


local function single_mode_check_discovery_ready()
local _, err = check_ready()
if err then
return false, err
end
return true
end


local function multiple_mode_check_discovery_ready(confs)
for _, conf in ipairs(confs) do
local _, err = check_ready(conf.id)
if err then
return false, err
end
end
return true
end


function _M.check_discovery_ready()
local discovery_conf = local_conf.discovery and local_conf.discovery.kubernetes
if not discovery_conf then
return true
end
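-- a single-cluster conf is a map (length 0); a list of confs means
-- multiple clusters, each checked by its own id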
if #discovery_conf == 0 then
return single_mode_check_discovery_ready()
else
return multiple_mode_check_discovery_ready(discovery_conf)
end
end


return _M
101 changes: 60 additions & 41 deletions apisix/init.lua
@@ -47,6 +47,7 @@ local debug = require("apisix.debug")
local pubsub_kafka = require("apisix.pubsub.kafka")
local resource = require("apisix.resource")
local trusted_addresses_util = require("apisix.utils.trusted-addresses")
local discovery = require("apisix.discovery.init").discovery
local ngx = ngx
local get_method = ngx.req.get_method
local ngx_exit = ngx.exit
@@ -121,7 +122,6 @@ function _M.http_init_worker()

core.lrucache.init_worker()

local discovery = require("apisix.discovery.init").discovery
if discovery and discovery.init_worker then
discovery.init_worker()
end
@@ -975,54 +975,74 @@ function _M.status()
core.response.exit(200, core.json.encode({ status = "ok" }))
end

function _M.status_ready()
local local_conf = core.config.local_conf()
local role = core.table.try_read_attr(local_conf, "deployment", "role")
local provider = core.table.try_read_attr(local_conf, "deployment", "role_" ..
role, "config_provider")
if provider == "yaml" or provider == "etcd" then
local status_shdict = ngx.shared["status-report"]
local ids = status_shdict:get_keys()
local error
local worker_count = ngx.worker.count()
if #ids ~= worker_count then
core.log.warn("worker count: ", worker_count, " but status report count: ", #ids)
error = "worker count: " .. ngx.worker.count() ..
" but status report count: " .. #ids
end
if error then
core.response.exit(503, core.json.encode({
status = "error",
error = error
}))
return
end
for _, id in ipairs(ids) do
local ready = status_shdict:get(id)

local function discovery_ready_check()
local discovery_type = local_conf.discovery
if not discovery_type then
return true
end
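-- only discovery modules that implement check_discovery_ready take part
-- in the readiness check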
for discovery_name, _ in pairs(discovery_type) do
local dis_module = discovery[discovery_name]
if dis_module.check_discovery_ready then
local ready, message = dis_module.check_discovery_ready()
if not ready then
core.log.warn("worker id: ", id, " has not received configuration")
error = "worker id: " .. id ..
" has not received configuration"
break
return false, message
end
end
end
return true
end

if error then
core.response.exit(503, core.json.encode({
status = "error",
error = error
}))
return
local function config_ready_check()
local role = core.table.try_read_attr(local_conf, "deployment", "role")
local provider = core.table.try_read_attr(local_conf, "deployment", "role_" ..
role, "config_provider")
if provider ~= "yaml" and provider ~= "etcd" then
return false, "unknown config provider: " .. tostring(provider)
end

local status_shdict = ngx.shared["status-report"]
local ids = status_shdict:get_keys()

local worker_count = ngx.worker.count()
if #ids ~= worker_count then
local error = "worker count: " .. worker_count .. " but status report count: " .. #ids
core.log.warn(error)
return false, error
end
for _, id in ipairs(ids) do
local ready = status_shdict:get(id)
if not ready then
local error = "worker id: " .. id .. " has not received configuration"
core.log.warn(error)
return false, error
end
end

return true
end

core.response.exit(200, core.json.encode({ status = "ok" }))
function _M.status_ready()
local ready, message = config_ready_check()
if not ready then
core.response.exit(503, core.json.encode({
status = "error",
error = message
}))
return
end

core.response.exit(503, core.json.encode({
status = "error",
message = "unknown config provider: " .. tostring(provider)
}), { ["Content-Type"] = "application/json" })
ready, message = discovery_ready_check()
if not ready then
core.response.exit(503, core.json.encode({
status = "error",
error = message
}))
return
end

core.response.exit(200, core.json.encode({ status = "ok" }))
return
end


@@ -1181,7 +1201,6 @@ function _M.stream_init_worker()
-- for admin api of standalone mode, we need to startup background timer and patch schema etc.
require("apisix.admin.init").init_worker()

local discovery = require("apisix.discovery.init").discovery
if discovery and discovery.init_worker then
discovery.init_worker()
end
94 changes: 87 additions & 7 deletions t/kubernetes/discovery/kubernetes3.t
@@ -242,10 +242,22 @@ _EOC_
}
}

location /t {
location /ready_check {
content_by_lua_block {
ngx.sleep(2)
ngx.exit(200)
local http = require("resty.http")
local healthcheck_uri = "http://127.0.0.1:7085" .. "/status/ready"
for i = 1, 4 do
Contributor (review comment):
Add a comment explaining the rationale for selecting the number of retries.

Contributor Author (reply):
APISIX takes time to load data from the Kubernetes API server. In scenarios with small amounts of data, it may complete in less than 1 second. The maximum wait time I set, 5 seconds, is solely to account for network fluctuations.
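-- retry for up to ~5 seconds in total: loading endpoints from the
-- Kubernetes API server usually completes within a second; the extra
-- attempts only absorb network fluctuations (see review discussion above)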

local httpc = http.new()
local res, _ = httpc:request_uri(healthcheck_uri, {method = "GET", keepalive = false})
if res.status == 200 then
ngx.status = res.status
return
end
ngx.sleep(1)
end
local httpc = http.new()
local res, _ = httpc:request_uri(healthcheck_uri, {method = "GET", keepalive = false})
ngx.status = res.status
}
}

Expand Down Expand Up @@ -516,7 +528,7 @@ GET /dump
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
GET /ready_check
--- no_error_log
[error]
--- grep_error_log eval
Expand All @@ -539,7 +551,7 @@ kubernetes discovery module find dirty data in shared dict
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
GET /ready_check
--- no_error_log
[error]
--- grep_error_log eval
Expand Down Expand Up @@ -576,7 +588,7 @@ discovery:
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
GET /ready_check
--- no_error_log
[error]
--- grep_error_log eval
Expand Down Expand Up @@ -622,10 +634,78 @@ discovery:
core.log.error("set dirty_key to dict fail, err: ", err)
end
--- request
GET /t
GET /ready_check
--- no_error_log
[error]
--- grep_error_log eval
qr/kubernetes discovery module find dirty data in shared dict/
--- grep_error_log_out
kubernetes discovery module find dirty data in shared dict



=== TEST 11: test healthcheck unready
--- log_level: warn
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
kubernetes:
- id: first
service:
host: "127.0.0.1"
port: "6443"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
watch_endpoint_slices: false
- id: second
service:
schema: "http"
host: "127.0.0.1"
port: "6446"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
watch_endpoint_slices: false
--- request
GET /ready_check
--- error_code: 503
--- grep_error_log eval
qr/connect apiserver failed/
--- grep_error_log_out
connect apiserver failed



=== TEST 12: test healthcheck ready
--- log_level: warn
--- yaml_config
apisix:
node_listen: 1984
deployment:
role: data_plane
role_data_plane:
config_provider: yaml
discovery:
kubernetes:
- id: first
service:
host: "127.0.0.1"
port: "6443"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
watch_endpoint_slices: false
- id: second
service:
schema: "http"
host: "127.0.0.1"
port: "6445"
client:
token_file: "/tmp/var/run/secrets/kubernetes.io/serviceaccount/token"
watch_endpoint_slices: false
--- request
GET /ready_check
--- error_code: 200