From 914ef3bded1304870c7ee7678daabd283de40229 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 12:54:39 +0800 Subject: [PATCH 01/15] refactor(keyspace): Rename KeyspaceManager to Manager - Rename the `KeyspaceManager` interface to `Manager`. - Rename the `keyspaceManager` struct to `manager`. - Update all usages of `KeyspaceManager` to `Manager` across the codebase. - Update the constructor function from `NewKeyspaceManager` to `NewManager`. Signed-off-by: tenfyzhong --- api/middleware/middleware.go | 2 +- api/v2/changefeed.go | 8 ++++---- api/v2/unsafe.go | 4 ++-- coordinator/coordinator.go | 4 ++-- .../dispatchermanager/dispatcher_manager.go | 2 +- logservice/schemastore/schema_store.go | 4 ++-- logservice/txnutil/lock_resolver.go | 2 +- maintainer/maintainer_manager.go | 4 ++-- maintainer/maintainer_manager_test.go | 2 +- pkg/keyspace/keyspace_manager.go | 16 ++++++++-------- pkg/txnutil/gc/gc_manager.go | 2 +- server/server.go | 2 +- 12 files changed, 26 insertions(+), 26 deletions(-) diff --git a/api/middleware/middleware.go b/api/middleware/middleware.go index 13447f86bf..5b4bacf720 100644 --- a/api/middleware/middleware.go +++ b/api/middleware/middleware.go @@ -238,7 +238,7 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc { return } - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) meta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), ks) if errors.IsKeyspaceNotExistError(err) { c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 4558e32681..d048f90bbf 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -103,7 +103,7 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { return } - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { _ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err)) @@ -381,7 +381,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) { } protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(replicaCfg.Sink.Protocol)) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceName := GetKeyspaceValueWithDefault(c) kvStorage, err := keyspaceManager.GetStorage(keyspaceName) if err != nil { @@ -663,7 +663,7 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { newCheckpointTs = cfg.OverwriteCheckpointTs } - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { _ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err)) @@ -820,7 +820,7 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { } protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(oldCfInfo.Config.Sink.Protocol)) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) kvStorage, err := keyspaceManager.GetStorage(keyspaceName) if err != nil { diff --git a/api/v2/unsafe.go b/api/v2/unsafe.go index 275b3c6cb3..b184e24aca 100644 --- a/api/v2/unsafe.go +++ b/api/v2/unsafe.go @@ -54,7 +54,7 @@ func (h *OpenAPIV2) ResolveLock(c *gin.Context) { keyspaceName := GetKeyspaceValueWithDefault(c) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) // The ctx's lifecycle is the same as the HTTP request. // The schema store may use the context to fetch database information asynchronously. // Therefore, we cannot use the context of the HTTP request. @@ -93,7 +93,7 @@ func (h *OpenAPIV2) DeleteServiceGcSafePoint(c *gin.Context) { defer pdClient.Close() keyspaceName := GetKeyspaceValueWithDefault(c) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), keyspaceName) if err != nil { _ = c.Error(cerror.WrapError(cerror.ErrKeyspaceNotFound, err)) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 402a9b06d7..58c15a2820 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -287,7 +287,7 @@ func (c *coordinator) handleStateChange( log.Info("changefeed is resumed or created successfully, try to delete its safeguard gc safepoint", zap.String("changefeed", event.changefeedID.String())) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, event.changefeedID.Keyspace()) if err != nil { log.Warn("failed to load keyspace", zap.String("keyspace", event.changefeedID.Keyspace()), zap.Error(err)) @@ -473,7 +473,7 @@ func (c *coordinator) updateAllKeyspaceGcBarriers(ctx context.Context) error { func (c *coordinator) updateKeyspaceGcBarrier(ctx context.Context, barrierMap map[string]uint64, keyspaceName string) error { // Obtain keyspace metadata from PD - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { return cerror.WrapError(cerror.ErrLoadKeyspaceFailed, err) diff --git a/downstreamadapter/dispatchermanager/dispatcher_manager.go b/downstreamadapter/dispatchermanager/dispatcher_manager.go index 9fedd2d981..16173b18bc 100644 --- a/downstreamadapter/dispatchermanager/dispatcher_manager.go +++ b/downstreamadapter/dispatchermanager/dispatcher_manager.go @@ -158,7 +158,7 @@ func NewDispatcherManager( ctx, cancel := context.WithCancel(context.Background()) pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, changefeedID.Keyspace()) if err != nil { cancel() diff --git a/logservice/schemastore/schema_store.go b/logservice/schemastore/schema_store.go index 22dba39751..8615e5a9c9 100644 --- a/logservice/schemastore/schema_store.go +++ b/logservice/schemastore/schema_store.go @@ -250,7 +250,7 @@ func (s *schemaStore) getKeyspaceSchemaStore(keyspaceID uint32) (*keyspaceSchema // If the schemastore does not contain the keyspace, it means it is not a maintainer node. // It should register the keyspace when it try to get keyspace schema_store. - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.GetKeyspaceByID(ctx, keyspaceID) if err != nil { return nil, errors.Trace(err) @@ -443,7 +443,7 @@ func (s *schemaStore) RegisterKeyspace( s.keyspaceLocker.Lock() defer s.keyspaceLocker.Unlock() - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) if err != nil { return errors.Trace(err) diff --git a/logservice/txnutil/lock_resolver.go b/logservice/txnutil/lock_resolver.go index a5e006733b..d9a77f067a 100644 --- a/logservice/txnutil/lock_resolver.go +++ b/logservice/txnutil/lock_resolver.go @@ -69,7 +69,7 @@ func (r *resolver) Resolve(ctx context.Context, keyspaceID uint32, regionID uint Limit: scanLockLimit, }) - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.GetKeyspaceByID(ctx, keyspaceID) if err != nil { return err diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 097f452c6a..b76370cc5e 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -231,7 +231,7 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest) } ctx := context.Background() - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, cfID.Keyspace()) if err != nil { // BUG tenfyzhong 2025-09-11 17:29:08 how to process err @@ -261,7 +261,7 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart } ctx := context.Background() - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, cfID.Keyspace()) if err != nil { // BUG tenfyzhong 2025-09-11 17:29:08 how to process err diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index 434cd9f1e5..e21e7e5650 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -61,7 +61,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { } mockPDClock := pdutil.NewClock4Test() appcontext.SetService(appcontext.DefaultPDClock, mockPDClock) - keyspaceManager := keyspace.NewKeyspaceManager([]string{"127.0.0.1:2379"}) + keyspaceManager := keyspace.NewManager([]string{"127.0.0.1:2379"}) appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager) appcontext.SetService(appcontext.SchemaStore, store) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index 393b8da3e0..02ad8f8464 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -32,15 +32,15 @@ import ( "go.uber.org/zap" ) -type KeyspaceManager interface { +type Manager interface { LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) GetStorage(keyspace string) (kv.Storage, error) Close() } -func NewKeyspaceManager(pdEndpoints []string) KeyspaceManager { - return &keyspaceManager{ +func NewManager(pdEndpoints []string) Manager { + return &manager{ pdEndpoints: pdEndpoints, keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), @@ -48,7 +48,7 @@ func NewKeyspaceManager(pdEndpoints []string) KeyspaceManager { } } -type keyspaceManager struct { +type manager struct { pdEndpoints []string // TODO tenfyzhong 2025-09-16 23:46:01 update keyspaceMeta periodicity @@ -60,7 +60,7 @@ type keyspaceManager struct { storageMu sync.Mutex } -func (k *keyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { +func (k *manager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { if kerneltype.IsClassic() { return &keyspacepb.KeyspaceMeta{ Name: common.DefaultKeyspace, @@ -101,7 +101,7 @@ func (k *keyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*k return meta, nil } -func (k *keyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { +func (k *manager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { if kerneltype.IsClassic() { return &keyspacepb.KeyspaceMeta{ Name: common.DefaultKeyspace, @@ -142,7 +142,7 @@ func (k *keyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32 return meta, nil } -func (k *keyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { +func (k *manager) GetStorage(keyspace string) (kv.Storage, error) { k.storageMu.Lock() defer k.storageMu.Unlock() @@ -161,7 +161,7 @@ func (k *keyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { return kvStorage, nil } -func (k *keyspaceManager) Close() { +func (k *manager) Close() { k.storageMu.Lock() defer k.storageMu.Unlock() diff --git a/pkg/txnutil/gc/gc_manager.go b/pkg/txnutil/gc/gc_manager.go index 8e892fac36..29f81c067b 100644 --- a/pkg/txnutil/gc/gc_manager.go +++ b/pkg/txnutil/gc/gc_manager.go @@ -182,7 +182,7 @@ func checkStaleCheckpointTs( } func (m *gcManager) checkStaleCheckpointTsKeyspace(ctx context.Context, changefeedID common.ChangeFeedID, checkpointTs common.Ts) error { - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, changefeedID.Keyspace()) if err != nil { return err diff --git a/server/server.go b/server/server.go index 2f6675253f..a0131c8f74 100644 --- a/server/server.go +++ b/server/server.go @@ -274,7 +274,7 @@ func (c *server) setPreServices(ctx context.Context) error { appctx.SetService(appctx.DispatcherOrchestrator, dispatcherOrchestrator) c.preServices = append(c.preServices, dispatcherOrchestrator) - keyspaceManager := keyspace.NewKeyspaceManager(c.pdEndpoints) + keyspaceManager := keyspace.NewManager(c.pdEndpoints) appctx.SetService(appctx.KeyspaceManager, keyspaceManager) c.preServices = append(c.preServices, keyspaceManager) From e1e2f028b1ddd38175524d7ece9202302510c9db Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 13:23:47 +0800 Subject: [PATCH 02/15] feat(keyspace): Add periodic keyspace meta update - Initialize a ticker to periodically update keyspace metadata. - Implement a background goroutine that fetches and updates keyspace metadata at regular intervals. - Ensure that updates only occur in non-classic kernel environments. - Stop the ticker when the manager is closed to prevent resource leaks. Signed-off-by: tenfyzhong --- pkg/keyspace/keyspace_manager.go | 56 ++++++++++++++++++++++++++++---- 1 file changed, 50 insertions(+), 6 deletions(-) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index 02ad8f8464..06a35a98a8 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -17,6 +17,7 @@ import ( "context" "strings" "sync" + "time" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" @@ -40,24 +41,29 @@ type Manager interface { } func NewManager(pdEndpoints []string) Manager { - return &manager{ + m := &manager{ pdEndpoints: pdEndpoints, keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), storageMap: make(map[string]kv.Storage), } + + m.update() + + return m } type manager struct { pdEndpoints []string - // TODO tenfyzhong 2025-09-16 23:46:01 update keyspaceMeta periodicity keyspaceMap map[string]*keyspacepb.KeyspaceMeta keyspaceIDMap map[uint32]*keyspacepb.KeyspaceMeta keyspaceMu sync.Mutex storageMap map[string]kv.Storage storageMu sync.Mutex + + ticker *time.Ticker } func (k *manager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { @@ -74,6 +80,11 @@ func (k *manager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacep return meta, nil } + return k.forceLoadKeyspace(ctx, keyspace) +} + +func (k *manager) forceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { + var meta *keyspacepb.KeyspaceMeta var err error pdAPIClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient) err = retry.Do(ctx, func() error { @@ -90,10 +101,6 @@ func (k *manager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacep k.keyspaceMu.Lock() defer k.keyspaceMu.Unlock() - // Double check, another goroutine might have fetched and stored it. - if meta, ok := k.keyspaceMap[keyspace]; ok { - return meta, nil - } k.keyspaceMap[keyspace] = meta k.keyspaceIDMap[meta.Id] = meta @@ -171,4 +178,41 @@ func (k *manager) Close() { log.Error("close storage", zap.String("keyspace", storage.GetKeyspace()), zap.Error(err)) } } + + k.storageMap = make(map[string]kv.Storage) + + k.ticker.Stop() +} + +func (k *manager) update() { + if kerneltype.IsClassic() { + return + } + + mu := sync.Mutex{} + k.ticker = time.NewTicker(time.Second * 60) + + go func() { + // If we cannot get the lock, we don't need to do anything + // because that means the previous process is still running. + if !mu.TryLock() { + return + } + + mu.Lock() + defer mu.Unlock() + + for range k.ticker.C { + k.keyspaceMu.Lock() + keyspaces := make([]string, 0, len(k.keyspaceMap)) + k.keyspaceMu.Unlock() + ctx := context.Background() + for _, keyspace := range keyspaces { + _, err := k.forceLoadKeyspace(ctx, keyspace) + if err != nil { + log.Warn("force load keyspace", zap.String("keyspace", keyspace), zap.Error(err)) + } + } + } + }() } From 020d2c9c5be4347ce5f57af1c71f32fc826bc9cf Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 13:30:47 +0800 Subject: [PATCH 03/15] feat(keyspace): Use constant for periodic update interval - Define `periodicUpdateKeyspace` constant for the ticker interval. - Update the ticker initialization to use the new constant. Signed-off-by: tenfyzhong --- pkg/keyspace/keyspace_manager.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index 06a35a98a8..554c152006 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -33,6 +33,10 @@ import ( "go.uber.org/zap" ) +const ( + periodicUpdateKeyspace = 60 * time.Second +) + type Manager interface { LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) @@ -190,7 +194,7 @@ func (k *manager) update() { } mu := sync.Mutex{} - k.ticker = time.NewTicker(time.Second * 60) + k.ticker = time.NewTicker(periodicUpdateKeyspace) go func() { // If we cannot get the lock, we don't need to do anything From 18b9727c70b1a66cda26acd4a161d7d0d8ece786 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 13:32:49 +0800 Subject: [PATCH 04/15] feat(keyspace): Add documentation comments to Manager interface - Add GoDoc comments to explain the purpose of each method in the Manager interface. - This improves code readability and maintainability by clarifying the expected behavior of the interface. Signed-off-by: tenfyzhong --- pkg/keyspace/keyspace_manager.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index 554c152006..bed02abf9f 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -38,9 +38,13 @@ const ( ) type Manager interface { + // LoadKeyspace loads keyspace metadata by name LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) + // GetKeyspaceByID loads keyspace metadata by id GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) + // GetStorage get a storag for the keyspace GetStorage(keyspace string) (kv.Storage, error) + // Close close the manager Close() } From 2dff535601a79bc1eaedb0b56c7950b89ef24bc1 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 13:37:57 +0800 Subject: [PATCH 05/15] refactor(keyspace): Move mutex lock out of update loop - The mutex lock was being acquired and released on every ticker tick, which is inefficient. - Moved the mutex lock to be acquired once before the loop and released after the loop. - This change improves performance by reducing lock contention. Signed-off-by: tenfyzhong --- pkg/keyspace/keyspace_manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index bed02abf9f..a3d3259760 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -207,7 +207,6 @@ func (k *manager) update() { return } - mu.Lock() defer mu.Unlock() for range k.ticker.C { From 878e39a73a0043bef49d0ffdadbb3177a71d5239 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 15:07:34 +0800 Subject: [PATCH 06/15] feat(pdutil): generate mock for PDAPIClient - Add mock for PDAPIClient to facilitate testing of components that interact with PD. - Update generate-mock.sh to include the new mock generation. Signed-off-by: tenfyzhong --- pkg/pdutil/api_client_mock.go | 152 ++++++++++++++++++++++++++++++++++ scripts/generate-mock.sh | 1 + 2 files changed, 153 insertions(+) create mode 100644 pkg/pdutil/api_client_mock.go diff --git a/pkg/pdutil/api_client_mock.go b/pkg/pdutil/api_client_mock.go new file mode 100644 index 0000000000..a2662ce45b --- /dev/null +++ b/pkg/pdutil/api_client_mock.go @@ -0,0 +1,152 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: pkg/pdutil/api_client.go + +// Package pdutil is a generated GoMock package. +package pdutil + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + keyspacepb "github.com/pingcap/kvproto/pkg/keyspacepb" + heartbeatpb "github.com/pingcap/ticdc/heartbeatpb" +) + +// MockPDAPIClient is a mock of PDAPIClient interface. +type MockPDAPIClient struct { + ctrl *gomock.Controller + recorder *MockPDAPIClientMockRecorder +} + +// MockPDAPIClientMockRecorder is the mock recorder for MockPDAPIClient. +type MockPDAPIClientMockRecorder struct { + mock *MockPDAPIClient +} + +// NewMockPDAPIClient creates a new mock instance. +func NewMockPDAPIClient(ctrl *gomock.Controller) *MockPDAPIClient { + mock := &MockPDAPIClient{ctrl: ctrl} + mock.recorder = &MockPDAPIClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockPDAPIClient) EXPECT() *MockPDAPIClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockPDAPIClient) Close() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Close") +} + +// Close indicates an expected call of Close. +func (mr *MockPDAPIClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockPDAPIClient)(nil).Close)) +} + +// CollectMemberEndpoints mocks base method. +func (m *MockPDAPIClient) CollectMemberEndpoints(ctx context.Context) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CollectMemberEndpoints", ctx) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CollectMemberEndpoints indicates an expected call of CollectMemberEndpoints. +func (mr *MockPDAPIClientMockRecorder) CollectMemberEndpoints(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CollectMemberEndpoints", reflect.TypeOf((*MockPDAPIClient)(nil).CollectMemberEndpoints), ctx) +} + +// GetKeyspaceMetaByID mocks base method. +func (m *MockPDAPIClient) GetKeyspaceMetaByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetKeyspaceMetaByID", ctx, keyspaceID) + ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetKeyspaceMetaByID indicates an expected call of GetKeyspaceMetaByID. +func (mr *MockPDAPIClientMockRecorder) GetKeyspaceMetaByID(ctx, keyspaceID interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKeyspaceMetaByID", reflect.TypeOf((*MockPDAPIClient)(nil).GetKeyspaceMetaByID), ctx, keyspaceID) +} + +// Healthy mocks base method. +func (m *MockPDAPIClient) Healthy(ctx context.Context, endpoint string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Healthy", ctx, endpoint) + ret0, _ := ret[0].(error) + return ret0 +} + +// Healthy indicates an expected call of Healthy. +func (mr *MockPDAPIClientMockRecorder) Healthy(ctx, endpoint interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Healthy", reflect.TypeOf((*MockPDAPIClient)(nil).Healthy), ctx, endpoint) +} + +// ListGcServiceSafePoint mocks base method. +func (m *MockPDAPIClient) ListGcServiceSafePoint(ctx context.Context) (*ListServiceGCSafepoint, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListGcServiceSafePoint", ctx) + ret0, _ := ret[0].(*ListServiceGCSafepoint) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListGcServiceSafePoint indicates an expected call of ListGcServiceSafePoint. +func (mr *MockPDAPIClientMockRecorder) ListGcServiceSafePoint(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListGcServiceSafePoint", reflect.TypeOf((*MockPDAPIClient)(nil).ListGcServiceSafePoint), ctx) +} + +// LoadKeyspace mocks base method. +func (m *MockPDAPIClient) LoadKeyspace(ctx context.Context, name string) (*keyspacepb.KeyspaceMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadKeyspace", ctx, name) + ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// LoadKeyspace indicates an expected call of LoadKeyspace. +func (mr *MockPDAPIClientMockRecorder) LoadKeyspace(ctx, name interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadKeyspace", reflect.TypeOf((*MockPDAPIClient)(nil).LoadKeyspace), ctx, name) +} + +// ScanRegions mocks base method. +func (m *MockPDAPIClient) ScanRegions(ctx context.Context, span heartbeatpb.TableSpan) ([]RegionInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ScanRegions", ctx, span) + ret0, _ := ret[0].([]RegionInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ScanRegions indicates an expected call of ScanRegions. +func (mr *MockPDAPIClientMockRecorder) ScanRegions(ctx, span interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ScanRegions", reflect.TypeOf((*MockPDAPIClient)(nil).ScanRegions), ctx, span) +} + +// UpdateMetaLabel mocks base method. +func (m *MockPDAPIClient) UpdateMetaLabel(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateMetaLabel", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateMetaLabel indicates an expected call of UpdateMetaLabel. +func (mr *MockPDAPIClientMockRecorder) UpdateMetaLabel(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMetaLabel", reflect.TypeOf((*MockPDAPIClient)(nil).UpdateMetaLabel), ctx) +} diff --git a/scripts/generate-mock.sh b/scripts/generate-mock.sh index 798fd69a59..f1ad125daa 100755 --- a/scripts/generate-mock.sh +++ b/scripts/generate-mock.sh @@ -33,3 +33,4 @@ fi "$MOCKGEN" -source pkg/api/v2/processor.go -destination pkg/api/v2/mock/processor_mock.go -package mock "$MOCKGEN" -source pkg/api/v2/changefeed.go -destination pkg/api/v2/mock/changefeed_mock.go -package mock "$MOCKGEN" -source pkg/sink/codec/simple/marshaller.go -destination pkg/sink/codec/simple/mock/marshaller.go +"$MOCKGEN" -source pkg/pdutil/api_client.go -destination pkg/pdutil/api_client_mock.go -package pdutil From a3cb79ae6f180c96adaf8fdc7ae04e3e7635c4b0 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 15:16:01 +0800 Subject: [PATCH 07/15] feat(keyspace): Add Run method and periodic update for keyspace manager - Add a `Run` method to the `keyspace.Manager` interface and implementation. - This method initializes a ticker and starts a goroutine for periodic updates of keyspace metadata. - The `updatePeriodicity` function now correctly handles locking and updating keyspace metadata. - The `NewManager` function no longer calls `update()` directly. - The `server.go` file now calls `keyspaceManager.Run()` after creating the manager. - Add `pkg/keyspace` to `UT_PACKAGES_OTHERS` in the Makefile. - Add a new test file `pkg/keyspace/keyspace_manager_test.go` to cover the `update` functionality. Signed-off-by: tenfyzhong --- Makefile | 2 +- pkg/keyspace/keyspace_manager.go | 63 +++++++++------ pkg/keyspace/keyspace_manager_test.go | 109 ++++++++++++++++++++++++++ server/server.go | 1 + 4 files changed, 148 insertions(+), 27 deletions(-) create mode 100644 pkg/keyspace/keyspace_manager_test.go diff --git a/Makefile b/Makefile index b3f2135127..6693698011 100644 --- a/Makefile +++ b/Makefile @@ -124,7 +124,7 @@ UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg UT_PACKAGES_MAINTAINER := ./maintainer/... ./pkg/scheduler/... UT_PACKAGES_COORDINATOR := ./coordinator/... UT_PACKAGES_LOGSERVICE := ./logservice/... -UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/... ./api/middleware/... +UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/... ./api/middleware/... ./pkg/keyspace/... include tools/Makefile diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index a3d3259760..056a6bdb57 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -34,10 +34,12 @@ import ( ) const ( - periodicUpdateKeyspace = 60 * time.Second + updateDuration = 60 * time.Second ) type Manager interface { + // Run + Run() // LoadKeyspace loads keyspace metadata by name LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) // GetKeyspaceByID loads keyspace metadata by id @@ -56,8 +58,6 @@ func NewManager(pdEndpoints []string) Manager { storageMap: make(map[string]kv.Storage), } - m.update() - return m } @@ -74,6 +74,11 @@ type manager struct { ticker *time.Ticker } +func (k *manager) Run() { + k.ticker = time.NewTicker(updateDuration) + go k.updatePeriodicity() +} + func (k *manager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { if kerneltype.IsClassic() { return &keyspacepb.KeyspaceMeta{ @@ -192,34 +197,40 @@ func (k *manager) Close() { k.ticker.Stop() } -func (k *manager) update() { +func (k *manager) updatePeriodicity() { if kerneltype.IsClassic() { return } - mu := sync.Mutex{} - k.ticker = time.NewTicker(periodicUpdateKeyspace) + mu := &sync.Mutex{} - go func() { - // If we cannot get the lock, we don't need to do anything - // because that means the previous process is still running. - if !mu.TryLock() { - return - } + for range k.ticker.C { + k.update(mu) + } +} + +func (k *manager) update(mu *sync.Mutex) { + // If we cannot get the lock, we don't need to do anything + // because that means the previous process is still running. + if !mu.TryLock() { + log.Info("update keyspace lock failed") + return + } + + defer mu.Unlock() + + k.keyspaceMu.Lock() + keyspaces := make([]string, 0, len(k.keyspaceMap)) + for _, keyspace := range k.keyspaceMap { + keyspaces = append(keyspaces, keyspace.Name) + } + k.keyspaceMu.Unlock() - defer mu.Unlock() - - for range k.ticker.C { - k.keyspaceMu.Lock() - keyspaces := make([]string, 0, len(k.keyspaceMap)) - k.keyspaceMu.Unlock() - ctx := context.Background() - for _, keyspace := range keyspaces { - _, err := k.forceLoadKeyspace(ctx, keyspace) - if err != nil { - log.Warn("force load keyspace", zap.String("keyspace", keyspace), zap.Error(err)) - } - } + ctx := context.Background() + for _, keyspace := range keyspaces { + _, err := k.forceLoadKeyspace(ctx, keyspace) + if err != nil { + log.Warn("force load keyspace", zap.String("keyspace", keyspace), zap.Error(err)) } - }() + } } diff --git a/pkg/keyspace/keyspace_manager_test.go b/pkg/keyspace/keyspace_manager_test.go new file mode 100644 index 0000000000..392968a175 --- /dev/null +++ b/pkg/keyspace/keyspace_manager_test.go @@ -0,0 +1,109 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package keyspace + +import ( + "context" + "sync" + "testing" + + "github.com/golang/mock/gomock" + "github.com/pingcap/kvproto/pkg/keyspacepb" + appcontext "github.com/pingcap/ticdc/pkg/common/context" + "github.com/pingcap/ticdc/pkg/pdutil" + "github.com/pingcap/tidb/pkg/kv" + "github.com/stretchr/testify/require" +) + +func Test_manager_update(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := pdutil.NewMockPDAPIClient(ctrl) + + appcontext.SetService(appcontext.PDAPIClient, mockClient) + + const keyspace = "ks1" + mu := &sync.Mutex{} + + // step 1, load a metadata to make the map has a member + meta1 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: keyspace, + State: 0, + CreatedAt: 1, + StateChangedAt: 1, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta1, nil).Times(1) + + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + m.forceLoadKeyspace(context.Background(), keyspace) + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta1, + }, m.keyspaceIDMap) + + // step 2, lock success + meta2 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: keyspace, + State: 0, + CreatedAt: 1, + StateChangedAt: 2, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta2, nil).Times(1) + m.update(mu) + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta2, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta2, + }, m.keyspaceIDMap) + + // step 3, lock failed + mu.Lock() + m.update(mu) + mu.Unlock() + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta2, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta2, + }, m.keyspaceIDMap) + + // step 4, lock success again + meta3 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: keyspace, + State: 0, + CreatedAt: 1, + StateChangedAt: 3, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta3, nil).Times(1) + m.update(mu) + require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta3, + }, m.keyspaceMap) + require.EqualValues(t, map[uint32]*keyspacepb.KeyspaceMeta{ + uint32(1): meta3, + }, m.keyspaceIDMap) +} diff --git a/server/server.go b/server/server.go index a0131c8f74..cabdd3dc90 100644 --- a/server/server.go +++ b/server/server.go @@ -275,6 +275,7 @@ func (c *server) setPreServices(ctx context.Context) error { c.preServices = append(c.preServices, dispatcherOrchestrator) keyspaceManager := keyspace.NewManager(c.pdEndpoints) + keyspaceManager.Run() appctx.SetService(appctx.KeyspaceManager, keyspaceManager) c.preServices = append(c.preServices, keyspaceManager) From 032f98af776eebc2b2792a29635854b701887c2e Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 15:35:36 +0800 Subject: [PATCH 08/15] feat(keyspace): Add tests for GetKeyspaceByID and LoadKeyspace - Add unit tests for the `GetKeyspaceByID` and `LoadKeyspace` methods of the `manager` struct in the `nextgen` build mode. - These tests cover scenarios where keyspace metadata is fetched from the PD API client and then cached locally. - Ensure that subsequent calls to these methods retrieve data from the local cache. - Add corresponding tests for the `classic` build mode, which do not interact with the PD API client. Signed-off-by: tenfyzhong --- pkg/keyspace/keyspace_manager_classic_test.go | 62 +++++++++++++ ...t.go => keyspace_manager_next_gen_test.go} | 90 +++++++++++++++++++ 2 files changed, 152 insertions(+) create mode 100644 pkg/keyspace/keyspace_manager_classic_test.go rename pkg/keyspace/{keyspace_manager_test.go => keyspace_manager_next_gen_test.go} (53%) diff --git a/pkg/keyspace/keyspace_manager_classic_test.go b/pkg/keyspace/keyspace_manager_classic_test.go new file mode 100644 index 0000000000..5d1ce50c0f --- /dev/null +++ b/pkg/keyspace/keyspace_manager_classic_test.go @@ -0,0 +1,62 @@ +// Copyright 2025 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !nextgen + +package keyspace + +import ( + "context" + "testing" + + "github.com/pingcap/kvproto/pkg/keyspacepb" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/tidb/pkg/kv" + "github.com/stretchr/testify/require" +) + +func Test_manager_GetKeyspaceByID(t *testing.T) { + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta := &keyspacepb.KeyspaceMeta{ + Id: 0, + Name: common.DefaultKeyspace, + } + actual1, err := m.GetKeyspaceByID(context.Background(), 1) + require.NoError(t, err) + require.EqualValues(t, meta, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{}, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{}, m.keyspaceIDMap) +} + +func Test_manager_LoadKeyspace(t *testing.T) { + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta := &keyspacepb.KeyspaceMeta{ + Id: 0, + Name: common.DefaultKeyspace, + } + actual1, err := m.LoadKeyspace(context.Background(), "ks1") + require.NoError(t, err) + require.EqualValues(t, meta, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{}, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{}, m.keyspaceIDMap) +} diff --git a/pkg/keyspace/keyspace_manager_test.go b/pkg/keyspace/keyspace_manager_next_gen_test.go similarity index 53% rename from pkg/keyspace/keyspace_manager_test.go rename to pkg/keyspace/keyspace_manager_next_gen_test.go index 392968a175..7543eb01b7 100644 --- a/pkg/keyspace/keyspace_manager_test.go +++ b/pkg/keyspace/keyspace_manager_next_gen_test.go @@ -11,6 +11,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build nextgen + package keyspace import ( @@ -107,3 +109,91 @@ func Test_manager_update(t *testing.T) { uint32(1): meta3, }, m.keyspaceIDMap) } + +func Test_manager_GetKeyspaceByID(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := pdutil.NewMockPDAPIClient(ctrl) + + appcontext.SetService(appcontext.PDAPIClient, mockClient) + + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta1 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: "ks1", + State: 0, + CreatedAt: 1, + StateChangedAt: 1, + Config: map[string]string{}, + } + mockClient.EXPECT().GetKeyspaceMetaByID(gomock.Any(), gomock.Eq(uint32(1))).Return(meta1, nil).Times(1) + actual1, err := m.GetKeyspaceByID(context.Background(), 1) + require.NoError(t, err) + require.EqualValues(t, meta1, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) + + // GetKeyspaceByID again and it will load from local cache + actual2, err := m.GetKeyspaceByID(context.Background(), 1) + require.NoError(t, err) + require.EqualValues(t, meta1, actual2) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) +} + +func Test_manager_LoadKeyspace(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockClient := pdutil.NewMockPDAPIClient(ctrl) + + appcontext.SetService(appcontext.PDAPIClient, mockClient) + + m := &manager{ + keyspaceMap: make(map[string]*keyspacepb.KeyspaceMeta), + keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), + storageMap: make(map[string]kv.Storage), + } + + meta1 := &keyspacepb.KeyspaceMeta{ + Id: 1, + Name: "ks1", + State: 0, + CreatedAt: 1, + StateChangedAt: 1, + Config: map[string]string{}, + } + mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq("ks1")).Return(meta1, nil).Times(1) + actual1, err := m.LoadKeyspace(context.Background(), "ks1") + require.NoError(t, err) + require.EqualValues(t, meta1, actual1) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) + + // GetKeyspaceByID again and it will load from local cache + actual2, err := m.LoadKeyspace(context.Background(), "ks1") + require.NoError(t, err) + require.EqualValues(t, meta1, actual2) + require.Equal(t, map[string]*keyspacepb.KeyspaceMeta{ + "ks1": meta1, + }, m.keyspaceMap) + require.Equal(t, map[uint32]*keyspacepb.KeyspaceMeta{ + 1: meta1, + }, m.keyspaceIDMap) +} From 7298d2a7a6d47994b7f1b90e7a5bb712401caa05 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 15:41:29 +0800 Subject: [PATCH 09/15] feat(keyspace): Add comment to Run method - Clarify the purpose of the Run method in the Manager interface. Signed-off-by: tenfyzhong --- pkg/keyspace/keyspace_manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index 056a6bdb57..b247c73ea4 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -38,7 +38,7 @@ const ( ) type Manager interface { - // Run + // Run starts the manager Run() // LoadKeyspace loads keyspace metadata by name LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) From a72cffba4d9e68c3a24cda964ad0096500b01ea0 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 16:00:04 +0800 Subject: [PATCH 10/15] feat(keyspace): Use internal mutex for update periodicity - Removed external mutex from `update` function. - Added internal `updateMu` to `manager` struct. - Updated `update` function to use `k.updateMu` for locking. - Added nil check for ticker in `Close` function. - Updated tests to reflect the internal mutex usage. Signed-off-by: tenfyzhong --- pkg/keyspace/keyspace_manager.go | 18 ++++++++++-------- pkg/keyspace/keyspace_manager_next_gen_test.go | 12 +++++------- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index b247c73ea4..3dcae58b2f 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -71,7 +71,8 @@ type manager struct { storageMap map[string]kv.Storage storageMu sync.Mutex - ticker *time.Ticker + ticker *time.Ticker + updateMu sync.Mutex } func (k *manager) Run() { @@ -194,7 +195,10 @@ func (k *manager) Close() { k.storageMap = make(map[string]kv.Storage) - k.ticker.Stop() + if k.ticker != nil { + k.ticker.Stop() + k.ticker = nil + } } func (k *manager) updatePeriodicity() { @@ -202,22 +206,20 @@ func (k *manager) updatePeriodicity() { return } - mu := &sync.Mutex{} - for range k.ticker.C { - k.update(mu) + k.update() } } -func (k *manager) update(mu *sync.Mutex) { +func (k *manager) update() { // If we cannot get the lock, we don't need to do anything // because that means the previous process is still running. - if !mu.TryLock() { + if !k.updateMu.TryLock() { log.Info("update keyspace lock failed") return } - defer mu.Unlock() + defer k.updateMu.Unlock() k.keyspaceMu.Lock() keyspaces := make([]string, 0, len(k.keyspaceMap)) diff --git a/pkg/keyspace/keyspace_manager_next_gen_test.go b/pkg/keyspace/keyspace_manager_next_gen_test.go index 7543eb01b7..9b5053586e 100644 --- a/pkg/keyspace/keyspace_manager_next_gen_test.go +++ b/pkg/keyspace/keyspace_manager_next_gen_test.go @@ -17,7 +17,6 @@ package keyspace import ( "context" - "sync" "testing" "github.com/golang/mock/gomock" @@ -36,7 +35,6 @@ func Test_manager_update(t *testing.T) { appcontext.SetService(appcontext.PDAPIClient, mockClient) const keyspace = "ks1" - mu := &sync.Mutex{} // step 1, load a metadata to make the map has a member meta1 := &keyspacepb.KeyspaceMeta{ @@ -72,7 +70,7 @@ func Test_manager_update(t *testing.T) { Config: map[string]string{}, } mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta2, nil).Times(1) - m.update(mu) + m.update() require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ "ks1": meta2, }, m.keyspaceMap) @@ -81,9 +79,9 @@ func Test_manager_update(t *testing.T) { }, m.keyspaceIDMap) // step 3, lock failed - mu.Lock() - m.update(mu) - mu.Unlock() + m.updateMu.Lock() + m.update() + m.updateMu.Unlock() require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ "ks1": meta2, }, m.keyspaceMap) @@ -101,7 +99,7 @@ func Test_manager_update(t *testing.T) { Config: map[string]string{}, } mockClient.EXPECT().LoadKeyspace(gomock.Any(), gomock.Eq(keyspace)).Return(meta3, nil).Times(1) - m.update(mu) + m.update() require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ "ks1": meta3, }, m.keyspaceMap) From 9e81d9758b69849a900b81dac01397497796c35f Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 16:06:27 +0800 Subject: [PATCH 11/15] feat(keyspace): Introduce ForceLoadKeyspace method - Rename `forceLoadKeyspace` to `ForceLoadKeyspace` for clarity and consistency with naming conventions. - Update the `LoadKeyspace` method to explicitly call `ForceLoadKeyspace` when the local cache is not found, ensuring data is always fetched from the PD. - Modify the `KeyspaceCheckerMiddleware` to use `ForceLoadKeyspace` to ensure the latest keyspace metadata is loaded. - Update tests to reflect the new method name and behavior. Signed-off-by: tenfyzhong --- api/middleware/middleware.go | 2 +- pkg/keyspace/keyspace_manager.go | 11 +++++++---- pkg/keyspace/keyspace_manager_next_gen_test.go | 2 +- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/api/middleware/middleware.go b/api/middleware/middleware.go index 5b4bacf720..6419600b14 100644 --- a/api/middleware/middleware.go +++ b/api/middleware/middleware.go @@ -239,7 +239,7 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc { } keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) - meta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), ks) + meta, err := keyspaceManager.ForceLoadKeyspace(c.Request.Context(), ks) if errors.IsKeyspaceNotExistError(err) { c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) c.Abort() diff --git a/pkg/keyspace/keyspace_manager.go b/pkg/keyspace/keyspace_manager.go index 3dcae58b2f..a0d812acec 100644 --- a/pkg/keyspace/keyspace_manager.go +++ b/pkg/keyspace/keyspace_manager.go @@ -40,8 +40,11 @@ const ( type Manager interface { // Run starts the manager Run() - // LoadKeyspace loads keyspace metadata by name + // LoadKeyspace loads keyspace metadata by name from local + // If the local cache is not found, it will load from pd LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) + // ForceLoadKeyspace force loads keyspace metadata from pd and update local cache + ForceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) // GetKeyspaceByID loads keyspace metadata by id GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) // GetStorage get a storag for the keyspace @@ -94,10 +97,10 @@ func (k *manager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacep return meta, nil } - return k.forceLoadKeyspace(ctx, keyspace) + return k.ForceLoadKeyspace(ctx, keyspace) } -func (k *manager) forceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { +func (k *manager) ForceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { var meta *keyspacepb.KeyspaceMeta var err error pdAPIClient := appcontext.GetService[pdutil.PDAPIClient](appcontext.PDAPIClient) @@ -230,7 +233,7 @@ func (k *manager) update() { ctx := context.Background() for _, keyspace := range keyspaces { - _, err := k.forceLoadKeyspace(ctx, keyspace) + _, err := k.ForceLoadKeyspace(ctx, keyspace) if err != nil { log.Warn("force load keyspace", zap.String("keyspace", keyspace), zap.Error(err)) } diff --git a/pkg/keyspace/keyspace_manager_next_gen_test.go b/pkg/keyspace/keyspace_manager_next_gen_test.go index 9b5053586e..556e6facef 100644 --- a/pkg/keyspace/keyspace_manager_next_gen_test.go +++ b/pkg/keyspace/keyspace_manager_next_gen_test.go @@ -52,7 +52,7 @@ func Test_manager_update(t *testing.T) { keyspaceIDMap: make(map[uint32]*keyspacepb.KeyspaceMeta), storageMap: make(map[string]kv.Storage), } - m.forceLoadKeyspace(context.Background(), keyspace) + m.ForceLoadKeyspace(context.Background(), keyspace) require.EqualValues(t, map[string]*keyspacepb.KeyspaceMeta{ "ks1": meta1, }, m.keyspaceMap) From 096201863efd21e9f4d7629da1709def7ec16128 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 16:32:03 +0800 Subject: [PATCH 12/15] feat(api): Remove redundant keyspace state check - The keyspace state check in `KeyspaceCheckerMiddleware` was redundant. - The state check has been moved to the `CreateChangefeed` handler in `api/v2/changefeed.go`. - This change simplifies the middleware and ensures the check is performed at the correct level of the API. Signed-off-by: tenfyzhong --- api/middleware/middleware.go | 9 +-------- api/v2/changefeed.go | 7 +++++++ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/api/middleware/middleware.go b/api/middleware/middleware.go index 6419600b14..f6a823558d 100644 --- a/api/middleware/middleware.go +++ b/api/middleware/middleware.go @@ -20,7 +20,6 @@ import ( "time" "github.com/gin-gonic/gin" - "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/pingcap/ticdc/pkg/api" appcontext "github.com/pingcap/ticdc/pkg/common/context" @@ -239,7 +238,7 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc { } keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) - meta, err := keyspaceManager.ForceLoadKeyspace(c.Request.Context(), ks) + _, err := keyspaceManager.ForceLoadKeyspace(c.Request.Context(), ks) if errors.IsKeyspaceNotExistError(err) { c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) c.Abort() @@ -250,12 +249,6 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc { return } - if meta.State != keyspacepb.KeyspaceState_ENABLED { - c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) - c.Abort() - return - } - c.Next() } } diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index d048f90bbf..3737f5c2f7 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -24,6 +24,7 @@ import ( "time" "github.com/gin-gonic/gin" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/log" "github.com/pingcap/ticdc/api/middleware" "github.com/pingcap/ticdc/downstreamadapter/sink" @@ -110,6 +111,12 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) { return } + if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { + c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) + c.Abort() + return + } + co, err := h.server.GetCoordinator() if err != nil { _ = c.Error(err) From 5bd1661c813e627de100791044478f35dffd4983 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Mon, 20 Oct 2025 17:01:18 +0800 Subject: [PATCH 13/15] feat(changefeed): Validate keyspace state before operations - Add validation to `ResumeChangefeed` to ensure the keyspace is enabled. - Add validation to `UpdateChangefeed` to ensure the keyspace is enabled. Signed-off-by: tenfyzhong --- api/v2/changefeed.go | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/api/v2/changefeed.go b/api/v2/changefeed.go index 3737f5c2f7..bd08789b52 100644 --- a/api/v2/changefeed.go +++ b/api/v2/changefeed.go @@ -677,6 +677,12 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) { return } + if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { + c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) + c.Abort() + return + } + resumeGcServiceID := h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming) if err := verifyResumeChangefeedConfig( ctx, @@ -757,6 +763,18 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { return } + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) + keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName) + if err != nil { + _ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err)) + return + } + if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { + c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam) + c.Abort() + return + } + oldCfInfo, status, err := co.GetChangefeed(c, changefeedDisplayName) if err != nil { _ = c.Error(err) @@ -827,8 +845,6 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) { } protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(oldCfInfo.Config.Sink.Protocol)) - keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) - kvStorage, err := keyspaceManager.GetStorage(keyspaceName) if err != nil { _ = c.Error(err) From ba8f7e2759965f97431c92e67c1ac996a607ec11 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Tue, 21 Oct 2025 16:29:32 +0800 Subject: [PATCH 14/15] refactor(keyspace): Rename MockKeyspaceManager to MockManager - Renamed `MockKeyspaceManager` to `MockManager` to better reflect its purpose. - Updated type references in middleware and maintainer tests. - Added `ForceLoadKeyspace` method to the mock to align with the interface change. Signed-off-by: tenfyzhong --- api/middleware/authenticate_middleware.go | 2 +- api/middleware/middleware_nextgen_test.go | 16 ++--- maintainer/maintainer_manager_test.go | 6 +- pkg/keyspace/keyspace_manager_mock.go | 73 ++++++++++++++++------- 4 files changed, 62 insertions(+), 35 deletions(-) diff --git a/api/middleware/authenticate_middleware.go b/api/middleware/authenticate_middleware.go index 033e4abf6c..4797decc41 100644 --- a/api/middleware/authenticate_middleware.go +++ b/api/middleware/authenticate_middleware.go @@ -102,7 +102,7 @@ func verify(ctx *gin.Context, etcdCli etcd.Client) error { // fetchTiDBTopology parses the TiDB topology from etcd. func fetchTiDBTopology(ctx context.Context, etcdClient etcd.Client, ks string) ([]upstream.TidbInstance, error) { - keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager) + keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager) meta, err := keyspaceManager.LoadKeyspace(ctx, ks) if err != nil { return nil, err diff --git a/api/middleware/middleware_nextgen_test.go b/api/middleware/middleware_nextgen_test.go index 1fbf8243ae..223f4d1903 100644 --- a/api/middleware/middleware_nextgen_test.go +++ b/api/middleware/middleware_nextgen_test.go @@ -37,7 +37,7 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { tests := []struct { name string keyspace string - init func(t *testing.T, mock *keyspace.MockKeyspaceManager) + init func(t *testing.T, mock *keyspace.MockManager) expectedStatus int expectedAbort bool expectedBodyContains string @@ -51,8 +51,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { { name: "keyspace not exist", keyspace: "not-exist", - init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) { - mock.EXPECT().LoadKeyspace(gomock.Any(), "not-exist").Return(nil, errors.New(pdpb.ErrorType_ENTRY_NOT_FOUND.String())) + init: func(t *testing.T, mock *keyspace.MockManager) { + mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "not-exist").Return(nil, errors.New(pdpb.ErrorType_ENTRY_NOT_FOUND.String())) }, expectedStatus: http.StatusBadRequest, expectedAbort: true, @@ -61,8 +61,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { { name: "internal server error", keyspace: "internal-error", - init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) { - mock.EXPECT().LoadKeyspace(gomock.Any(), "internal-error").Return(nil, errors.New("internal error")) + init: func(t *testing.T, mock *keyspace.MockManager) { + mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "internal-error").Return(nil, errors.New("internal error")) }, expectedStatus: http.StatusInternalServerError, expectedAbort: true, @@ -71,8 +71,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { { name: "success", keyspace: "success", - init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) { - mock.EXPECT().LoadKeyspace(gomock.Any(), "success").Return(&keyspacepb.KeyspaceMeta{ + init: func(t *testing.T, mock *keyspace.MockManager) { + mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "success").Return(&keyspacepb.KeyspaceMeta{ State: keyspacepb.KeyspaceState_ENABLED, }, nil) }, @@ -87,7 +87,7 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - mock := keyspace.NewMockKeyspaceManager(ctrl) + mock := keyspace.NewMockManager(ctrl) if tt.init != nil { tt.init(t, mock) diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index ac51695bd9..c600ce34be 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -77,7 +77,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { Name: "ks1", } } - keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl) + keyspaceManager := keyspace.NewMockManager(ctrl) keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil) appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager) @@ -304,7 +304,7 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { Name: "ks1", } } - keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl) + keyspaceManager := keyspace.NewMockManager(ctrl) keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil) appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager) @@ -440,7 +440,7 @@ func TestStopNotExistsMaintainer(t *testing.T) { Name: "ks1", } } - keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl) + keyspaceManager := keyspace.NewMockManager(ctrl) keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil).AnyTimes() appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager) diff --git a/pkg/keyspace/keyspace_manager_mock.go b/pkg/keyspace/keyspace_manager_mock.go index 195b044a82..84fd84d8b8 100644 --- a/pkg/keyspace/keyspace_manager_mock.go +++ b/pkg/keyspace/keyspace_manager_mock.go @@ -13,43 +13,58 @@ import ( kv "github.com/pingcap/tidb/pkg/kv" ) -// MockKeyspaceManager is a mock of KeyspaceManager interface. -type MockKeyspaceManager struct { +// MockManager is a mock of Manager interface. +type MockManager struct { ctrl *gomock.Controller - recorder *MockKeyspaceManagerMockRecorder + recorder *MockManagerMockRecorder } -// MockKeyspaceManagerMockRecorder is the mock recorder for MockKeyspaceManager. -type MockKeyspaceManagerMockRecorder struct { - mock *MockKeyspaceManager +// MockManagerMockRecorder is the mock recorder for MockManager. +type MockManagerMockRecorder struct { + mock *MockManager } -// NewMockKeyspaceManager creates a new mock instance. -func NewMockKeyspaceManager(ctrl *gomock.Controller) *MockKeyspaceManager { - mock := &MockKeyspaceManager{ctrl: ctrl} - mock.recorder = &MockKeyspaceManagerMockRecorder{mock} +// NewMockManager creates a new mock instance. +func NewMockManager(ctrl *gomock.Controller) *MockManager { + mock := &MockManager{ctrl: ctrl} + mock.recorder = &MockManagerMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockKeyspaceManager) EXPECT() *MockKeyspaceManagerMockRecorder { +func (m *MockManager) EXPECT() *MockManagerMockRecorder { return m.recorder } // Close mocks base method. -func (m *MockKeyspaceManager) Close() { +func (m *MockManager) Close() { m.ctrl.T.Helper() m.ctrl.Call(m, "Close") } // Close indicates an expected call of Close. -func (mr *MockKeyspaceManagerMockRecorder) Close() *gomock.Call { +func (mr *MockManagerMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockKeyspaceManager)(nil).Close)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockManager)(nil).Close)) +} + +// ForceLoadKeyspace mocks base method. +func (m *MockManager) ForceLoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ForceLoadKeyspace", ctx, keyspace) + ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ForceLoadKeyspace indicates an expected call of ForceLoadKeyspace. +func (mr *MockManagerMockRecorder) ForceLoadKeyspace(ctx, keyspace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ForceLoadKeyspace", reflect.TypeOf((*MockManager)(nil).ForceLoadKeyspace), ctx, keyspace) } // GetKeyspaceByID mocks base method. -func (m *MockKeyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { +func (m *MockManager) GetKeyspaceByID(ctx context.Context, keyspaceID uint32) (*keyspacepb.KeyspaceMeta, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetKeyspaceByID", ctx, keyspaceID) ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) @@ -58,13 +73,13 @@ func (m *MockKeyspaceManager) GetKeyspaceByID(ctx context.Context, keyspaceID ui } // GetKeyspaceByID indicates an expected call of GetKeyspaceByID. -func (mr *MockKeyspaceManagerMockRecorder) GetKeyspaceByID(ctx, keyspaceID interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) GetKeyspaceByID(ctx, keyspaceID interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKeyspaceByID", reflect.TypeOf((*MockKeyspaceManager)(nil).GetKeyspaceByID), ctx, keyspaceID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetKeyspaceByID", reflect.TypeOf((*MockManager)(nil).GetKeyspaceByID), ctx, keyspaceID) } // GetStorage mocks base method. -func (m *MockKeyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { +func (m *MockManager) GetStorage(keyspace string) (kv.Storage, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetStorage", keyspace) ret0, _ := ret[0].(kv.Storage) @@ -73,13 +88,13 @@ func (m *MockKeyspaceManager) GetStorage(keyspace string) (kv.Storage, error) { } // GetStorage indicates an expected call of GetStorage. -func (mr *MockKeyspaceManagerMockRecorder) GetStorage(keyspace interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) GetStorage(keyspace interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStorage", reflect.TypeOf((*MockKeyspaceManager)(nil).GetStorage), keyspace) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetStorage", reflect.TypeOf((*MockManager)(nil).GetStorage), keyspace) } // LoadKeyspace mocks base method. -func (m *MockKeyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { +func (m *MockManager) LoadKeyspace(ctx context.Context, keyspace string) (*keyspacepb.KeyspaceMeta, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LoadKeyspace", ctx, keyspace) ret0, _ := ret[0].(*keyspacepb.KeyspaceMeta) @@ -88,7 +103,19 @@ func (m *MockKeyspaceManager) LoadKeyspace(ctx context.Context, keyspace string) } // LoadKeyspace indicates an expected call of LoadKeyspace. -func (mr *MockKeyspaceManagerMockRecorder) LoadKeyspace(ctx, keyspace interface{}) *gomock.Call { +func (mr *MockManagerMockRecorder) LoadKeyspace(ctx, keyspace interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadKeyspace", reflect.TypeOf((*MockManager)(nil).LoadKeyspace), ctx, keyspace) +} + +// Run mocks base method. +func (m *MockManager) Run() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Run") +} + +// Run indicates an expected call of Run. +func (mr *MockManagerMockRecorder) Run() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadKeyspace", reflect.TypeOf((*MockKeyspaceManager)(nil).LoadKeyspace), ctx, keyspace) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Run", reflect.TypeOf((*MockManager)(nil).Run)) } From 86d884d3b0438974229fd3bdb371e1313e31f6d4 Mon Sep 17 00:00:00 2001 From: tenfyzhong Date: Tue, 21 Oct 2025 17:08:20 +0800 Subject: [PATCH 15/15] docs(makefile): Add usage instructions for unit_test_pkg targets - Add a comment to `unit_test_pkg` target explaining how to specify a package. - Add a comment to `unit_test_pkg_next_gen` target explaining how to specify a package. Signed-off-by: tenfyzhong --- Makefile | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Makefile b/Makefile index 7589b0345f..ad79712812 100644 --- a/Makefile +++ b/Makefile @@ -281,6 +281,7 @@ unit_test_in_verify_ci_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/b tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml $(FAILPOINT_DISABLE) +# Usage: PKG=./api/middleware/... make unit_test_pkg unit_test_pkg: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml mkdir -p "$(TEST_DIR)" $(FAILPOINT_ENABLE) @@ -294,6 +295,7 @@ unit_test_pkg: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml $(FAILPOINT_DISABLE) +# Usage: PKG=./api/middleware/... make unit_test_pkg_next_gen unit_test_pkg_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml mkdir -p "$(TEST_DIR)" $(FAILPOINT_ENABLE)