diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e15c86e..8ef210a 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -22,7 +22,7 @@ jobs:
     runs-on: ubuntu-latest
     strategy:
       matrix:
-        version: ["1.10.15", "2.8.4", "2.10.6", "2.10.7-gc64-amd64", "2.11.0", "2.11.1"]
+        version: ["1.10.15", "2.8.4", "2.10.6", "2.10.7-gc64-amd64", "2.11.0", "2.11.1", "2.11.2"]
     steps:
       - uses: actions/checkout@master
       - uses: docker/setup-buildx-action@v2
diff --git a/config.lua b/config.lua
index 3541f8f..a24802e 100644
--- a/config.lua
+++ b/config.lua
@@ -9,6 +9,7 @@ local yaml = require 'yaml'.new()
 local digest = require 'digest'
 local fiber = require 'fiber'
 local clock = require 'clock'
+local uri = require 'uri'
 
 json.cfg{ encode_invalid_as_nil = true }
 yaml.cfg{ encode_use_tostring = true }
@@ -590,6 +591,11 @@ local function etcd_load( M, etcd_conf, local_cfg )
     etcd:discovery()
 
     local all_cfg = etcd:get_all()
+
+    function M.etcd.get_all_cached()
+        return all_cfg
+    end
+
     if etcd_conf.print_config then
         print("Loaded config from etcd",yaml.encode(all_cfg))
     end
@@ -599,7 +605,9 @@ local function etcd_load( M, etcd_conf, local_cfg )
     local instance_cfg = all_instances_cfg[instance_name]
     assert(instance_cfg,"Instance name "..instance_name.." is not known to etcd")
 
-    local all_clusters_cfg = all_cfg.clusters or all_cfg.shards
+    all_cfg.clusters = all_cfg.clusters or all_cfg.shards
+    all_cfg.shards = nil
+    local all_clusters_cfg = all_cfg.clusters
 
     local master_selection_policy
     local cluster_cfg
@@ -810,6 +818,62 @@ local M
     } end,
     _load_cfg = load_cfg,
+    sharding = function()
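+        -- Build a vshard-compatible sharding map from the etcd snapshot cached
+        -- at load time: replicasets keyed by replicaset_uuid, each holding its
+        -- replicas keyed by instance_uuid.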
+        local all_cfg = M.etcd.get_all_cached()
+
+        -- common vshard cfg
+        local cfg = table.deepcopy(all_cfg.common.vshard or {})
+
+        local creds = M.get('credentials.sharding', {})
+        if type(creds) ~= 'table' then
+            creds = {}
+        end
+        -- default sharding creds are guest:""
+        creds.login = creds.login or 'guest'
+        creds.password = creds.password or ''
+
+        local rebalancer_name = cfg.rebalancer
+        cfg.rebalancer = nil
+
+        local sharding = {}
+        -- construct sharding table
+        for rs_name, replicaset in pairs(all_cfg.clusters) do
+            local replicas = {}
+            for instance_name, instance in pairs(all_cfg.instances) do
+                if instance.cluster == rs_name then
+                    local remote = assert(uri.parse(instance.box.remote_addr or instance.box.listen))
+                    remote.login = creds.login
+                    remote.password = creds.password
+                    local replica_zone = nil
+                    if type(instance.vshard) == 'table' then
+                        replica_zone = instance.vshard.zone -- can be nil
+                    end
+                    replicas[instance.box.instance_uuid] = {
+                        name = instance_name,
+                        uuid = instance.box.uuid,
+                        uri = uri.format(remote, true),
+                        master = replicaset.master == instance_name,
+                        zone = replica_zone,
+                    }
+                end
+            end
+            local rs = {}
+            rs.replicas = replicas
+            if type(replicaset.vshard) == 'table' then
+                rs.weight = replicaset.vshard.weight
+                rs.lock = replicaset.vshard.lock
+            end
+
+            if rebalancer_name then
+                rs.rebalancer = rebalancer_name == rs_name
+            end
+
+            sharding[replicaset.replicaset_uuid] = rs
+        end
+
+        -- only the sharding map is returned; all other fields from common/vshard
+        -- are left untouched and are read by callers via config.get('vshard.*')
+        return sharding
+    end,
 },{
     ---Reinitiates moonlibs.config
     ---@param args moonlibs.config.opts
diff --git a/run_test_in_docker.sh b/run_test_in_docker.sh
index d8bd305..761709a 100755
--- a/run_test_in_docker.sh
+++ b/run_test_in_docker.sh
@@ -3,4 +3,4 @@
 pwd
 rm -rf /root/.cache/
 cp -ar /root/.rocks /source/config/
-/source/config/.rocks/bin/luatest --coverage -v spec/
+/source/config/.rocks/bin/luatest --coverage -vv spec/
diff --git a/spec/01_single_test.lua b/spec/01_single_test.lua
index 52020c4..445b637 100644
--- a/spec/01_single_test.lua
+++ b/spec/01_single_test.lua
@@ -1,5 +1,4 @@
 local t = require 'luatest' --[[@as luatest]]
-local uri = require 'uri'
 
 ---@class test.config.single:luatest.group
 local g = t.group('single', {
diff --git a/spec/03_cluster_vshard_test.lua b/spec/03_cluster_vshard_test.lua
new file mode 100644
index 0000000..03d2c86
--- /dev/null
+++ b/spec/03_cluster_vshard_test.lua
@@ -0,0 +1,340 @@
+local t = require 'luatest' --[[@as luatest]]
+local uuid = require 'uuid'
+local fiber = require 'fiber'
+
+---@class test.config.vshard:luatest.group
+local g = t.group('vshard', {
+    {
+        cluster = 'vshard',
+        instances = {
+            first_01_1 = '127.0.0.1:3311', first_01_2 = '127.0.0.1:3312',
+            first_02_1 = '127.0.0.1:3321', first_02_2 = '127.0.0.1:3322',
+            proxy_01 = '127.0.0.1:4001',
+            proxy_02 = '127.0.0.1:4002',
+        },
+        shards = {
+            first_01_1 = 'first_01',
+            first_01_2 = 'first_01',
+            first_02_1 = 'first_02',
+            first_02_2 = 'first_02',
+        },
+        has_router = {
+            proxy_01 = true,
+            proxy_02 = true,
+        },
+        run = {'first_01_1', 'first_01_2', 'first_02_1', 'first_02_2', 'proxy_01', 'proxy_02'}
+    },
+})
+
+local this_file = debug.getinfo(1, "S").source:sub(2)
+local fio = require 'fio'
+
+local root = fio.dirname(this_file)
+local init_lua = fio.pathjoin(root, 'mock', 'vshard', 'init.lua')
+
+local base_env
+
+local h = require 'spec.helper'
+
+---@class moonlibs.config.test.tarantool
+---@field server luatest.server
+---@field net_box_port number
+---@field env table
+---@field name string
+
+---@class moonlibs.config.test.context
+---@field tts moonlibs.config.test.tarantool[]
+---@field env table
+---@field etcd_config table
+---@field params table
+
+---@type table<string, moonlibs.config.test.context>
+local test_ctx = {}
+
+g.before_each(function(cg)
+    local working_dir = h.create_workdir()
+    base_env = {
+        TT_ETCD_PREFIX = '/apps/vshard',
+        TT_CONFIG = fio.pathjoin(root, 'mock', 'vshard', 'conf.lua'),
+        TT_MASTER_SELECTION_POLICY = 'etcd.cluster.vshard',
+        TT_ETCD_ENDPOINTS = os.getenv('TT_ETCD_ENDPOINTS') or "http://127.0.0.1:2379",
+    }
+
+    base_env.TT_WAL_DIR = working_dir
+    base_env.TT_MEMTX_DIR = working_dir
+
+    local base_config = {
+        apps = {
+            vshard = {
+                common = {
+                    etcd = { fencing_enabled = true },
+                    box = { log_level = 5 },
+                },
+                clusters = {
+                    first_01 = {
+                        master = 'first_01_1',
+                        replicaset_uuid = uuid.str(),
+                    },
+                    first_02 = {
+                        master = 'first_02_1',
+                        replicaset_uuid = uuid.str(),
+                    },
+                },
+            }
+        },
+    }
+    h.clear_etcd()
+
+    local etcd_config = table.deepcopy(base_config)
+    etcd_config.apps.vshard.instances = {}
+    for instance_name, listen_uri in pairs(cg.params.instances) do
+        etcd_config.apps.vshard.instances[instance_name] = {
+            box = { listen = listen_uri, instance_uuid = uuid.str() },
+            cluster = cg.params.shards[instance_name],
+            router = cg.params.has_router[instance_name],
+        }
+    end
+
+    local this_ctx = { tts = {}, env = base_env, etcd_config = etcd_config, params = cg.params }
+    test_ctx[cg.name] = this_ctx
+
+    h.upload_to_etcd(etcd_config)
+end)
+
+g.after_each(function()
+    for _, info in pairs(test_ctx) do
+        for _, tt in pairs(info.tts) do
+            tt.server:stop()
+        end
+        -- h.clean_directory(info.env.TT_WAL_DIR)
+        -- h.clean_directory(info.env.TT_MEMTX_DIR)
+    end
+
+    h.clear_etcd()
+end)
+
+function g.test_run_instances(cg)
+    local ctx = test_ctx[cg.name]
+
+    -- Start tarantools
+    h.start_all_tarantools(ctx, init_lua, root, ctx.etcd_config.apps.vshard.instances)
+
+    -- Check configuration
+    for _, tnt in ipairs(ctx.tts) do
+        tnt.server:connect_net_box()
+        local box_cfg = tnt.server:get_box_cfg()
+
+        local is_ro = false
+        local listen = ctx.etcd_config.apps.vshard.instances[tnt.name].box.listen
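+        -- Expectations mirror etcd: a shard member is read-only unless etcd
+        -- names it master, and its listen URI is expected with the guest:@
+        -- credentials prefix.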
+        if cg.params.shards[tnt.name] then
+            is_ro = ctx.etcd_config.apps.vshard.clusters[cg.params.shards[tnt.name]].master ~= tnt.name
+            listen = 'guest:@'..listen
+        end
+        t.assert_covers(box_cfg, {
+            log_level = ctx.etcd_config.apps.vshard.common.box.log_level,
+            listen = listen,
+            read_only = is_ro,
+        }, 'box.cfg is correct')
+
+        local conn = tnt.server --[[@as luatest.server]]
+        local ret = conn:exec(function()
+            local r = table.deepcopy(config.get('sys'))
+            for k, v in pairs(r) do
+                if type(v) == 'function' then
+                    r[k] = nil
+                end
+            end
+            return r
+        end)
+
+        t.assert_covers(ret, {
+            instance_name = tnt.name,
+            master_selection_policy = 'etcd.cluster.vshard',
+            file = base_env.TT_CONFIG,
+        }, 'get("sys") has correct fields')
+    end
+
+    -- restart+check configuration
+    for _, tt in ipairs(ctx.tts) do
+        h.restart_tarantool(tt.server)
+
+        local box_cfg = tt.server:get_box_cfg()
+        local is_ro = false
+        local listen = ctx.etcd_config.apps.vshard.instances[tt.name].box.listen
+        if cg.params.shards[tt.name] then
+            is_ro = ctx.etcd_config.apps.vshard.clusters[cg.params.shards[tt.name]].master ~= tt.name
+            listen = 'guest:@'..listen
+        end
+
+        t.assert_covers(box_cfg, {
+            log_level = ctx.etcd_config.apps.vshard.common.box.log_level,
+            listen = listen,
+            read_only = is_ro,
+        }, 'box.cfg is correct after restart')
+
+        local ret = tt.server:exec(function()
+            local r = table.deepcopy(config.get('sys'))
+            for k, v in pairs(r) do
+                if type(v) == 'function' then
+                    r[k] = nil
+                end
+            end
+            return r
+        end)
+
+        t.assert_covers(ret, {
+            instance_name = tt.name,
+            master_selection_policy = 'etcd.cluster.vshard',
+            file = base_env.TT_CONFIG,
+        }, 'get("sys") has correct fields after restart')
+
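+        -- Smoke-check config.sharding(): pull the generated sharding table from
+        -- the instance and dump it to the log for inspection (no assertions yet).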
+        local vshard_cfg = tt.server:exec(function()
+            local r = table.deepcopy(config.sharding())
+            for k, v in pairs(r) do
+                if type(v) == 'function' then
+                    r[k] = nil
+                end
+            end
+            return r
+        end)
+        require'log'.info("vshard: %s", require'json'.encode(vshard_cfg))
+    end
+end
+
+function g.test_reload(cg)
+    local ctx = test_ctx[cg.name]
+
+    -- Start tarantools
+    h.start_all_tarantools(ctx, init_lua, root, ctx.etcd_config.apps.vshard.instances)
+
+    -- reload+check configuration
+    for _, tt in ipairs(ctx.tts) do
+        h.reload_tarantool(tt.server)
+
+        local box_cfg = tt.server:get_box_cfg()
+        local is_ro = false
+        local listen = ctx.etcd_config.apps.vshard.instances[tt.name].box.listen
+        if cg.params.shards[tt.name] then
+            is_ro = ctx.etcd_config.apps.vshard.clusters[cg.params.shards[tt.name]].master ~= tt.name
+            listen = 'guest:@'..listen
+        end
+
+        t.assert_covers(box_cfg, {
+            log_level = ctx.etcd_config.apps.vshard.common.box.log_level,
+            listen = listen,
+            read_only = is_ro,
+        }, 'box.cfg is correct after restart')
+
+        local ret = tt.server:exec(function()
+            local r = table.deepcopy(config.get('sys'))
+            for k, v in pairs(r) do
+                if type(v) == 'function' then
+                    r[k] = nil
+                end
+            end
+            return r
+        end)
+
+        t.assert_covers(ret, {
+            instance_name = tt.name,
+            master_selection_policy = 'etcd.cluster.vshard',
+            file = base_env.TT_CONFIG,
+        }, 'get("sys") has correct fields after restart')
+    end
+end
+
+function g.test_fencing(cg)
+    local ctx = test_ctx[cg.name]
+    t.skip_if(not ctx.etcd_config.apps.vshard.common.etcd.fencing_enabled, "fencing disabled")
+
+    t.skip("no fencing for vshard")
+
+    -- Start tarantools
+    h.start_all_tarantools(ctx, init_lua, root, ctx.etcd_config.apps.vshard.instances)
+
+    -- Check configuration
+    for _, tnt in ipairs(ctx.tts) do
+        tnt.server:connect_net_box()
+        local box_cfg = tnt.server:get_box_cfg()
+        t.assert_covers(box_cfg, {
+            log_level = ctx.etcd_config.apps.vshard.common.box.log_level,
+            listen = 'guest:@'..ctx.etcd_config.apps.vshard.instances[tnt.name].box.listen,
+            read_only = ctx.etcd_config.apps.vshard.clusters[cg.params.shards[tnt.name]].master ~= tnt.name,
+        }, 'box.cfg is correct')
+
+        local conn = tnt.server --[[@as luatest.server]]
+        local ret = conn:exec(function()
+            local r = table.deepcopy(config.get('sys'))
+            for k, v in pairs(r) do
+                if type(v) == 'function' then
+                    r[k] = nil
+                end
+            end
+            return r
+        end)
+
+        t.assert_covers(ret, {
+            instance_name = tnt.name,
+            master_selection_policy = 'etcd.cluster.vshard',
+            file = base_env.TT_CONFIG,
+        }, 'get("sys") has correct fields')
+    end
+
+    local master_name = ctx.params.master
+
+    ---@type moonlibs.config.test.tarantool
+    local master
+    for _, tt in ipairs(ctx.tts) do
+        if tt.name == master_name then
+            master = tt
+            break
+        end
+    end
+
+    t.assert(master, 'master is not connected')
+
+    local ret = master.server:exec(function()
+        return { cfg_ro = box.cfg.read_only, ro = box.info.ro }
+    end)
+
+    t.assert_equals(ret.cfg_ro, false, 'box.cfg.read_only == false (before fencing)')
+    t.assert_equals(ret.ro, false, 'box.info.ro == false (before fencing)')
+
+    ctx.etcd_config.apps.vshard.clusters.single.master = 'not_exists'
+    h.upload_to_etcd(ctx.etcd_config)
+
+    local fencing_cfg = ctx.etcd_config.apps.vshard.common.etcd
+    local fencing_timeout = fencing_cfg.fencing_timeout or 10
+    local fencing_pause = fencing_cfg.fencing_pause or fencing_timeout/2
+
+    t.helpers.retrying({
+        timeout = fencing_pause,
+        delay = 0.1,
+    }, function ()
+        local ret = master.server:exec(function()
+            return { cfg_ro = box.cfg.read_only, ro = box.info.ro }
+        end)
+        assert(ret.cfg_ro, "cfg.read_only must be true")
+        assert(ret.ro, "info.ro must be true")
+    end)
+
+    ret = master.server:exec(function()
+        return { cfg_ro = box.cfg.read_only, ro = box.info.ro }
+    end)
+
+    t.assert_equals(ret.cfg_ro, true, 'box.cfg.read_only == true')
+    t.assert_equals(ret.ro, true, 'box.info.ro == true')
+
+    ctx.etcd_config.apps.vshard.clusters.single.master = master_name
+    h.upload_to_etcd(ctx.etcd_config)
+
+    local deadline = 2*fencing_timeout+fiber.time()
+    while fiber.time() < deadline do
+        local ret2 = master.server:exec(function()
+            return { cfg_ro = box.cfg.read_only, ro = box.info.ro }
+        end)
+
+        t.assert_equals(ret2.cfg_ro, true, 'box.cfg.read_only == true (double check)')
+        t.assert_equals(ret2.ro, true, 'box.info.ro == true (double check)')
+    end
+end
diff --git a/spec/helper.lua b/spec/helper.lua
index c66bf92..e423868 100644
--- a/spec/helper.lua
+++ b/spec/helper.lua
@@ -60,7 +60,7 @@ function h.get_etcd()
         debug = true,
     }
 
-    t.helpers.retrying({ timeout = 5 }, function() etcd:discovery() end)
+    t.helpers.retrying({ timeout = 15 }, function() etcd:discovery() end)
 
     return etcd
 end
diff --git a/spec/mock/single/conf.lua b/spec/mock/single/conf.lua
index 41fe7d9..e672749 100644
--- a/spec/mock/single/conf.lua
+++ b/spec/mock/single/conf.lua
@@ -10,4 +10,5 @@ etcd = {
 box = {
     wal_dir = os.getenv('TT_WAL_DIR') ..'/' .. instance_name,
     memtx_dir = os.getenv('TT_MEMTX_DIR') .. '/' .. instance_name,
+    log = os.getenv('TT_MEMTX_DIR') .. '/' .. instance_name .. '.log',
 }
diff --git a/spec/mock/vshard/conf.lua b/spec/mock/vshard/conf.lua
new file mode 100644
index 0000000..e672749
--- /dev/null
+++ b/spec/mock/vshard/conf.lua
@@ -0,0 +1,14 @@
+local tt_etcd_endpoints = assert(os.getenv('TT_ETCD_ENDPOINTS'))
+local endpoints = tt_etcd_endpoints:gsub(',+', ','):gsub(',$',''):split(',')
+
+etcd = {
+    instance_name = instance_name,
+    endpoints = endpoints,
+    prefix = os.getenv('TT_ETCD_PREFIX'),
+}
+
+box = {
+    wal_dir = os.getenv('TT_WAL_DIR') ..'/' .. instance_name,
+    memtx_dir = os.getenv('TT_MEMTX_DIR') .. '/' .. instance_name,
+    log = os.getenv('TT_MEMTX_DIR') .. '/' .. instance_name .. '.log',
+}
diff --git a/spec/mock/vshard/init.lua b/spec/mock/vshard/init.lua
new file mode 100644
index 0000000..92b3a23
--- /dev/null
+++ b/spec/mock/vshard/init.lua
@@ -0,0 +1,35 @@
+#!/usr/bin/env tarantool
+local vshard = require 'vshard'
+rawset(_G, 'vshard', vshard)
+
+require 'package.reload'
+require 'config' {
+    mkdir = true,
+    print_config = true,
+    instance_name = os.getenv('TT_INSTANCE_NAME'),
+    file = os.getenv('TT_CONFIG'),
+    master_selection_policy = os.getenv('TT_MASTER_SELECTION_POLICY'),
+    on_after_cfg = function(_,cfg)
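+        -- Storage nodes (instances with `cluster` set in etcd) configure
+        -- vshard.storage; router nodes (router = true) configure vshard.router.
+        -- Both consume the sharding map built by config.sharding().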
+        if cfg.cluster then
+            vshard.storage.cfg({
+                bucket_count = config.get('vshard.bucket_count'),
+                sharding = config.sharding(),
+                zone = config.get('vshard.zone'),
+                weights = config.get('vshard.weights'),
+                rebalancer_max_receiving = config.get('vshard.rebalancer_max_receiving'),
+                rebalancer_max_sending = config.get('vshard.rebalancer_max_sending'),
+                sync_timeout = config.get('vshard.sync_timeout'),
+                discovery_mode = config.get('vshard.discovery_mode'),
+                identification_mode = 'uuid_as_key',
+            }, box.info.uuid)
+        end
+        if cfg.router then
+            vshard.router.cfg({
+                bucket_count = config.get('vshard.bucket_count'),
+                sharding = config.sharding(),
+                zone = config.get('vshard.zone'),
+                zone_weights = config.get('vshard.zone_weights'),
+            })
+        end
+    end,
+}
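
For reference, a minimal sketch of the table config.sharding() returns for the
topology above. Both UUID keys are placeholders: in the tests they are generated
per run from the replicaset_uuid/instance_uuid fields uploaded to etcd.

    {
        ['<first_01 replicaset_uuid>'] = {
            replicas = {
                ['<first_01_1 instance_uuid>'] = {
                    name   = 'first_01_1',
                    uri    = 'guest:@127.0.0.1:3311',
                    master = true,
                },
                ['<first_01_2 instance_uuid>'] = {
                    name   = 'first_01_2',
                    uri    = 'guest:@127.0.0.1:3312',
                    master = false,
                },
            },
            -- weight/lock appear only when the replicaset defines a vshard table
            -- in etcd; rebalancer only when common.vshard.rebalancer is set
        },
        ['<first_02 replicaset_uuid>'] = { --[[ same shape ]] },
    }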