diff --git a/Makefile b/Makefile index c9d39ee9d1..ad79712812 100644 --- a/Makefile +++ b/Makefile @@ -124,7 +124,7 @@ UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg UT_PACKAGES_MAINTAINER := ./maintainer/... ./pkg/scheduler/... UT_PACKAGES_COORDINATOR := ./coordinator/... UT_PACKAGES_LOGSERVICE := ./logservice/... -UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/... ./api/middleware/... +UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/... ./api/middleware/... ./pkg/keyspace/... include tools/Makefile @@ -281,6 +281,7 @@ unit_test_in_verify_ci_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/b tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml $(FAILPOINT_DISABLE) +# Usage: PKG=./api/middleware/... make unit_test_pkg unit_test_pkg: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml mkdir -p "$(TEST_DIR)" $(FAILPOINT_ENABLE) @@ -294,6 +295,7 @@ unit_test_pkg: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml $(FAILPOINT_DISABLE) +# Usage: PKG=./api/middleware/... make unit_test_pkg_next_gen unit_test_pkg_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml mkdir -p "$(TEST_DIR)" $(FAILPOINT_ENABLE) diff --git a/api/middleware/authenticate_middleware.go b/api/middleware/authenticate_middleware.go index 033e4abf6c..4797decc41 100644 --- a/api/middleware/authenticate_middleware.go +++ b/api/middleware/authenticate_middleware.go @@ -102,7 +102,7 @@ func verify(ctx *gin.Context, etcdCli etcd.Client) error { // fetchTiDBTopology parses the TiDB topology from etcd. func fetchTiDBTopology(ctx context.Context, etcdClient etcd.Client, ks string) ([]upstream.TidbInstance, error) { - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) meta, err := keyspaceManager.LoadKeyspace(ctx, ks) if err != nil { return nil, err diff --git a/api/middleware/middleware.go b/api/middleware/middleware.go index 13447f86bf..f6a823558d 100644 --- a/api/middleware/middleware.go +++ b/api/middleware/middleware.go @@ -20,7 +20,6 @@ import ( "time" "github.com/gin-gonic/gin" - "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/api" appcontext "github.com/pingcap/ticdc/pkg/common/context" @@ -238,8 +237,8 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc { return } - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) - meta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), ks) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) + _, err := keyspaceManager.ForceLoadKeyspace(c.Request.Context(), ks) if errors.IsKeyspaceNotExistError(err) { c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) c.Abort() @@ -250,12 +249,6 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc { return } - if meta.State != keyspacepb.KeyspaceState_ENABLED { - c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) - c.Abort() - return - } - c.Next() } } diff --git a/api/middleware/middleware_nextgen_test.go b/api/middleware/middleware_nextgen_test.go index 1fbf8243ae..223f4d1903 100644 --- a/api/middleware/middleware_nextgen_test.go +++ b/api/middleware/middleware_nextgen_test.go @@ -37,7 +37,7 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { tests := []struct { name string keyspace string - init func(t *testing.T, mock *keyspace.MockKeyspaceManager) + init func(t *testing.T, mock *keyspace.MockManager) expectedStatus int expectedAbort bool expectedBodyContains string @@ -51,8 +51,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { { name: "keyspace not exist", keyspace: "not-exist", - init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) { - mock.EXPECT().LoadKeyspace(gomock.Any(), "not-exist").Return(nil, errors.New(pdpb.ErrorType_ENTRY_NOT_FOUND.String())) + init: func(t *testing.T, mock *keyspace.MockManager) { + mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "not-exist").Return(nil, errors.New(pdpb.ErrorType_ENTRY_NOT_FOUND.String())) }, expectedStatus: http.StatusBadRequest, expectedAbort: true, @@ -61,8 +61,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { { name: "internal server error", keyspace: "internal-error", - init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) { - mock.EXPECT().LoadKeyspace(gomock.Any(), "internal-error").Return(nil, errors.New("internal error")) + init: func(t *testing.T, mock *keyspace.MockManager) { + mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "internal-error").Return(nil, errors.New("internal error")) }, expectedStatus: http.StatusInternalServerError, expectedAbort: true, @@ -71,8 +71,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { { name: "success", keyspace: "success", - init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) { - mock.EXPECT().LoadKeyspace(gomock.Any(), "success").Return(&keyspacepb.KeyspaceMeta{ + init: func(t *testing.T, mock *keyspace.MockManager) { + mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "success").Return(&keyspacepb.KeyspaceMeta{ State: keyspacepb.KeyspaceState_ENABLED, }, nil) }, @@ -87,7 +87,7 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mock := keyspace.NewMockKeyspaceManager(ctrl) + mock := keyspace.NewMockManager(ctrl) if tt.init != nil { tt.init(t, mock) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 4558e32681..bd08789b52 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -24,6 +24,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/pingcap/ticdc/api/middleware" "github.com/pingcap/ticdc/downstreamadapter/sink" @@ -103,13 +104,19 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { return } - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { _ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err)) return } + if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { + c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) + c.Abort() + return + } + co, err := h.server.GetCoordinator() if err != nil { _ = c.Error(err) @@ -381,7 +388,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) { } protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(replicaCfg.Sink.Protocol)) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceName := GetKeyspaceValueWithDefault(c) kvStorage, err := keyspaceManager.GetStorage(keyspaceName) if err != nil { @@ -663,13 +670,19 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { newCheckpointTs = cfg.OverwriteCheckpointTs } - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { _ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err)) return } + if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { + c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) + c.Abort() + return + } + resumeGcServiceID := h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming) if err := verifyResumeChangefeedConfig( ctx, @@ -750,6 +763,18 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { return } + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) + keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) + if err != nil { + _ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err)) + return + } + if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { + c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) + c.Abort() + return + } + oldCfInfo, status, err := co.GetChangefeed(c, changefeedDisplayName) if err != nil { _ = c.Error(err) @@ -820,8 +845,6 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { } protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(oldCfInfo.Config.Sink.Protocol)) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) - kvStorage, err := keyspaceManager.GetStorage(keyspaceName) if err != nil { _ = c.Error(err) diff --git a/api/v2/unsafe.go b/api/v2/unsafe.go index 275b3c6cb3..b184e24aca 100644 --- a/api/v2/unsafe.go +++ b/api/v2/unsafe.go @@ -54,7 +54,7 @@ func (h *OpenAPIV2) ResolveLock(c *gin.Context) { keyspaceName := GetKeyspaceValueWithDefault(c) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) // The ctx's lifecycle is the same as the HTTP request. // The schema store may use the context to fetch database information asynchronously. // Therefore, we cannot use the context of the HTTP request. @@ -93,7 +93,7 @@ func (h *OpenAPIV2) DeleteServiceGcSafePoint(c *gin.Context) { defer pdClient.Close() keyspaceName := GetKeyspaceValueWithDefault(c) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), keyspaceName) if err != nil { _ = c.Error(cerror.WrapError(cerror.ErrKeyspaceNotFound, err)) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 402a9b06d7..58c15a2820 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -287,7 +287,7 @@ func (c *coordinator) handleStateChange( log.Info("changefeed is resumed or created successfully, try to delete its safeguard gc safepoint", zap.String("changefeed", event.changefeedID.String())) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, event.changefeedID.Keyspace()) if err != nil { log.Warn("failed to load keyspace", zap.String("keyspace", event.changefeedID.Keyspace()), zap.Error(err)) @@ -473,7 +473,7 @@ func (c *coordinator) updateAllKeyspaceGcBarriers(ctx context.Context) error { func (c *coordinator) updateKeyspaceGcBarrier(ctx context.Context, barrierMap map[string]uint64, keyspaceName string) error { // Obtain keyspace metadata from PD - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { return cerror.WrapError(cerror.ErrLoadKeyspaceFailed, err) diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 9fedd2d981..16173b18bc 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -158,7 +158,7 @@ func NewDispatcherManager( ctx, cancel := context.WithCancel(context.Background()) pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, changefeedID.Keyspace()) if err != nil { cancel() diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 22dba39751..8615e5a9c9 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -250,7 +250,7 @@ func (s *schemaStore) getKeyspaceSchemaStore(keyspaceID uint32) (*keyspaceSchema // If the schemastore does not contain the keyspace, it means it is not a maintainer node. // It should register the keyspace when it try to get keyspace schema_store. - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.GetKeyspaceByID(ctx, keyspaceID) if err != nil { return nil, errors.Trace(err) @@ -443,7 +443,7 @@ func (s *schemaStore) RegisterKeyspace( s.keyspaceLocker.Lock() defer s.keyspaceLocker.Unlock() - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { return errors.Trace(err) diff --git a/logservice/txnutil/lock_resolver.go b/logservice/txnutil/lock_resolver.go index a5e006733b..d9a77f067a 100644 --- a/logservice/txnutil/lock_resolver.go +++ b/logservice/txnutil/lock_resolver.go @@ -69,7 +69,7 @@ func (r *resolver) Resolve(ctx context.Context, keyspaceID uint32, regionID uint Limit: scanLockLimit, }) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.GetKeyspaceByID(ctx, keyspaceID) if err != nil { return err diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 097f452c6a..b76370cc5e 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -231,7 +231,7 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) } ctx := context.Background() - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, cfID.Keyspace()) if err != nil { // BUG tenfyzhong 2025-09-11 17:29:08 how to process err @@ -261,7 +261,7 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart } ctx := context.Background() - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, cfID.Keyspace()) if err != nil { // BUG tenfyzhong 2025-09-11 17:29:08 how to process err diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index ac51695bd9..c600ce34be 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -77,7 +77,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { Name: "ks1", } } - keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl) + keyspaceManager := keyspace.NewMockManager(ctrl) keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil) appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager) @@ -304,7 +304,7 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { Name: "ks1", } } - keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl) + keyspaceManager := keyspace.NewMockManager(ctrl) keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil) appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager) @@ -440,7 +440,7 @@ func TestStopNotExistsMaintainer(t *testing.T) { Name: "ks1", } } - keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl) + keyspaceManager := keyspace.NewMockManager(ctrl) keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil).AnyTimes() appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index 393b8da3e0..a0d812acec 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -17,6 +17,7 @@ import ( "context" "strings" "sync" + "time" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" @@ -32,35 +33,57 @@ import ( "go.uber.org/zap" ) -type KeyspaceManager interface { +const ( + updateDuration = 60 * time.Second +) + +type Manager interface { + // Run starts the manager + Run() + // LoadKeyspace loads keyspace metadata by name from local + // If the local cache is not found, it will load from pd LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) + // ForceLoadKeyspace force loads keyspace metadata from pd and update local cache + ForceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) + // GetKeyspaceByID loads keyspace metadata by id GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) + // GetStorage get a storag for the keyspace GetStorage(keyspace string) (kv.Storage, error) + // Close close the manager Close() } -func NewKeyspaceManager(pdEndpoints []string) KeyspaceManager { - return &keyspaceManager{ +func NewManager(pdEndpoints []string) Manager { + m := &manager{ pdEndpoints: pdEndpoints, keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), storageMap: make(map[string]kv.Storage), } + + return m } -type keyspaceManager struct { +type manager struct { pdEndpoints []string - // TODO tenfyzhong 2025-09-16 23:46:01 update keyspaceMeta periodicity keyspaceMap map[string]*keyspacepb.KeyspaceMeta keyspaceIDMap map[uint32]*keyspacepb.KeyspaceMeta keyspaceMu sync.Mutex storageMap map[string]kv.Storage storageMu sync.Mutex + + ticker *time.Ticker + updateMu sync.Mutex } -func (k *keyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { +func (k *manager) Run() { + k.ticker = time.NewTicker(updateDuration) + go k.updatePeriodicity() +} + +func (k *manager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { if kerneltype.IsClassic() { return &keyspacepb.KeyspaceMeta{ Name: common.DefaultKeyspace, @@ -74,6 +97,11 @@ func (k *keyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*k return meta, nil } + return k.ForceLoadKeyspace(ctx, keyspace) +} + +func (k *manager) ForceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { + var meta *keyspacepb.KeyspaceMeta var err error pdAPIClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient) err = retry.Do(ctx, func() error { @@ -90,10 +118,6 @@ func (k *keyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*k k.keyspaceMu.Lock() defer k.keyspaceMu.Unlock() - // Double check, another goroutine might have fetched and stored it. - if meta, ok := k.keyspaceMap[keyspace]; ok { - return meta, nil - } k.keyspaceMap[keyspace] = meta k.keyspaceIDMap[meta.Id] = meta @@ -101,7 +125,7 @@ func (k *keyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*k return meta, nil } -func (k *keyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { +func (k *manager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { if kerneltype.IsClassic() { return &keyspacepb.KeyspaceMeta{ Name: common.DefaultKeyspace, @@ -142,7 +166,7 @@ func (k *keyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32 return meta, nil } -func (k *keyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { +func (k *manager) GetStorage(keyspace string) (kv.Storage, error) { k.storageMu.Lock() defer k.storageMu.Unlock() @@ -161,7 +185,7 @@ func (k *keyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { return kvStorage, nil } -func (k *keyspaceManager) Close() { +func (k *manager) Close() { k.storageMu.Lock() defer k.storageMu.Unlock() @@ -171,4 +195,47 @@ func (k *keyspaceManager) Close() { log.Error("close storage", zap.String("keyspace", storage.GetKeyspace()), zap.Error(err)) } } + + k.storageMap = make(map[string]kv.Storage) + + if k.ticker != nil { + k.ticker.Stop() + k.ticker = nil + } +} + +func (k *manager) updatePeriodicity() { + if kerneltype.IsClassic() { + return + } + + for range k.ticker.C { + k.update() + } +} + +func (k *manager) update() { + // If we cannot get the lock, we don't need to do anything + // because that means the previous process is still running. + if !k.updateMu.TryLock() { + log.Info("update keyspace lock failed") + return + } + + defer k.updateMu.Unlock() + + k.keyspaceMu.Lock() + keyspaces := make([]string, 0, len(k.keyspaceMap)) + for _, keyspace := range k.keyspaceMap { + keyspaces = append(keyspaces, keyspace.Name) + } + k.keyspaceMu.Unlock() + + ctx := context.Background() + for _, keyspace := range keyspaces { + _, err := k.ForceLoadKeyspace(ctx, keyspace) + if err != nil { + log.Warn("force load keyspace", zap.String("keyspace", keyspace), zap.Error(err)) + } + } } diff --git a/pkg/keyspace/keyspace_manager_classic_test.go b/pkg/keyspace/keyspace_manager_classic_test.go new file mode 100644 index 0000000000..5d1ce50c0f --- /dev/null +++ b/pkg/keyspace/keyspace_manager_classic_test.go @@ -0,0 +1,62 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !nextgen + +package keyspace + +import ( + "context" + "testing" + + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/tidb/pkg/kv" + "github.com/stretchr/testify/require" +) + +func Test_manager_GetKeyspaceByID(t *testing.T) { + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta := &keyspacepb.KeyspaceMeta{ + Id: 0, + Name: common.DefaultKeyspace, + } + actual1, err := m.GetKeyspaceByID(context.Background(), 1) + require.NoError(t, err) + require.EqualValues(t, meta, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{}, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{}, m.keyspaceIDMap) +} + +func Test_manager_LoadKeyspace(t *testing.T) { + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta := &keyspacepb.KeyspaceMeta{ + Id: 0, + Name: common.DefaultKeyspace, + } + actual1, err := m.LoadKeyspace(context.Background(), "ks1") + require.NoError(t, err) + require.EqualValues(t, meta, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{}, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{}, m.keyspaceIDMap) +} diff --git a/pkg/keyspace/keyspace_manager_mock.go b/pkg/keyspace/keyspace_manager_mock.go index 195b044a82..84fd84d8b8 100644 --- a/pkg/keyspace/keyspace_manager_mock.go +++ b/pkg/keyspace/keyspace_manager_mock.go @@ -13,43 +13,58 @@ import ( kv "github.com/pingcap/tidb/pkg/kv" ) -// MockKeyspaceManager is a mock of KeyspaceManager interface. -type MockKeyspaceManager struct { +// MockManager is a mock of Manager interface. +type MockManager struct { ctrl *gomock.Controller - recorder *MockKeyspaceManagerMockRecorder + recorder *MockManagerMockRecorder } -// MockKeyspaceManagerMockRecorder is the mock recorder for MockKeyspaceManager. -type MockKeyspaceManagerMockRecorder struct { - mock *MockKeyspaceManager +// MockManagerMockRecorder is the mock recorder for MockManager. +type MockManagerMockRecorder struct { + mock *MockManager } -// NewMockKeyspaceManager creates a new mock instance. -func NewMockKeyspaceManager(ctrl *gomock.Controller) *MockKeyspaceManager { - mock := &MockKeyspaceManager{ctrl: ctrl} - mock.recorder = &MockKeyspaceManagerMockRecorder{mock} +// NewMockManager creates a new mock instance. +func NewMockManager(ctrl *gomock.Controller) *MockManager { + mock := &MockManager{ctrl: ctrl} + mock.recorder = &MockManagerMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockKeyspaceManager) EXPECT() *MockKeyspaceManagerMockRecorder { +func (m *MockManager) EXPECT() *MockManagerMockRecorder { return m.recorder } // Close mocks base method. -func (m *MockKeyspaceManager) Close() { +func (m *MockManager) Close() { m.ctrl.T.Helper() m.ctrl.Call(m, "Close") } // Close indicates an expected call of Close. -func (mr *MockKeyspaceManagerMockRecorder) Close() *gomock.Call { +func (mr *MockManagerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockKeyspaceManager)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockManager)(nil).Close)) +} + +// ForceLoadKeyspace mocks base method. +func (m *MockManager) ForceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForceLoadKeyspace", ctx, keyspace) + ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ForceLoadKeyspace indicates an expected call of ForceLoadKeyspace. +func (mr *MockManagerMockRecorder) ForceLoadKeyspace(ctx, keyspace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForceLoadKeyspace", reflect.TypeOf((*MockManager)(nil).ForceLoadKeyspace), ctx, keyspace) } // GetKeyspaceByID mocks base method. -func (m *MockKeyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { +func (m *MockManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetKeyspaceByID", ctx, keyspaceID) ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) @@ -58,13 +73,13 @@ func (m *MockKeyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID ui } // GetKeyspaceByID indicates an expected call of GetKeyspaceByID. -func (mr *MockKeyspaceManagerMockRecorder) GetKeyspaceByID(ctx, keyspaceID interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) GetKeyspaceByID(ctx, keyspaceID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKeyspaceByID", reflect.TypeOf((*MockKeyspaceManager)(nil).GetKeyspaceByID), ctx, keyspaceID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKeyspaceByID", reflect.TypeOf((*MockManager)(nil).GetKeyspaceByID), ctx, keyspaceID) } // GetStorage mocks base method. -func (m *MockKeyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { +func (m *MockManager) GetStorage(keyspace string) (kv.Storage, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetStorage", keyspace) ret0, _ := ret[0].(kv.Storage) @@ -73,13 +88,13 @@ func (m *MockKeyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { } // GetStorage indicates an expected call of GetStorage. -func (mr *MockKeyspaceManagerMockRecorder) GetStorage(keyspace interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) GetStorage(keyspace interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStorage", reflect.TypeOf((*MockKeyspaceManager)(nil).GetStorage), keyspace) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStorage", reflect.TypeOf((*MockManager)(nil).GetStorage), keyspace) } // LoadKeyspace mocks base method. -func (m *MockKeyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { +func (m *MockManager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LoadKeyspace", ctx, keyspace) ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) @@ -88,7 +103,19 @@ func (m *MockKeyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) } // LoadKeyspace indicates an expected call of LoadKeyspace. -func (mr *MockKeyspaceManagerMockRecorder) LoadKeyspace(ctx, keyspace interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) LoadKeyspace(ctx, keyspace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadKeyspace", reflect.TypeOf((*MockManager)(nil).LoadKeyspace), ctx, keyspace) +} + +// Run mocks base method. +func (m *MockManager) Run() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run") +} + +// Run indicates an expected call of Run. +func (mr *MockManagerMockRecorder) Run() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadKeyspace", reflect.TypeOf((*MockKeyspaceManager)(nil).LoadKeyspace), ctx, keyspace) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockManager)(nil).Run)) } diff --git a/pkg/keyspace/keyspace_manager_next_gen_test.go b/pkg/keyspace/keyspace_manager_next_gen_test.go new file mode 100644 index 0000000000..556e6facef --- /dev/null +++ b/pkg/keyspace/keyspace_manager_next_gen_test.go @@ -0,0 +1,197 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build nextgen + +package keyspace + +import ( + "context" + "testing" + + "github.com/golang/mock/gomock" + "github.com/pingcap/kvproto/pkg/keyspacepb" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/tidb/pkg/kv" + "github.com/stretchr/testify/require" +) + +func Test_manager_update(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := pdutil.NewMockPDAPIClient(ctrl) + + appcontext.SetService(appcontext.PDAPIClient, mockClient) + + const keyspace = "ks1" + + // step 1, load a metadata to make the map has a member + meta1 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: keyspace, + State: 0, + CreatedAt: 1, + StateChangedAt: 1, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta1, nil).Times(1) + + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + m.ForceLoadKeyspace(context.Background(), keyspace) + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta1, + }, m.keyspaceIDMap) + + // step 2, lock success + meta2 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: keyspace, + State: 0, + CreatedAt: 1, + StateChangedAt: 2, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta2, nil).Times(1) + m.update() + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta2, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta2, + }, m.keyspaceIDMap) + + // step 3, lock failed + m.updateMu.Lock() + m.update() + m.updateMu.Unlock() + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta2, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta2, + }, m.keyspaceIDMap) + + // step 4, lock success again + meta3 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: keyspace, + State: 0, + CreatedAt: 1, + StateChangedAt: 3, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta3, nil).Times(1) + m.update() + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta3, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta3, + }, m.keyspaceIDMap) +} + +func Test_manager_GetKeyspaceByID(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := pdutil.NewMockPDAPIClient(ctrl) + + appcontext.SetService(appcontext.PDAPIClient, mockClient) + + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta1 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: "ks1", + State: 0, + CreatedAt: 1, + StateChangedAt: 1, + Config: map[string]string{}, + } + mockClient.EXPECT().GetKeyspaceMetaByID(gomock.Any(), gomock.Eq(uint32(1))).Return(meta1, nil).Times(1) + actual1, err := m.GetKeyspaceByID(context.Background(), 1) + require.NoError(t, err) + require.EqualValues(t, meta1, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) + + // GetKeyspaceByID again and it will load from local cache + actual2, err := m.GetKeyspaceByID(context.Background(), 1) + require.NoError(t, err) + require.EqualValues(t, meta1, actual2) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) +} + +func Test_manager_LoadKeyspace(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := pdutil.NewMockPDAPIClient(ctrl) + + appcontext.SetService(appcontext.PDAPIClient, mockClient) + + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta1 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: "ks1", + State: 0, + CreatedAt: 1, + StateChangedAt: 1, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq("ks1")).Return(meta1, nil).Times(1) + actual1, err := m.LoadKeyspace(context.Background(), "ks1") + require.NoError(t, err) + require.EqualValues(t, meta1, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) + + // GetKeyspaceByID again and it will load from local cache + actual2, err := m.LoadKeyspace(context.Background(), "ks1") + require.NoError(t, err) + require.EqualValues(t, meta1, actual2) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) +} diff --git a/pkg/pdutil/api_client_mock.go b/pkg/pdutil/api_client_mock.go new file mode 100644 index 0000000000..a2662ce45b --- /dev/null +++ b/pkg/pdutil/api_client_mock.go @@ -0,0 +1,152 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/pdutil/api_client.go + +// Package pdutil is a generated GoMock package. +package pdutil + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + keyspacepb "github.com/pingcap/kvproto/pkg/keyspacepb" + heartbeatpb "github.com/pingcap/ticdc/heartbeatpb" +) + +// MockPDAPIClient is a mock of PDAPIClient interface. +type MockPDAPIClient struct { + ctrl *gomock.Controller + recorder *MockPDAPIClientMockRecorder +} + +// MockPDAPIClientMockRecorder is the mock recorder for MockPDAPIClient. +type MockPDAPIClientMockRecorder struct { + mock *MockPDAPIClient +} + +// NewMockPDAPIClient creates a new mock instance. +func NewMockPDAPIClient(ctrl *gomock.Controller) *MockPDAPIClient { + mock := &MockPDAPIClient{ctrl: ctrl} + mock.recorder = &MockPDAPIClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPDAPIClient) EXPECT() *MockPDAPIClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockPDAPIClient) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockPDAPIClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPDAPIClient)(nil).Close)) +} + +// CollectMemberEndpoints mocks base method. +func (m *MockPDAPIClient) CollectMemberEndpoints(ctx context.Context) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CollectMemberEndpoints", ctx) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CollectMemberEndpoints indicates an expected call of CollectMemberEndpoints. +func (mr *MockPDAPIClientMockRecorder) CollectMemberEndpoints(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CollectMemberEndpoints", reflect.TypeOf((*MockPDAPIClient)(nil).CollectMemberEndpoints), ctx) +} + +// GetKeyspaceMetaByID mocks base method. +func (m *MockPDAPIClient) GetKeyspaceMetaByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetKeyspaceMetaByID", ctx, keyspaceID) + ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetKeyspaceMetaByID indicates an expected call of GetKeyspaceMetaByID. +func (mr *MockPDAPIClientMockRecorder) GetKeyspaceMetaByID(ctx, keyspaceID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKeyspaceMetaByID", reflect.TypeOf((*MockPDAPIClient)(nil).GetKeyspaceMetaByID), ctx, keyspaceID) +} + +// Healthy mocks base method. +func (m *MockPDAPIClient) Healthy(ctx context.Context, endpoint string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Healthy", ctx, endpoint) + ret0, _ := ret[0].(error) + return ret0 +} + +// Healthy indicates an expected call of Healthy. +func (mr *MockPDAPIClientMockRecorder) Healthy(ctx, endpoint interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Healthy", reflect.TypeOf((*MockPDAPIClient)(nil).Healthy), ctx, endpoint) +} + +// ListGcServiceSafePoint mocks base method. +func (m *MockPDAPIClient) ListGcServiceSafePoint(ctx context.Context) (*ListServiceGCSafepoint, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListGcServiceSafePoint", ctx) + ret0, _ := ret[0].(*ListServiceGCSafepoint) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListGcServiceSafePoint indicates an expected call of ListGcServiceSafePoint. +func (mr *MockPDAPIClientMockRecorder) ListGcServiceSafePoint(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListGcServiceSafePoint", reflect.TypeOf((*MockPDAPIClient)(nil).ListGcServiceSafePoint), ctx) +} + +// LoadKeyspace mocks base method. +func (m *MockPDAPIClient) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadKeyspace", ctx, name) + ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadKeyspace indicates an expected call of LoadKeyspace. +func (mr *MockPDAPIClientMockRecorder) LoadKeyspace(ctx, name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadKeyspace", reflect.TypeOf((*MockPDAPIClient)(nil).LoadKeyspace), ctx, name) +} + +// ScanRegions mocks base method. +func (m *MockPDAPIClient) ScanRegions(ctx context.Context, span heartbeatpb.TableSpan) ([]RegionInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ScanRegions", ctx, span) + ret0, _ := ret[0].([]RegionInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ScanRegions indicates an expected call of ScanRegions. +func (mr *MockPDAPIClientMockRecorder) ScanRegions(ctx, span interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanRegions", reflect.TypeOf((*MockPDAPIClient)(nil).ScanRegions), ctx, span) +} + +// UpdateMetaLabel mocks base method. +func (m *MockPDAPIClient) UpdateMetaLabel(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateMetaLabel", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateMetaLabel indicates an expected call of UpdateMetaLabel. +func (mr *MockPDAPIClientMockRecorder) UpdateMetaLabel(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMetaLabel", reflect.TypeOf((*MockPDAPIClient)(nil).UpdateMetaLabel), ctx) +} diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index 8e892fac36..29f81c067b 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -182,7 +182,7 @@ func checkStaleCheckpointTs( } func (m *gcManager) checkStaleCheckpointTsKeyspace(ctx context.Context, changefeedID common.ChangeFeedID, checkpointTs common.Ts) error { - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, changefeedID.Keyspace()) if err != nil { return err diff --git a/scripts/generate-mock.sh b/scripts/generate-mock.sh index 11c2f99d5c..806ed0854d 100755 --- a/scripts/generate-mock.sh +++ b/scripts/generate-mock.sh @@ -34,3 +34,4 @@ fi "$MOCKGEN" -source pkg/api/v2/changefeed.go -destination pkg/api/v2/mock/changefeed_mock.go -package mock "$MOCKGEN" -source pkg/sink/codec/simple/marshaller.go -destination pkg/sink/codec/simple/mock/marshaller.go "$MOCKGEN" -source pkg/keyspace/keyspace_manager.go -destination pkg/keyspace/keyspace_manager_mock.go -package keyspace +"$MOCKGEN" -source pkg/pdutil/api_client.go -destination pkg/pdutil/api_client_mock.go -package pdutil diff --git a/server/server.go b/server/server.go index aa7645a78e..6c81ec4fc8 100644 --- a/server/server.go +++ b/server/server.go @@ -274,7 +274,8 @@ func (c *server) setPreServices(ctx context.Context) error { appctx.SetService(appctx.DispatcherOrchestrator, dispatcherOrchestrator) c.preServices = append(c.preServices, dispatcherOrchestrator) - keyspaceManager := keyspace.NewKeyspaceManager(c.pdEndpoints) + keyspaceManager := keyspace.NewManager(c.pdEndpoints) + keyspaceManager.Run() appctx.SetService(appctx.KeyspaceManager, keyspaceManager) c.preServices = append(c.preServices, keyspaceManager)