Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,6 @@ FAILPOINT_DISABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev
# gotestsum -p parameter for unit tests
P=3

# The following packages are used in unit tests.
# Add new packages here if you want to include them in unit tests.
UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg/sink/util/... ./downstreamadapter/sink/... ./downstreamadapter/dispatcher/... ./downstreamadapter/dispatchermanager/... ./downstreamadapter/eventcollector/... ./pkg/sink/...
UT_PACKAGES_MAINTAINER := ./maintainer/... ./pkg/scheduler/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_LOGSERVICE := ./logservice/...
UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/... ./pkg/common/... ./api/middleware/...

include tools/Makefile

go-generate: tools/bin/msgp tools/bin/stringer tools/bin/mockery
Expand Down Expand Up @@ -259,12 +251,7 @@ unit_test_in_verify_ci: check_failpoint_ctl tools/bin/gotestsum tools/bin/gocov
@export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 300s -p $(P) --race --tags=intest \
-parallel=16 \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" \
$(UT_PACKAGES_DISPATCHER) \
$(UT_PACKAGES_MAINTAINER) \
$(UT_PACKAGES_COORDINATOR) \
$(UT_PACKAGES_LOGSERVICE) \
$(UT_PACKAGES_OTHERS) \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" $(PACKAGES) \
|| { $(FAILPOINT_DISABLE); exit 1; }
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)
Expand All @@ -276,12 +263,7 @@ unit_test_in_verify_ci_next_gen: check_failpoint_ctl tools/bin/gotestsum tools/b
@export log_level=error;\
CGO_ENABLED=1 tools/bin/gotestsum --junitfile cdc-junit-report.xml -- -v -timeout 300s -p $(P) --race --tags=intest,nextgen \
-parallel=16 \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" \
$(UT_PACKAGES_DISPATCHER) \
$(UT_PACKAGES_MAINTAINER) \
$(UT_PACKAGES_COORDINATOR) \
$(UT_PACKAGES_LOGSERVICE) \
$(UT_PACKAGES_OTHERS) \
-covermode=atomic -coverprofile="$(TEST_DIR)/cov.unit.out" $(PACKAGES) \
|| { $(FAILPOINT_DISABLE); exit 1; }
tools/bin/gocov convert "$(TEST_DIR)/cov.unit.out" | tools/bin/gocov-xml > cdc-coverage.xml
$(FAILPOINT_DISABLE)
Expand Down
66 changes: 62 additions & 4 deletions api/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/api/middleware"
"github.com/pingcap/ticdc/pkg/api"
"github.com/pingcap/ticdc/pkg/config"
cerror "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/logger"
"github.com/pingcap/tiflow/cdc/api"
"github.com/pingcap/tiflow/cdc/capture"
"github.com/pingcap/tiflow/cdc/model"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -132,7 +132,7 @@ func (h *ownerAPI) handleChangefeedAdmin(w http.ResponseWriter, req *http.Reques
Type: model.AdminJobType(typ),
}

err = api.HandleOwnerJob(req.Context(), h.capture, job)
err = HandleOwnerJob(req.Context(), h.capture, job)
handleOwnerResp(w, err)
}

Expand All @@ -155,7 +155,7 @@ func (h *ownerAPI) handleRebalanceTrigger(w http.ResponseWriter, req *http.Reque
return
}

err = api.HandleOwnerBalance(req.Context(), h.capture, changefeedID)
err = HandleOwnerBalance(req.Context(), h.capture, changefeedID)
handleOwnerResp(w, err)
}

Expand Down Expand Up @@ -192,7 +192,7 @@ func (h *ownerAPI) handleMoveTable(w http.ResponseWriter, req *http.Request) {
return
}

err = api.HandleOwnerScheduleTable(
err = HandleOwnerScheduleTable(
req.Context(), h.capture, changefeedID, to, tableID)
handleOwnerResp(w, err)
}
Expand Down Expand Up @@ -268,3 +268,61 @@ func HandleAdminLogLevel(w http.ResponseWriter, r *http.Request) {

api.WriteData(w, struct{}{})
}

