Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg
UT_PACKAGES_MAINTAINER := ./maintainer/... ./pkg/scheduler/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_LOGSERVICE := ./logservice/...
UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/... ./api/middleware/...
UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/... ./api/middleware/... ./pkg/keyspace/...

include tools/Makefile

Expand Down Expand Up @@ -281,6 +281,7 @@ unit_test_in_verify_ci_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/b
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)

# Usage: PKG=./api/middleware/... make unit_test_pkg
unit_test_pkg: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
Expand All @@ -294,6 +295,7 @@ unit_test_pkg: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)

# Usage: PKG=./api/middleware/... make unit_test_pkg_next_gen
unit_test_pkg_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov tools/bin/gocov-xml
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
Expand Down
2 changes: 1 addition & 1 deletion api/middleware/authenticate_middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func verify(ctx *gin.Context, etcdCli etcd.Client) error {

// fetchTiDBTopology parses the TiDB topology from etcd.
func fetchTiDBTopology(ctx context.Context, etcdClient etcd.Client, ks string) ([]upstream.TidbInstance, error) {
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
meta, err := keyspaceManager.LoadKeyspace(ctx, ks)
if err != nil {
return nil, err
Expand Down
11 changes: 2 additions & 9 deletions api/middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/api"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
Expand Down Expand Up @@ -238,8 +237,8 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc {
return
}

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
meta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), ks)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
_, err := keyspaceManager.ForceLoadKeyspace(c.Request.Context(), ks)
if errors.IsKeyspaceNotExistError(err) {
c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam)
c.Abort()
Expand All @@ -250,12 +249,6 @@ func KeyspaceCheckerMiddleware() gin.HandlerFunc {
return
}

if meta.State != keyspacepb.KeyspaceState_ENABLED {
c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam)
c.Abort()
return
}

