From f88bc159ddf45718a294c8516188853ad34eb56c Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 30 Oct 2025 16:46:47 +0800 Subject: [PATCH 1/6] remove error from the message handler --- coordinator/coordinator.go | 8 ++---- coordinator/coordinator_test.go | 6 ++-- .../dispatchermanager/heartbeat_collector.go | 3 +- .../dispatcher_orchestrator.go | 6 ++-- .../eventcollector/event_collector.go | 10 +++---- .../eventcollector/log_coordinator_client.go | 3 +- logservice/coordinator/coordinator.go | 3 +- logservice/eventstore/event_store.go | 3 +- maintainer/maintainer_manager.go | 28 +++++++++---------- maintainer/maintainer_manager_test.go | 12 ++------ maintainer/maintainer_test.go | 10 +++---- pkg/eventservice/event_service.go | 7 ++--- .../message_center_integration_test.go | 6 ++-- pkg/messaging/router.go | 2 +- 14 files changed, 42 insertions(+), 65 deletions(-) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 9bcd55265e..23447d1e0e 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -145,9 +145,9 @@ func New(node *node.Info, return c } -func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { if c.closed.Load() { - return nil + return } defer func() { @@ -168,12 +168,10 @@ func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMes select { case <-ctx.Done(): - return ctx.Err() + return default: c.eventCh.In() <- &Event{message: msg} } - - return nil } // Run spawns two goroutines to handle messages and run the coordinator. diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 809364eb1d..73cd557187 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -125,7 +125,7 @@ func (m *mockMaintainerManager) sendMessages(msg *heartbeatpb.MaintainerHeartbea } } -func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // receive message from coordinator case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest: @@ -133,14 +133,12 @@ func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging case messaging.TypeCoordinatorBootstrapRequest: select { case <-ctx.Done(): - return ctx.Err() + return case m.msgCh <- msg: } - return nil default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } - return nil } func (m *mockMaintainerManager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) { diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 8ffd9c9eb5..0c7ce12081 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -239,7 +239,7 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error } } -func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.TargetMessage) error { +func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.TargetMessage) { switch msg.Type { case messaging.TypeHeartBeatResponse: // TODO: Change a more appropriate name for HeartBeatResponse. It should be BlockStatusResponse or something else. @@ -272,7 +272,6 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } - return nil } func (c *HeartBeatCollector) Close() { diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index 8298b0768d..faf1715ea2 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -78,15 +78,15 @@ func (m *DispatcherOrchestrator) Run(ctx context.Context) { func (m *DispatcherOrchestrator) RecvMaintainerRequest( _ context.Context, msg *messaging.TargetMessage, -) error { +) { // Put message into channel for asynchronous processing by another goroutine select { case m.msgChan <- msg: - return nil + return default: // Channel is full, log warning and drop the message log.Warn("message channel is full, dropping message", zap.Any("message", msg.Message)) - return nil + return } } diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index ebad636219..8378370de6 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -449,7 +449,7 @@ func (c *EventCollector) handleDispatcherHeartbeatResponse(targetMessage *messag } // MessageCenterHandler is the handler for the events message from EventService. -func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) { inflightDuration := time.Since(time.UnixMilli(targetMessage.CreateAt)).Seconds() c.metricReceiveEventLagDuration.Observe(inflightDuration) @@ -462,7 +462,7 @@ func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage * // corresponding channel to handle it in multi-thread. if targetMessage.Type.IsLogServiceEvent() { c.receiveChannels[targetMessage.GetGroup()%uint64(len(c.receiveChannels))] <- targetMessage - return nil + return } for _, msg := range targetMessage.Message { @@ -473,19 +473,17 @@ func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage * log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } // RedoMessageCenterHandler is the handler for the redo events message from EventService. -func (c *EventCollector) RedoMessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (c *EventCollector) RedoMessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) { // If the message is a log service event, we need to forward it to the // corresponding channel to handle it in multi-thread. if targetMessage.Type.IsLogServiceEvent() { c.redoReceiveChannels[targetMessage.GetGroup()%uint64(len(c.redoReceiveChannels))] <- targetMessage - return nil + return } log.Panic("invalid message type", zap.Any("msg", targetMessage)) - return nil } // runDispatchMessage dispatches messages from the input channel to the dynamic stream. diff --git a/downstreamadapter/eventcollector/log_coordinator_client.go b/downstreamadapter/eventcollector/log_coordinator_client.go index ae6a80f002..81e97b0200 100644 --- a/downstreamadapter/eventcollector/log_coordinator_client.go +++ b/downstreamadapter/eventcollector/log_coordinator_client.go @@ -54,7 +54,7 @@ func newLogCoordinatorClient(eventCollector *EventCollector) *LogCoordinatorClie return client } -func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg := msg.(type) { case *common.LogCoordinatorBroadcastRequest: @@ -69,7 +69,6 @@ func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMes log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } func (l *LogCoordinatorClient) run(ctx context.Context) error { diff --git a/logservice/coordinator/coordinator.go b/logservice/coordinator/coordinator.go index cf5b8a66e2..7415ab5d79 100644 --- a/logservice/coordinator/coordinator.go +++ b/logservice/coordinator/coordinator.go @@ -149,7 +149,7 @@ func (c *logCoordinator) Run(ctx context.Context) error { } } -func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg := msg.(type) { case *logservicepb.EventStoreState: @@ -167,7 +167,6 @@ func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messagi log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } func (c *logCoordinator) sendResolvedTsToCoordinator(id node.ID, changefeedID common.ChangeFeedID) { diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 529ae666b6..f41ab41954 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -1219,7 +1219,7 @@ func (iter *eventStoreIter) Close() (int64, error) { return iter.rowCount, err } -func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg.(type) { case *common.LogCoordinatorBroadcastRequest: @@ -1228,7 +1228,6 @@ func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.T log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } type SubscriptionChangeType int diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index b76370cc5e..9377a7206c 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -72,15 +72,15 @@ func NewMaintainerManager( mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) mc.RegisterHandler(messaging.MaintainerTopic, - func(ctx context.Context, msg *messaging.TargetMessage) error { + func(ctx context.Context, msg *messaging.TargetMessage) { req := msg.Message[0].(*heartbeatpb.MaintainerCloseResponse) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) }) return m } // recvMessages is the message handler for maintainer manager -func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // Coordinator related messages case messaging.TypeAddMaintainerRequest, @@ -88,31 +88,30 @@ func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage messaging.TypeCoordinatorBootstrapRequest: select { case <-ctx.Done(): - return ctx.Err() + return case m.msgCh <- msg: } - return nil + return // receive bootstrap response message from the dispatcher manager case messaging.TypeMaintainerBootstrapResponse: req := msg.Message[0].(*heartbeatpb.MaintainerBootstrapResponse) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) case messaging.TypeMaintainerPostBootstrapResponse: req := msg.Message[0].(*heartbeatpb.MaintainerPostBootstrapResponse) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) // receive heartbeat message from dispatchers case messaging.TypeHeartBeatRequest: req := msg.Message[0].(*heartbeatpb.HeartBeatRequest) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) case messaging.TypeBlockStatusRequest: req := msg.Message[0].(*heartbeatpb.BlockStatusRequest) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) case messaging.TypeCheckpointTsMessage: req := msg.Message[0].(*heartbeatpb.CheckpointTsMessage) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } - return nil } func (m *Manager) Name() string { @@ -348,17 +347,17 @@ func (m *Manager) handleMessage(msg *messaging.TargetMessage) { func (m *Manager) dispatcherMaintainerMessage( ctx context.Context, changefeed common.ChangeFeedID, msg *messaging.TargetMessage, -) error { +) { c, ok := m.maintainers.Load(changefeed) if !ok { log.Warn("maintainer is not found", zap.Stringer("changefeed", changefeed), zap.String("message", msg.String())) - return nil + return } select { case <-ctx.Done(): - return ctx.Err() + return default: maintainer := c.(*Maintainer) maintainer.pushEvent(&Event{ @@ -367,7 +366,6 @@ func (m *Manager) dispatcherMaintainerMessage( message: msg, }) } - return nil } func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) (*Maintainer, bool) { diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index c600ce34be..bdd7973104 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -90,9 +90,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) // Discard maintainer manager messages, cuz we don't need to handle them in this test - mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { - return nil - }) + mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) {}) schedulerConf := &config.SchedulerConfig{ AddTableBatchSize: 1000, CheckBalanceInterval: 0, @@ -316,9 +314,7 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) // discard maintainer manager messages - mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { - return nil - }) + mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) {}) manager := NewMaintainerManager(selfNode, config.GetGlobalServerConfig().Debug.Scheduler) msg := messaging.NewSingleTargetMessage(selfNode.ID, messaging.MaintainerManagerTopic, @@ -452,9 +448,7 @@ func TestStopNotExistsMaintainer(t *testing.T) { startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) // discard maintainer manager messages - mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { - return nil - }) + mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) {}) schedulerConf := &config.SchedulerConfig{AddTableBatchSize: 1000} manager := NewMaintainerManager(selfNode, schedulerConf) msg := messaging.NewSingleTargetMessage(selfNode.ID, diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 58f6cf9288..f4494419a1 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -108,7 +108,7 @@ func (m *mockDispatcherManager) sendMessages(msg *heartbeatpb.HeartBeatRequest) } } -func (m *mockDispatcherManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +func (m *mockDispatcherManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // receive message from maintainer case messaging.TypeScheduleDispatcherRequest, @@ -117,14 +117,12 @@ func (m *mockDispatcherManager) recvMessages(ctx context.Context, msg *messaging messaging.TypeMaintainerCloseRequest: select { case <-ctx.Done(): - return ctx.Err() + return case m.msgCh <- msg: } - return nil default: log.Panic("unknown message type", zap.Any("message", msg.Message), zap.Any("type", msg.Type)) } - return nil } func (m *mockDispatcherManager) onBootstrapRequest(msg *messaging.TargetMessage) { @@ -332,13 +330,13 @@ func TestMaintainerSchedule(t *testing.T) { }, n, taskScheduler, 10, true, common.DefaultKeyspaceID) mc.RegisterHandler(messaging.MaintainerManagerTopic, - func(ctx context.Context, msg *messaging.TargetMessage) error { + func(ctx context.Context, msg *messaging.TargetMessage) { maintainer.eventCh.In() <- &Event{ changefeedID: cfID, eventType: EventMessage, message: msg, } - return nil + return }) // send bootstrap message diff --git a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index ec78b6f7d4..35cb00322a 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -140,14 +140,14 @@ func (s *eventService) Close(_ context.Context) error { return nil } -func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetMessage) error { +func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { case messaging.TypeDispatcherRequest: infos := msgToDispatcherInfo(msg) for _, info := range infos { select { case <-ctx.Done(): - return ctx.Err() + return case s.dispatcherInfoChan <- info: } } @@ -158,7 +158,7 @@ func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetM heartbeat := msg.Message[0].(*event.DispatcherHeartbeat) select { case <-ctx.Done(): - return ctx.Err() + return case s.dispatcherHeartbeat <- &DispatcherHeartBeatWithServerID{ serverID: msg.From.String(), heartbeat: heartbeat, @@ -173,7 +173,6 @@ func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetM default: log.Panic("unknown message type", zap.String("type", msg.Type.String()), zap.Any("message", msg)) } - return nil } func (s *eventService) registerDispatcher(ctx context.Context, info DispatcherInfo) { diff --git a/pkg/messaging/message_center_integration_test.go b/pkg/messaging/message_center_integration_test.go index 1a2b833352..04c7a53bba 100644 --- a/pkg/messaging/message_center_integration_test.go +++ b/pkg/messaging/message_center_integration_test.go @@ -52,10 +52,9 @@ func setupMessageCenters(t *testing.T) (*messageCenter, *messageCenter, *message func registerHandler(mc *messageCenter, topic string) chan *TargetMessage { ch := make(chan *TargetMessage, 1) - mc.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) error { + mc.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) { ch <- msg log.Info(fmt.Sprintf("%s received message", mc.id), zap.Any("msg", msg)) - return nil }) return ch } @@ -83,9 +82,8 @@ func waitForTargetsReady(mc *messageCenter) { func sendAndReceiveMessage(t *testing.T, sender *messageCenter, receiver *messageCenter, topic string, event *commonEvent.BatchDMLEvent) { targetMsg := NewSingleTargetMessage(receiver.id, topic, event) ch := make(chan *TargetMessage, 1) - receiver.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) error { + receiver.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) { ch <- msg - return nil }) timeoutCh := time.After(30 * time.Second) diff --git a/pkg/messaging/router.go b/pkg/messaging/router.go index b5c0a46d23..9c5ba361a9 100644 --- a/pkg/messaging/router.go +++ b/pkg/messaging/router.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" ) -type MessageHandler func(ctx context.Context, msg *TargetMessage) error +type MessageHandler func(ctx context.Context, msg *TargetMessage) type router struct { mu sync.RWMutex From b2ad219224d0af5f1cfb64daadb8cae16ce33963 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 30 Oct 2025 17:03:50 +0800 Subject: [PATCH 2/6] unify the handler name --- coordinator/coordinator.go | 4 ++-- coordinator/coordinator_test.go | 13 ++++--------- .../dispatchermanager/heartbeat_collector.go | 4 ++-- .../dispatcher_orchestrator.go | 6 +++--- downstreamadapter/eventcollector/event_collector.go | 12 ++++++------ .../eventcollector/log_coordinator_client.go | 4 ++-- logservice/coordinator/coordinator.go | 4 ++-- logservice/eventstore/event_store.go | 4 ++-- maintainer/maintainer_manager.go | 6 +++--- maintainer/maintainer_test.go | 7 +++---- pkg/eventservice/event_service.go | 4 ++-- pkg/messaging/router.go | 5 +---- 12 files changed, 32 insertions(+), 41 deletions(-) diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 23447d1e0e..1b44fdc992 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -118,7 +118,7 @@ func New(node *node.Info, backend: backend, } // handle messages from message center - mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages) + mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessage) c.controller = NewController( c.version, c.nodeInfo, @@ -145,7 +145,7 @@ func New(node *node.Info, return c } -func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { +func (c *coordinator) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { if c.closed.Load() { return } diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 73cd557187..1776ff7588 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -73,7 +73,7 @@ func NewMaintainerManager(mc messaging.MessageCenter) *mockMaintainerManager { maintainerMap: make(map[common.ChangeFeedID]*heartbeatpb.MaintainerStatus, 1000000), msgCh: make(chan *messaging.TargetMessage, 1024), } - mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) + mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessage) return m } @@ -125,7 +125,7 @@ func (m *mockMaintainerManager) sendMessages(msg *heartbeatpb.MaintainerHeartbea } } -func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { +func (m *mockMaintainerManager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // receive message from coordinator case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest: @@ -576,11 +576,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) { } // Use recvMessages to send event to channel - err := co.recvMessages(ctx, msg) - if err != nil && err != context.Canceled { - t.Logf("Failed to send event in goroutine %d: %v", id, err) - } - + co.recvMessage(ctx, msg) // Small sleep to increase chance of race conditions time.Sleep(time.Millisecond) } @@ -623,8 +619,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) { Type: messaging.TypeMaintainerHeartbeatRequest, } - err := co.recvMessages(ctx, msg) - require.NoError(t, err) + co.recvMessage(ctx, msg) require.True(t, co.closed.Load()) } diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 0c7ce12081..a95c572eed 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -78,7 +78,7 @@ func NewHeartBeatCollector(serverId node.ID) *HeartBeatCollector { mergeDispatcherRequestDynamicStream: newMergeDispatcherRequestDynamicStream(), mc: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter), } - heartBeatCollector.mc.RegisterHandler(messaging.HeartbeatCollectorTopic, heartBeatCollector.RecvMessages) + heartBeatCollector.mc.RegisterHandler(messaging.HeartbeatCollectorTopic, heartBeatCollector.recvMessage) return &heartBeatCollector } @@ -239,7 +239,7 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error } } -func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.TargetMessage) { +func (c *HeartBeatCollector) recvMessage(_ context.Context, msg *messaging.TargetMessage) { switch msg.Type { case messaging.TypeHeartBeatResponse: // TODO: Change a more appropriate name for HeartBeatResponse. It should be BlockStatusResponse or something else. diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index faf1715ea2..327d4952fc 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -52,7 +52,7 @@ func New() *DispatcherOrchestrator { dispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager), msgChan: make(chan *messaging.TargetMessage, 1024), // buffer size 1024 } - m.mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.RecvMaintainerRequest) + m.mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.recvMessage) return m } @@ -73,9 +73,9 @@ func (m *DispatcherOrchestrator) Run(ctx context.Context) { }() } -// RecvMaintainerRequest is the handler for the maintainer request message. +// recvMessage is the handler for the maintainer request message. // It puts the message into a channel for asynchronous processing to avoid blocking the message center. -func (m *DispatcherOrchestrator) RecvMaintainerRequest( +func (m *DispatcherOrchestrator) recvMessage( _ context.Context, msg *messaging.TargetMessage, ) { diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index 8378370de6..bb4cd0c803 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -137,8 +137,8 @@ func New(serverId node.ID) *EventCollector { eventCollector.logCoordinatorClient = newLogCoordinatorClient(eventCollector) eventCollector.ds = NewEventDynamicStream(eventCollector) eventCollector.redoDs = NewEventDynamicStream(eventCollector) - eventCollector.mc.RegisterHandler(messaging.EventCollectorTopic, eventCollector.MessageCenterHandler) - eventCollector.mc.RegisterHandler(messaging.RedoEventCollectorTopic, eventCollector.RedoMessageCenterHandler) + eventCollector.mc.RegisterHandler(messaging.EventCollectorTopic, eventCollector.recvMessage) + eventCollector.mc.RegisterHandler(messaging.RedoEventCollectorTopic, eventCollector.recvRedoMessage) return eventCollector } @@ -448,8 +448,8 @@ func (c *EventCollector) handleDispatcherHeartbeatResponse(targetMessage *messag } } -// MessageCenterHandler is the handler for the events message from EventService. -func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) { +// recvMessage is the handler for the events message from EventService. +func (c *EventCollector) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { inflightDuration := time.Since(time.UnixMilli(targetMessage.CreateAt)).Seconds() c.metricReceiveEventLagDuration.Observe(inflightDuration) @@ -475,8 +475,8 @@ func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage * } } -// RedoMessageCenterHandler is the handler for the redo events message from EventService. -func (c *EventCollector) RedoMessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) { +// recvRedoMessage is the handler for the redo events message from EventService. +func (c *EventCollector) recvRedoMessage(_ context.Context, targetMessage *messaging.TargetMessage) { // If the message is a log service event, we need to forward it to the // corresponding channel to handle it in multi-thread. if targetMessage.Type.IsLogServiceEvent() { diff --git a/downstreamadapter/eventcollector/log_coordinator_client.go b/downstreamadapter/eventcollector/log_coordinator_client.go index 81e97b0200..9d825a14cf 100644 --- a/downstreamadapter/eventcollector/log_coordinator_client.go +++ b/downstreamadapter/eventcollector/log_coordinator_client.go @@ -50,11 +50,11 @@ func newLogCoordinatorClient(eventCollector *EventCollector) *LogCoordinatorClie logCoordinatorRequestChan: chann.NewAutoDrainChann[*logservicepb.ReusableEventServiceRequest](), enableRemoteEventService: config.GetGlobalServerConfig().Debug.EventService.EnableRemoteEventService, } - client.mc.RegisterHandler(logCoordinatorClientTopic, client.MessageCenterHandler) + client.mc.RegisterHandler(logCoordinatorClientTopic, client.recvMessage) return client } -func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) { +func (l *LogCoordinatorClient) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg := msg.(type) { case *common.LogCoordinatorBroadcastRequest: diff --git a/logservice/coordinator/coordinator.go b/logservice/coordinator/coordinator.go index 7415ab5d79..6897307a42 100644 --- a/logservice/coordinator/coordinator.go +++ b/logservice/coordinator/coordinator.go @@ -97,7 +97,7 @@ func New() LogCoordinator { c.changefeedStates.m = make(map[common.GID]*changefeedState) // recv and handle messages - messageCenter.RegisterHandler(logCoordinatorTopic, c.handleMessage) + messageCenter.RegisterHandler(logCoordinatorTopic, c.recvMessage) // watch node changes nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) nodes := nodeManager.GetAliveNodes() @@ -149,7 +149,7 @@ func (c *logCoordinator) Run(ctx context.Context) error { } } -func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) { +func (c *logCoordinator) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg := msg.(type) { case *logservicepb.EventStoreState: diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index f41ab41954..5d63df73d2 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -280,7 +280,7 @@ func New( store.dispatcherMeta.dispatcherStats = make(map[common.DispatcherID]*dispatcherStat) store.dispatcherMeta.tableStats = make(map[int64]subscriptionStats) - store.messageCenter.RegisterHandler(messaging.EventStoreTopic, store.handleMessage) + store.messageCenter.RegisterHandler(messaging.EventStoreTopic, store.recvMessage) return store } @@ -1219,7 +1219,7 @@ func (iter *eventStoreIter) Close() (int64, error) { return iter.rowCount, err } -func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) { +func (e *eventStore) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg.(type) { case *common.LogCoordinatorBroadcastRequest: diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index 9377a7206c..a46fc87482 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -70,7 +70,7 @@ func NewMaintainerManager( taskScheduler: threadpool.NewThreadPoolDefault(), } - mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) + mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessage) mc.RegisterHandler(messaging.MaintainerTopic, func(ctx context.Context, msg *messaging.TargetMessage) { req := msg.Message[0].(*heartbeatpb.MaintainerCloseResponse) @@ -79,8 +79,8 @@ func NewMaintainerManager( return m } -// recvMessages is the message handler for maintainer manager -func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { +// recvMessage is the message handler for maintainer manager +func (m *Manager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // Coordinator related messages case messaging.TypeAddMaintainerRequest, diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index f4494419a1..69dc443e5d 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -62,8 +62,8 @@ func MockDispatcherManager(mc messaging.MessageCenter, self node.ID) *mockDispat dispatchersMap: make(map[heartbeatpb.DispatcherID]*heartbeatpb.TableSpanStatus, 2000001), self: self, } - mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.recvMessages) - mc.RegisterHandler(messaging.HeartbeatCollectorTopic, m.recvMessages) + mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.recvMessage) + mc.RegisterHandler(messaging.HeartbeatCollectorTopic, m.recvMessage) return m } @@ -108,7 +108,7 @@ func (m *mockDispatcherManager) sendMessages(msg *heartbeatpb.HeartBeatRequest) } } -func (m *mockDispatcherManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) { +func (m *mockDispatcherManager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // receive message from maintainer case messaging.TypeScheduleDispatcherRequest, @@ -336,7 +336,6 @@ func TestMaintainerSchedule(t *testing.T) { eventType: EventMessage, message: msg, } - return }) // send bootstrap message diff --git a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index 35cb00322a..a43e70bab2 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -89,7 +89,7 @@ func New(eventStore eventstore.EventStore, schemaStore schemastore.SchemaStore) dispatcherInfoChan: make(chan DispatcherInfo, 32), dispatcherHeartbeat: make(chan *DispatcherHeartBeatWithServerID, 32), } - es.mc.RegisterHandler(messaging.EventServiceTopic, es.handleMessage) + es.mc.RegisterHandler(messaging.EventServiceTopic, es.recvMessage) return es } @@ -140,7 +140,7 @@ func (s *eventService) Close(_ context.Context) error { return nil } -func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetMessage) { +func (s *eventService) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { case messaging.TypeDispatcherRequest: infos := msgToDispatcherInfo(msg) diff --git a/pkg/messaging/router.go b/pkg/messaging/router.go index 9c5ba361a9..796cbd5a71 100644 --- a/pkg/messaging/router.go +++ b/pkg/messaging/router.go @@ -64,7 +64,7 @@ func (r *router) runDispatch(ctx context.Context, out <-chan *TargetMessage) { continue } start := time.Now() - err := handler(ctx, msg) + handler(ctx, msg) now := time.Now() if now.Sub(start) > 100*time.Millisecond { // Rate limit logging: only log once every 10 seconds @@ -80,9 +80,6 @@ func (r *router) runDispatch(ctx context.Context, out <-chan *TargetMessage) { // Always increment metrics counter for slow message handling metrics.MessagingSlowHandleCounter.WithLabelValues(msg.Type.String()).Inc() } - if err != nil { - log.Error("Handle message failed", zap.Error(err), zap.Any("msg", msg)) - } } } } From f1289718858a7f83c54bda780403629a2fd9ce65 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 30 Oct 2025 17:06:13 +0800 Subject: [PATCH 3/6] fix unit test --- pkg/messaging/router_test.go | 24 +++--------------------- 1 file changed, 3 insertions(+), 21 deletions(-) diff --git a/pkg/messaging/router_test.go b/pkg/messaging/router_test.go index 5adf9fc7b7..e9ba420cb2 100644 --- a/pkg/messaging/router_test.go +++ b/pkg/messaging/router_test.go @@ -15,7 +15,6 @@ package messaging import ( "context" - "errors" "fmt" "sync" "testing" @@ -28,7 +27,7 @@ func TestRegisterAndDeRegisterHandler(t *testing.T) { r := newRouter() testTopic := "test-topic" - handler := func(ctx context.Context, msg *TargetMessage) error { return nil } + handler := func(ctx context.Context, msg *TargetMessage) {} r.registerHandler(testTopic, handler) assert.Len(t, r.handlers, 1) @@ -60,9 +59,8 @@ func TestRunDispatchSuccessful(t *testing.T) { t.Parallel() handledMsg := make([]*TargetMessage, 0) r := newRouter() - r.registerHandler("topic1", func(ctx context.Context, msg *TargetMessage) error { + r.registerHandler("topic1", func(ctx context.Context, msg *TargetMessage) { handledMsg = append(handledMsg, msg) - return nil }) msgChan := make(chan *TargetMessage, 1) @@ -77,22 +75,6 @@ func TestRunDispatchSuccessful(t *testing.T) { assert.Equal(t, "topic1", handledMsg[0].Topic) } -func TestRunDispatchWithError(t *testing.T) { - t.Parallel() - r := newRouter() - r.registerHandler("topic1", func(ctx context.Context, msg *TargetMessage) error { - return errors.New("handler error") - }) - - msgChan := make(chan *TargetMessage, 1) - msgChan <- &TargetMessage{Topic: "topic1", Message: newMockIOTypeT([]byte("test"))} - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - r.runDispatch(ctx, msgChan) -} - func TestRunDispatchNoHandler(t *testing.T) { t.Parallel() r := newRouter() @@ -115,7 +97,7 @@ func TestConcurrentAccess(t *testing.T) { go func() { defer wg.Done() - handler := func(ctx context.Context, msg *TargetMessage) error { return nil } + handler := func(ctx context.Context, msg *TargetMessage) {} r.registerHandler(topic, handler) }() From 940dab69ddbeef5da01b6bd2504b00a459f959fb Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 30 Oct 2025 17:15:45 +0800 Subject: [PATCH 4/6] fix code --- maintainer/maintainer_manager.go | 1 - 1 file changed, 1 deletion(-) diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index a46fc87482..522cb49800 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -91,7 +91,6 @@ func (m *Manager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) return case m.msgCh <- msg: } - return // receive bootstrap response message from the dispatcher manager case messaging.TypeMaintainerBootstrapResponse: req := msg.Message[0].(*heartbeatpb.MaintainerBootstrapResponse) From 41d07244df12c264ae0c4b6e979e9e2f359aa683 Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 30 Oct 2025 17:55:36 +0800 Subject: [PATCH 5/6] remove unused code --- pkg/messaging/remote_target.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/pkg/messaging/remote_target.go b/pkg/messaging/remote_target.go index 6794fd2af8..5bdcf402ff 100644 --- a/pkg/messaging/remote_target.go +++ b/pkg/messaging/remote_target.go @@ -36,15 +36,8 @@ import ( ) const ( - reconnectInterval = 2 * time.Second streamTypeEvent = "event" streamTypeCommand = "command" - - eventRecvCh = "eventRecvCh" - commandRecvCh = "commandRecvCh" - - eventSendCh = "eventSendCh" - commandSendCh = "commandSendCh" ) type streamSession struct { From 770d70ba30ad0dd4060b547f22f9c853ae67ba0e Mon Sep 17 00:00:00 2001 From: 3AceShowHand Date: Thu, 30 Oct 2025 18:13:24 +0800 Subject: [PATCH 6/6] remove unused code --- pkg/messaging/stream.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/pkg/messaging/stream.go b/pkg/messaging/stream.go index 17c3771d78..cddbd55e56 100644 --- a/pkg/messaging/stream.go +++ b/pkg/messaging/stream.go @@ -14,8 +14,6 @@ package messaging import ( - "sync/atomic" - "github.com/pingcap/ticdc/pkg/messaging/proto" ) @@ -27,23 +25,12 @@ type grpcStream interface { Recv() (*proto.Message, error) } -var streamGenerator atomic.Uint64 - type streamWrapper struct { grpcStream id uint64 streamType string } -// newStreamWrapper creates a new stream wrapper. -func newStreamWrapper(stream grpcStream, streamType string) *streamWrapper { - return &streamWrapper{ - grpcStream: stream, - id: streamGenerator.Add(1), - streamType: streamType, - } -} - func (s *streamWrapper) Send(msg *proto.Message) error { return s.grpcStream.Send(msg) }