// HandleOwnerJob enqueue the admin job
func HandleOwnerJob(
ctx context.Context, capture capture.Capture, job model.AdminJob,
) error {
// Use buffered channel to prevent blocking owner from happening.
done := make(chan error, 1)
o, err := capture.GetOwner()
if err != nil {
return errors.Trace(err)
}
o.EnqueueJob(job, done)
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case err := <-done:
return errors.Trace(err)
}
}

// HandleOwnerBalance balance the changefeed tables
func HandleOwnerBalance(
ctx context.Context, capture capture.Capture, changefeedID model.ChangeFeedID,
) error {
// Use buffered channel to prevernt blocking owner.
done := make(chan error, 1)
o, err := capture.GetOwner()
if err != nil {
return errors.Trace(err)
}
o.RebalanceTables(changefeedID, done)
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case err := <-done:
return errors.Trace(err)
}
}

// HandleOwnerScheduleTable schedule tables
func HandleOwnerScheduleTable(
ctx context.Context, capture capture.Capture,
changefeedID model.ChangeFeedID, captureID string, tableID int64,
) error {
// Use buffered channel to prevent blocking owner.
done := make(chan error, 1)
o, err := capture.GetOwner()
if err != nil {
return errors.Trace(err)
}
o.ScheduleTable(changefeedID, captureID, tableID, done)
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case err := <-done:
return errors.Trace(err)
}
}
6 changes: 3 additions & 3 deletions cmd/cdc/cli/cli_changefeed_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,13 +185,13 @@ func (o *createChangefeedOptions) completeReplicaCfg() error {
// validate checks that the provided attach options are specified.
func (o *createChangefeedOptions) validate(cmd *cobra.Command) error {
if o.timezone != "SYSTEM" {
cmd.Printf(color.HiYellowString("[WARN] --tz is deprecated in changefeed settings.\n"))
cmd.Printf("%s", color.HiYellowString("[WARN] --tz is deprecated in changefeed settings.\n"))
}

// user is not allowed to set sort-dir at changefeed level
if o.commonChangefeedOptions.sortDir != "" {
cmd.Printf(color.HiYellowString("[WARN] --sort-dir is deprecated in changefeed settings. " +
"Please use `cdc server --data-dir` to start the cdc server if possible, sort-dir will be set automatically. " +
cmd.Printf("%s", color.HiYellowString("[WARN] --sort-dir is deprecated in changefeed settings. "+
"Please use `cdc server --data-dir` to start the cdc server if possible, sort-dir will be set automatically. "+
"The --sort-dir here will be no-op\n"))
return errors.New("creating changefeed with `--sort-dir`, it's invalid")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/cdc/cli/cli_unsafe_resolve_lock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ func TestUnsafeResolveLockCli(t *testing.T) {
"--upstream-cert=cer",
"--upstream-key=key",
}
f.unsafes.EXPECT().ResolveLock(gomock.Any(), gomock.Any()).Return(nil)
f.unsafes.EXPECT().ResolveLock(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
require.Nil(t, cmd.Execute())
}
2 changes: 1 addition & 1 deletion cmd/cdc/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func (o *options) complete(command *cobra.Command) error {
}

if cfg.DataDir == "" {
command.Printf(color.HiYellowString("[WARN] TiCDC server data-dir is not set. " +
command.Printf("%s", color.HiYellowString("[WARN] TiCDC server data-dir is not set. "+
"Please use `cdc server --data-dir` to start the cdc server if possible.\n"))
}

Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/dispatcher"
"github.com/pingcap/ticdc/pkg/chann"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/common/event"
Expand All @@ -30,6 +29,7 @@ import (
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/utils/chann"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
Expand Down
2 changes: 1 addition & 1 deletion downstreamadapter/eventcollector/log_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/downstreamadapter/dispatcher"
"github.com/pingcap/ticdc/logservice/logservicepb"
"github.com/pingcap/ticdc/pkg/chann"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/utils/chann"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ require (
github.com/mailru/easyjson v0.7.7
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
github.com/pierrec/lz4/v4 v4.1.18
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0
github.com/pingcap/errors v0.11.5-0.20250523034308-74f78ae071ee
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20250915095348-efd5134a6d6c
Expand Down Expand Up @@ -280,7 +281,6 @@ require (
github.com/philhofer/fwd v1.1.1 // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pingcap/badger v1.5.1-0.20241015064302-38533b6cbf8d // indirect
github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 // indirect
github.com/pingcap/fn v1.0.0 // indirect
github.com/pingcap/goleveldb v0.0.0-20191226122134-f82aafb29989 // indirect
github.com/pingcap/metering_sdk v0.0.0-20250918015914-468cd6feb1dc // indirect
Expand Down
2 changes: 1 addition & 1 deletion logservice/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/logservicepb"
"github.com/pingcap/ticdc/pkg/chann"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/pdutil"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils/chann"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/zap"
Expand Down
15 changes: 7 additions & 8 deletions maintainer/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/pkg/scheduler/operator"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/tiflow/cdc/model"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -247,8 +246,8 @@ func (oc *Controller) pollQueueingOperator() (
delete(oc.lastWarnTime, opID)
oc.mu.Unlock()

metrics.OperatorCount.WithLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Dec()
metrics.OperatorDuration.WithLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Observe(time.Since(item.CreatedAt).Seconds())
metrics.OperatorCount.WithLabelValues(common.DefaultKeyspaceNamme, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Dec()
metrics.OperatorDuration.WithLabelValues(common.DefaultKeyspaceNamme, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Observe(time.Since(item.CreatedAt).Seconds())
log.Info("operator finished",
zap.String("role", oc.role),
zap.String("changefeed", oc.changefeedID.Name()),
Expand Down Expand Up @@ -331,8 +330,8 @@ func (oc *Controller) pushOperator(op operator.Operator[common.DispatcherID, *he
heap.Push(&oc.runningQueue, withTime)
oc.mu.Unlock()

metrics.OperatorCount.WithLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc()
metrics.TotalOperatorCount.WithLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc()
metrics.OperatorCount.WithLabelValues(common.DefaultKeyspaceNamme, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc()
metrics.TotalOperatorCount.WithLabelValues(common.DefaultKeyspaceNamme, oc.changefeedID.Name(), op.Type(), common.StringMode(oc.mode)).Inc()
}

func (oc *Controller) checkAffectedNodes(op operator.Operator[common.DispatcherID, *heartbeatpb.TableSpanStatus]) {
Expand Down Expand Up @@ -449,9 +448,9 @@ func (oc *Controller) Close() {
opTypes := []string{"occupy", "merge", "add", "remove", "move", "split", "merge"}

for _, opType := range opTypes {
metrics.OperatorCount.DeleteLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), opType, common.StringMode(oc.mode))
metrics.TotalOperatorCount.DeleteLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), opType, common.StringMode(oc.mode))
metrics.OperatorDuration.DeleteLabelValues(model.DefaultNamespace, oc.changefeedID.Name(), opType, common.StringMode(oc.mode))
metrics.OperatorCount.DeleteLabelValues(common.DefaultKeyspaceNamme, oc.changefeedID.Name(), opType, common.StringMode(oc.mode))
metrics.TotalOperatorCount.DeleteLabelValues(common.DefaultKeyspaceNamme, oc.changefeedID.Name(), opType, common.StringMode(oc.mode))
metrics.OperatorDuration.DeleteLabelValues(common.DefaultKeyspaceNamme, oc.changefeedID.Name(), opType, common.StringMode(oc.mode))
}
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog-filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ func TestFilter(t *testing.T) {
{"Test_1_*", "abc*", []EventType{DeleteEvent, InsertEvent, CreateIndex, DropIndex, DropView}, []string{"^DROP\\s+PROCEDURE", "^CREATE\\s+PROCEDURE"}, nil, Ignore},
{"xxx_*", "abc_*", []EventType{AllDML, NoneDDL}, nil, nil, Ignore},
{"yyy_*", "abc_*", []EventType{EventType("ALL DML")}, nil, nil, Do},
{"Test_1_*", "abc*", []EventType{"wrong event"}, []string{"^DROP\\s+PROCEDURE", "^CREATE\\s+PROCEDURE"}, nil, Ignore},
{"cdc", "t1", []EventType{RebaseAutoID}, nil, nil, Ignore},
// {"Test_1_*", "abc*", []EventType{"wrong event"}, []string{"^DROP\\s+PROCEDURE", "^CREATE\\s+PROCEDURE"}, nil, Ignore},
}

cases := []struct {
Expand Down
21 changes: 0 additions & 21 deletions pkg/chann/LICENSE

This file was deleted.

Loading