c.Next()
}
}
16 changes: 8 additions & 8 deletions api/middleware/middleware_nextgen_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) {
tests := []struct {
name string
keyspace string
init func(t *testing.T, mock *keyspace.MockKeyspaceManager)
init func(t *testing.T, mock *keyspace.MockManager)
expectedStatus int
expectedAbort bool
expectedBodyContains string
Expand All @@ -51,8 +51,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) {
{
name: "keyspace not exist",
keyspace: "not-exist",
init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) {
mock.EXPECT().LoadKeyspace(gomock.Any(), "not-exist").Return(nil, errors.New(pdpb.ErrorType_ENTRY_NOT_FOUND.String()))
init: func(t *testing.T, mock *keyspace.MockManager) {
mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "not-exist").Return(nil, errors.New(pdpb.ErrorType_ENTRY_NOT_FOUND.String()))
},
expectedStatus: http.StatusBadRequest,
expectedAbort: true,
Expand All @@ -61,8 +61,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) {
{
name: "internal server error",
keyspace: "internal-error",
init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) {
mock.EXPECT().LoadKeyspace(gomock.Any(), "internal-error").Return(nil, errors.New("internal error"))
init: func(t *testing.T, mock *keyspace.MockManager) {
mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "internal-error").Return(nil, errors.New("internal error"))
},
expectedStatus: http.StatusInternalServerError,
expectedAbort: true,
Expand All @@ -71,8 +71,8 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) {
{
name: "success",
keyspace: "success",
init: func(t *testing.T, mock *keyspace.MockKeyspaceManager) {
mock.EXPECT().LoadKeyspace(gomock.Any(), "success").Return(&keyspacepb.KeyspaceMeta{
init: func(t *testing.T, mock *keyspace.MockManager) {
mock.EXPECT().ForceLoadKeyspace(gomock.Any(), "success").Return(&keyspacepb.KeyspaceMeta{
State: keyspacepb.KeyspaceState_ENABLED,
}, nil)
},
Expand All @@ -87,7 +87,7 @@ func TestKeyspaceCheckerMiddleware(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mock := keyspace.NewMockKeyspaceManager(ctrl)
mock := keyspace.NewMockManager(ctrl)

if tt.init != nil {
tt.init(t, mock)
Expand Down
33 changes: 28 additions & 5 deletions api/v2/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/gin-gonic/gin"
"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/api/middleware"
"github.com/pingcap/ticdc/downstreamadapter/sink"
Expand Down Expand Up @@ -103,13 +104,19 @@ func (h *OpenAPIV2) CreateChangefeed(c *gin.Context) {
return
}

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
return
}

if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED {
c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam)
c.Abort()
return
}

co, err := h.server.GetCoordinator()
if err != nil {
_ = c.Error(err)
Expand Down Expand Up @@ -381,7 +388,7 @@ func (h *OpenAPIV2) VerifyTable(c *gin.Context) {
}
protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(replicaCfg.Sink.Protocol))

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceName := GetKeyspaceValueWithDefault(c)
kvStorage, err := keyspaceManager.GetStorage(keyspaceName)
if err != nil {
Expand Down Expand Up @@ -663,13 +670,19 @@ func (h *OpenAPIV2) ResumeChangefeed(c *gin.Context) {
newCheckpointTs = cfg.OverwriteCheckpointTs
}

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
return
}

if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED {
c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam)
c.Abort()
return
}

resumeGcServiceID := h.server.GetEtcdClient().GetEnsureGCServiceID(gc.EnsureGCServiceResuming)
if err := verifyResumeChangefeedConfig(
ctx,
Expand Down Expand Up @@ -750,6 +763,18 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
return
}

keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
if err != nil {
_ = c.Error(errors.WrapError(errors.ErrKeyspaceNotFound, err))
return
}
if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED {
c.IndentedJSON(http.StatusBadRequest, errors.ErrAPIInvalidParam)
c.Abort()
return
}

oldCfInfo, status, err := co.GetChangefeed(c, changefeedDisplayName)
if err != nil {
_ = c.Error(err)
Expand Down Expand Up @@ -820,8 +845,6 @@ func (h *OpenAPIV2) UpdateChangefeed(c *gin.Context) {
}
protocol, _ := config.ParseSinkProtocolFromString(util.GetOrZero(oldCfInfo.Config.Sink.Protocol))

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)

kvStorage, err := keyspaceManager.GetStorage(keyspaceName)
if err != nil {
_ = c.Error(err)
Expand Down
4 changes: 2 additions & 2 deletions api/v2/unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (h *OpenAPIV2) ResolveLock(c *gin.Context) {

keyspaceName := GetKeyspaceValueWithDefault(c)

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
// The ctx's lifecycle is the same as the HTTP request.
// The schema store may use the context to fetch database information asynchronously.
// Therefore, we cannot use the context of the HTTP request.
Expand Down Expand Up @@ -93,7 +93,7 @@ func (h *OpenAPIV2) DeleteServiceGcSafePoint(c *gin.Context) {
defer pdClient.Close()

keyspaceName := GetKeyspaceValueWithDefault(c)
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(c.Request.Context(), keyspaceName)
if err != nil {
_ = c.Error(cerror.WrapError(cerror.ErrKeyspaceNotFound, err))
Expand Down
4 changes: 2 additions & 2 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (c *coordinator) handleStateChange(
log.Info("changefeed is resumed or created successfully, try to delete its safeguard gc safepoint",
zap.String("changefeed", event.changefeedID.String()))

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, event.changefeedID.Keyspace())
if err != nil {
log.Warn("failed to load keyspace", zap.String("keyspace", event.changefeedID.Keyspace()), zap.Error(err))
Expand Down Expand Up @@ -473,7 +473,7 @@ func (c *coordinator) updateAllKeyspaceGcBarriers(ctx context.Context) error {

func (c *coordinator) updateKeyspaceGcBarrier(ctx context.Context, barrierMap map[string]uint64, keyspaceName string) error {
// Obtain keyspace metadata from PD
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
if err != nil {
return cerror.WrapError(cerror.ErrLoadKeyspaceFailed, err)
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/dispatchermanager/dispatcher_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func NewDispatcherManager(
ctx, cancel := context.WithCancel(context.Background())
pdClock := appcontext.GetService[pdutil.Clock](appcontext.DefaultPDClock)

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, changefeedID.Keyspace())
if err != nil {
cancel()
Expand Down
4 changes: 2 additions & 2 deletions logservice/schemastore/schema_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func (s *schemaStore) getKeyspaceSchemaStore(keyspaceID uint32) (*keyspaceSchema

// If the schemastore does not contain the keyspace, it means it is not a maintainer node.
// It should register the keyspace when it try to get keyspace schema_store.
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.GetKeyspaceByID(ctx, keyspaceID)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -443,7 +443,7 @@ func (s *schemaStore) RegisterKeyspace(
s.keyspaceLocker.Lock()
defer s.keyspaceLocker.Unlock()

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, keyspaceName)
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion logservice/txnutil/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (r *resolver) Resolve(ctx context.Context, keyspaceID uint32, regionID uint
Limit: scanLockLimit,
})

keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.GetKeyspaceByID(ctx, keyspaceID)
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (m *Manager) onAddMaintainerRequest(req *heartbeatpb.AddMaintainerRequest)
}

ctx := context.Background()
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, cfID.Keyspace())
if err != nil {
// BUG tenfyzhong 2025-09-11 17:29:08 how to process err
Expand Down Expand Up @@ -261,7 +261,7 @@ func (m *Manager) onRemoveMaintainerRequest(msg *messaging.TargetMessage) *heart
}

ctx := context.Background()
keyspaceManager := appcontext.GetService[keyspace.KeyspaceManager](appcontext.KeyspaceManager)
keyspaceManager := appcontext.GetService[keyspace.Manager](appcontext.KeyspaceManager)
keyspaceMeta, err := keyspaceManager.LoadKeyspace(ctx, cfID.Keyspace())
if err != nil {
// BUG tenfyzhong 2025-09-11 17:29:08 how to process err
Expand Down
6 changes: 3 additions & 3 deletions maintainer/maintainer_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) {
Name: "ks1",
}
}
keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl)
keyspaceManager := keyspace.NewMockManager(ctrl)
keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil)
appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager)

Expand Down Expand Up @@ -304,7 +304,7 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) {
Name: "ks1",
}
}
keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl)
keyspaceManager := keyspace.NewMockManager(ctrl)
keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil)
appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager)

Expand Down Expand Up @@ -440,7 +440,7 @@ func TestStopNotExistsMaintainer(t *testing.T) {
Name: "ks1",
}
}
keyspaceManager := keyspace.NewMockKeyspaceManager(ctrl)
keyspaceManager := keyspace.NewMockManager(ctrl)
keyspaceManager.EXPECT().LoadKeyspace(gomock.Any(), gomock.Any()).Return(meta, nil).AnyTimes()

appcontext.SetService(appcontext.KeyspaceManager, keyspaceManager)
Expand Down
Loading