-
Notifications
You must be signed in to change notification settings - Fork 2.6k
refactor: add healthcheck manager to decouple upstream #12426
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 45 commits
72634ba
2e114fb
588ea22
275d179
6a169a3
40f5a79
40b0f36
4d3736a
be15b46
ef9ded6
f2485ff
d65748b
342674b
5dd2f5a
a0e0f3d
af9a26c
562105a
c3fbdf1
e551bbc
9f19c2c
45bba02
d962d76
8c3a884
c717105
b724093
6404912
413fdb2
a4ee844
31198de
0efb277
2588a40
c110c51
3de4f23
68796dc
3b94bd9
4621bed
47a04c6
cc363da
9288707
98a65e5
66fc383
9a56f59
f11e391
30f12a5
17b7397
15979b3
e725ad5
da27e15
4359182
1dca2f1
90e993b
0a4e23f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,7 +27,7 @@ local get_last_failure = balancer.get_last_failure | |
local set_timeouts = balancer.set_timeouts | ||
local ngx_now = ngx.now | ||
local str_byte = string.byte | ||
|
||
local healthcheck_manager = require("apisix.healthcheck_manager") | ||
|
||
local module_name = "balancer" | ||
local pickers = {} | ||
|
@@ -75,7 +75,8 @@ local function fetch_health_nodes(upstream, checker) | |
local port = upstream.checks and upstream.checks.active and upstream.checks.active.port | ||
local up_nodes = core.table.new(0, #nodes) | ||
for _, node in ipairs(nodes) do | ||
local ok, err = checker:get_target_status(node.host, port or node.port, host) | ||
local ok, err = healthcheck_manager.fetch_node_status(checker, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we can cache There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. do you mean using the lrucache with |
||
node.host, port or node.port, host) | ||
if ok then | ||
up_nodes = transform_node(up_nodes, node) | ||
elseif err then | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,6 +20,7 @@ local plugin = require("apisix.plugin") | |
local get_routes = require("apisix.router").http_routes | ||
local get_services = require("apisix.http.service").services | ||
local upstream_mod = require("apisix.upstream") | ||
local healthcheck_manager = require("apisix.healthcheck_manager") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It might make sense to put it in the same directory as the batch processor manager. The root directory isn't the right place for it. |
||
local get_upstreams = upstream_mod.upstreams | ||
local collectgarbage = collectgarbage | ||
local ipairs = ipairs | ||
|
@@ -66,14 +67,13 @@ function _M.schema() | |
return 200, schema | ||
end | ||
|
||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just a reminder, please try to avoid pointless formatting changes, this noise will affect review efficiency. |
||
local healthcheck | ||
local function extra_checker_info(value) | ||
if not healthcheck then | ||
healthcheck = require("resty.healthcheck") | ||
end | ||
|
||
local name = upstream_mod.get_healthchecker_name(value) | ||
local name = healthcheck_manager.get_healthchecker_name(value.value) | ||
local nodes, err = healthcheck.get_target_list(name, "upstream-healthcheck") | ||
if err then | ||
core.log.error("healthcheck.get_target_list failed: ", err) | ||
|
@@ -214,7 +214,6 @@ local function iter_and_find_healthcheck_info(values, src_type, src_id) | |
if not checks then | ||
return nil, str_format("no checker for %s[%s]", src_type, src_id) | ||
end | ||
|
||
local info = extra_checker_info(value) | ||
info.type = get_checker_type(checks) | ||
return info | ||
|
@@ -249,7 +248,6 @@ function _M.get_health_checker() | |
if not info then | ||
return 404, {error_msg = err} | ||
end | ||
|
||
local out, err = try_render_html({stats={info}}) | ||
if out then | ||
core.response.set_header("Content-Type", "text/html") | ||
|
@@ -266,9 +264,6 @@ local function iter_add_get_routes_info(values, route_id) | |
local infos = {} | ||
for _, route in core.config_util.iterate_values(values) do | ||
local new_route = core.table.deepcopy(route) | ||
if new_route.value.upstream and new_route.value.upstream.parent then | ||
new_route.value.upstream.parent = nil | ||
end | ||
-- remove healthcheck info | ||
new_route.checker = nil | ||
new_route.checker_idx = nil | ||
|
@@ -312,9 +307,6 @@ local function iter_add_get_upstream_info(values, upstream_id) | |
for _, upstream in core.config_util.iterate_values(values) do | ||
local new_upstream = core.table.deepcopy(upstream) | ||
core.table.insert(infos, new_upstream) | ||
if new_upstream.value and new_upstream.value.parent then | ||
new_upstream.value.parent = nil | ||
end | ||
-- check the upstream id | ||
if upstream_id and upstream.value.id == upstream_id then | ||
return new_upstream | ||
|
@@ -332,6 +324,7 @@ function _M.dump_all_upstreams_info() | |
return 200, infos | ||
end | ||
|
||
|
||
function _M.dump_upstream_info() | ||
local upstreams = get_upstreams() | ||
local uri_segs = core.utils.split_uri(ngx_var.uri) | ||
|
@@ -354,9 +347,6 @@ local function iter_add_get_services_info(values, svc_id) | |
local infos = {} | ||
for _, svc in core.config_util.iterate_values(values) do | ||
local new_svc = core.table.deepcopy(svc) | ||
if new_svc.value.upstream and new_svc.value.upstream.parent then | ||
new_svc.value.upstream.parent = nil | ||
end | ||
-- remove healthcheck info | ||
new_svc.checker = nil | ||
new_svc.checker_idx = nil | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,298 @@ | ||||||
-- | ||||||
-- Licensed to the Apache Software Foundation (ASF) under one or more | ||||||
-- contributor license agreements. See the NOTICE file distributed with | ||||||
-- this work for additional information regarding copyright ownership. | ||||||
-- The ASF licenses this file to You under the Apache License, Version 2.0 | ||||||
-- (the "License"); you may not use this file except in compliance with | ||||||
-- the License. You may obtain a copy of the License at | ||||||
-- | ||||||
-- http://www.apache.org/licenses/LICENSE-2.0 | ||||||
-- | ||||||
-- Unless required by applicable law or agreed to in writing, software | ||||||
-- distributed under the License is distributed on an "AS IS" BASIS, | ||||||
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
-- See the License for the specific language governing permissions and | ||||||
-- limitations under the License. | ||||||
-- | ||||||
local require = require | ||||||
local ipairs = ipairs | ||||||
local pcall = pcall | ||||||
local exiting = ngx.worker.exiting | ||||||
local pairs = pairs | ||||||
local tostring = tostring | ||||||
local core = require("apisix.core") | ||||||
local config_local = require("apisix.core.config_local") | ||||||
local healthcheck | ||||||
local events = require("apisix.events") | ||||||
local tab_clone = core.table.clone | ||||||
local timer_every = ngx.timer.every | ||||||
local _M = { | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is not safe to export these tables. We will never allow the "working_pool" or "waiting_pool" to be destroyed; they can only be used inside this Lua module. |
||||||
working_pool = {}, -- resource_path -> {version = ver, checker = checker} | ||||||
waiting_pool = {} -- resource_path -> resource_ver | ||||||
} | ||||||
local DELAYED_CLEAR_TIMEOUT = 10 | ||||||
local healthcheck_shdict_name = "upstream-healthcheck" | ||||||
local is_http = ngx.config.subsystem == "http" | ||||||
if not is_http then | ||||||
healthcheck_shdict_name = healthcheck_shdict_name .. "-" .. ngx.config.subsystem | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Once again, is a separate shdict for stream a must? Can't it share data with http? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||||||
end | ||||||
|
||||||
|
||||||
-- Build the unique checker name for a resource: "upstream#<resource_key>".
-- Accepts either an upstream object directly (value.resource_key) or a
-- wrapper that carries one (value.upstream.resource_key).
local function get_healthchecker_name(value)
    local key = value.resource_key
    if not key then
        key = value.upstream.resource_key
    end
    return "upstream#" .. key
end
_M.get_healthchecker_name = get_healthchecker_name
|
||||||
|
||||||
-- Resolve the latest configuration object for a resource path.
-- Handles both path formats:
--   1. /apisix/<resource_type>/<id>
--   2. /<resource_type>/<id>
-- Returns the config object on success, or nil (after logging) when the
-- path is malformed, the resource type is unsupported, or the resource
-- has been deleted in the meantime.
local function fetch_latest_conf(resource_path)
    local resource_type, id
    if resource_path:find("^/apisix/") then
        resource_type, id = resource_path:match("^/apisix/([^/]+)/([^/]+)$")
    else
        resource_type, id = resource_path:match("^/([^/]+)/([^/]+)$")
    end

    if not resource_type or not id then
        core.log.error("invalid resource path: ", resource_path)
        return nil
    end

    -- whitelist of resource types that may carry health-check config;
    -- for each of them the config key is simply "/" .. resource_type
    if resource_type ~= "upstreams" and resource_type ~= "routes"
            and resource_type ~= "services" and resource_type ~= "stream_routes" then
        core.log.error("unsupported resource type: ", resource_type)
        return nil
    end
    local key = "/" .. resource_type

    local data = core.config.fetch_created_obj(key)
    if not data then
        core.log.error("failed to fetch configuration for type: ", key)
        return nil
    end

    local resource = data:get(id)
    if not resource then
        -- this can happen if the resource was deleted after this
        -- function was called, so we don't throw an error
        core.log.warn("resource not found: ", id, " in ", key)
        return nil
    end

    return resource
end
|
||||||
|
||||||
-- Create a resty.healthcheck checker for the given upstream configuration
-- and register every upstream node as a target.
-- Returns the checker instance, or nil when upstream health checking is
-- disabled in the local config or checker creation fails.
local function create_checker(up_conf)
    local local_conf = config_local.local_conf()
    local disabled = local_conf and local_conf.apisix
                     and local_conf.apisix.disable_upstream_healthcheck
    if disabled then
        core.log.info("healthchecker won't be created: disabled upstream healthcheck")
        return nil
    end

    core.log.info("creating healthchecker for upstream: ", up_conf.resource_key)

    -- lazy-load the library so it is only pulled in when first needed
    if not healthcheck then
        healthcheck = require("resty.healthcheck")
    end

    local checker, err = healthcheck.new({
        name = get_healthchecker_name(up_conf),
        shm_name = healthcheck_shdict_name,
        checks = up_conf.checks,
        events_module = events:get_healthcheck_events_modele(),
    })
    if not checker then
        core.log.error("failed to create healthcheck: ", err)
        return nil
    end

    -- register each node; active-check host/port override the node's own
    local active = up_conf.checks and up_conf.checks.active
    local host = active and active.host
    local port = active and active.port
    local rewrite_host = up_conf.pass_host == "rewrite" and up_conf.upstream_host
    local prefer_node_host = up_conf.pass_host == "node" or nil

    for _, node in ipairs(up_conf.nodes) do
        local header_host = rewrite_host or (prefer_node_host and node.domain)
        local ok, add_err = checker:add_target(node.host, port or node.port, host,
                                               true, header_host)
        if not ok then
            core.log.error("failed to add healthcheck target: ", node.host, ":",
                           port or node.port, " err: ", add_err)
        end
    end

    return checker
end
|
||||||
|
||||||
-- Return the active checker for (resource_path, resource_ver) if one is
-- already in the working pool; otherwise queue the resource so the
-- periodic timer (timer_create_checker) builds a checker, and return nil.
function _M.fetch_checker(resource_path, resource_ver)
    local working_item = _M.working_pool[resource_path]
    local has_current = working_item and working_item.version == resource_ver
    if has_current then
        return working_item.checker
    end

    -- already queued for this exact version: nothing more to do yet
    if _M.waiting_pool[resource_path] == resource_ver then
        return nil
    end

    core.log.info("adding ", resource_path, " to waiting pool with version: ", resource_ver)
    _M.waiting_pool[resource_path] = resource_ver
    return nil
end
|
||||||
|
||||||
-- Query the health status of a single node.
-- When there is no usable checker (none created yet, or already marked
-- dead) the node is optimistically treated as healthy.
function _M.fetch_node_status(checker, ip, port, hostname)
    local usable = checker and not checker.dead
    if not usable then
        return true
    end

    return checker:get_target_status(ip, port, hostname)
end
|
||||||
|
||||||
-- Register a checker in the working pool, keyed by resource path and
-- stamped with the configuration version it was built from.
local function add_working_pool(resource_path, resource_ver, checker)
    local item = {
        version = resource_ver,
        checker = checker,
    }
    _M.working_pool[resource_path] = item
end
|
||||||
-- Look up a working-pool entry for the resource path; the entry is only
-- returned when its stored version matches the requested one.
local function find_in_working_pool(resource_path, resource_ver)
    local item = _M.working_pool[resource_path]
    if not item then
        return nil -- not found
    end

    if item.version ~= resource_ver then
        core.log.info("version mismatch for resource: ", resource_path,
                      " current version: ", item.version, " requested version: ", resource_ver)
        return nil -- version not match
    end

    return item
end
|
||||||
|
||||||
-- Compose a version string from the config modifiedIndex plus the
-- optional nodes version; returns nothing when no index is available.
function _M.upstream_version(index, nodes_ver)
    if not index then
        return
    end

    local suffix = nodes_ver and tostring(nodes_ver) or ''
    return index .. suffix
end
|
||||||
|
||||||
-- Periodic task: drain the waiting pool and create health checkers.
-- For each queued (resource_path, resource_ver) pair it skips entries
-- already served or gone stale, tears down any checker previously
-- registered for the path, then installs a freshly built checker in the
-- working pool. Every processed entry is removed from the waiting pool.
function _M.timer_create_checker()
    if core.table.nkeys(_M.waiting_pool) == 0 then
        return
    end

    -- iterate over a snapshot so entries can be removed while looping
    local waiting_snapshot = tab_clone(_M.waiting_pool)
    for resource_path, resource_ver in pairs(waiting_snapshot) do
        do
            if find_in_working_pool(resource_path, resource_ver) then
                core.log.info("resource: ", resource_path,
                              " already in working pool with version: ",
                              resource_ver)
                goto continue
            end
            local res_conf = fetch_latest_conf(resource_path)
            if not res_conf then
                goto continue
            end

            local upstream = res_conf.value.upstream or res_conf.value
            local new_version = _M.upstream_version(res_conf.modifiedIndex, upstream._nodes_ver)
            core.log.info("checking waiting pool for resource: ", resource_path,
                          " current version: ", new_version, " requested version: ", resource_ver)
            -- stale request: the resource changed again after it was queued
            if resource_ver ~= new_version then
                goto continue
            end

            -- if a checker exists then delete it before creating a new one;
            -- use the named constant (was a magic number 10) for the
            -- delayed cleanup timeout, consistent with the other call sites
            local existing_checker = _M.working_pool[resource_path]
            if existing_checker then
                existing_checker.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
                existing_checker.checker:stop()
                core.log.info("releasing existing checker: ", tostring(existing_checker.checker))
            end

            local checker = create_checker(upstream)
            if not checker then
                goto continue
            end
            core.log.info("create new checker: ", tostring(checker))
            add_working_pool(resource_path, resource_ver, checker)
        end

        ::continue::
        -- the request has been handled (or found stale); drop it
        _M.waiting_pool[resource_path] = nil
    end
end
|
||||||
|
||||||
-- Periodic task: evict working-pool checkers whose backing resource was
-- deleted or whose configuration version no longer matches.
function _M.timer_working_pool_check()
    if core.table.nkeys(_M.working_pool) == 0 then
        return
    end

    -- iterate over a snapshot so entries can be removed while looping
    local working_snapshot = tab_clone(_M.working_pool)
    for resource_path, item in pairs(working_snapshot) do
        local res_conf = fetch_latest_conf(resource_path)
        local release = false

        if not res_conf or not res_conf.value then
            -- resource no longer exists: its checker must be released
            release = true
        else
            -- NOTE(review): compute the version from the nested upstream
            -- (res_conf.value.upstream or res_conf.value), matching
            -- timer_create_checker; the previous code read
            -- res_conf.value._nodes_ver directly, which disagrees for
            -- routes/services with an inline upstream — confirm intent.
            local upstream = res_conf.value.upstream or res_conf.value
            local current_ver = _M.upstream_version(res_conf.modifiedIndex,
                                                    upstream._nodes_ver)
            core.log.info("checking working pool for resource: ", resource_path,
                          " current version: ", current_ver, " item version: ", item.version)
            release = item.version ~= current_ver
        end

        if release then
            item.checker:delayed_clear(DELAYED_CLEAR_TIMEOUT)
            item.checker:stop()
            core.log.info("try to release checker: ", tostring(item.checker))
            _M.working_pool[resource_path] = nil
        end
    end
end
|
||||||
-- Start the two periodic tasks (checker creation and working-pool
-- validation). Each callback is guarded so overlapping runs are skipped
-- and errors inside a handler cannot kill the timer.
function _M.init_worker()
    -- wrap a handler: skip while the worker is exiting, warn and bail
    -- out if the previous run has not finished, swallow handler errors
    local function guarded(busy_msg, handler)
        local running = false
        return function ()
            if exiting() then
                return
            end
            if running then
                core.log.warn(busy_msg)
                return
            end
            running = true
            pcall(handler)
            running = false
        end
    end

    timer_every(1, guarded(
        "timer_create_checker is already running, skipping this iteration",
        _M.timer_create_checker))
    timer_every(1, guarded(
        "timer_working_pool_check is already running skipping iteration",
        _M.timer_working_pool_check))
end
|
||||||
return _M |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.