diff --git a/coordinator/coordinator.go b/coordinator/coordinator.go index 9bcd55265e..1b44fdc992 100644 --- a/coordinator/coordinator.go +++ b/coordinator/coordinator.go @@ -118,7 +118,7 @@ func New(node *node.Info, backend: backend, } // handle messages from message center - mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages) + mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessage) c.controller = NewController( c.version, c.nodeInfo, @@ -145,9 +145,9 @@ func New(node *node.Info, return c } -func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +func (c *coordinator) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { if c.closed.Load() { - return nil + return } defer func() { @@ -168,12 +168,10 @@ func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMes select { case <-ctx.Done(): - return ctx.Err() + return default: c.eventCh.In() <- &Event{message: msg} } - - return nil } // Run spawns two goroutines to handle messages and run the coordinator. diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 809364eb1d..1776ff7588 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -73,7 +73,7 @@ func NewMaintainerManager(mc messaging.MessageCenter) *mockMaintainerManager { maintainerMap: make(map[common.ChangeFeedID]*heartbeatpb.MaintainerStatus, 1000000), msgCh: make(chan *messaging.TargetMessage, 1024), } - mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) + mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessage) return m } @@ -125,7 +125,7 @@ func (m *mockMaintainerManager) sendMessages(msg *heartbeatpb.MaintainerHeartbea } } -func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +func (m *mockMaintainerManager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // receive message from coordinator case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest: @@ -133,14 +133,12 @@ func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging case messaging.TypeCoordinatorBootstrapRequest: select { case <-ctx.Done(): - return ctx.Err() + return case m.msgCh <- msg: } - return nil default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } - return nil } func (m *mockMaintainerManager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) { @@ -578,11 +576,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) { } // Use recvMessages to send event to channel - err := co.recvMessages(ctx, msg) - if err != nil && err != context.Canceled { - t.Logf("Failed to send event in goroutine %d: %v", id, err) - } - + co.recvMessage(ctx, msg) // Small sleep to increase chance of race conditions time.Sleep(time.Millisecond) } @@ -625,8 +619,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) { Type: messaging.TypeMaintainerHeartbeatRequest, } - err := co.recvMessages(ctx, msg) - require.NoError(t, err) + co.recvMessage(ctx, msg) require.True(t, co.closed.Load()) } diff --git a/downstreamadapter/dispatchermanager/heartbeat_collector.go b/downstreamadapter/dispatchermanager/heartbeat_collector.go index 8ffd9c9eb5..a95c572eed 100644 --- a/downstreamadapter/dispatchermanager/heartbeat_collector.go +++ b/downstreamadapter/dispatchermanager/heartbeat_collector.go @@ -78,7 +78,7 @@ func NewHeartBeatCollector(serverId node.ID) *HeartBeatCollector { mergeDispatcherRequestDynamicStream: newMergeDispatcherRequestDynamicStream(), mc: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter), } - heartBeatCollector.mc.RegisterHandler(messaging.HeartbeatCollectorTopic, heartBeatCollector.RecvMessages) + heartBeatCollector.mc.RegisterHandler(messaging.HeartbeatCollectorTopic, heartBeatCollector.recvMessage) return &heartBeatCollector } @@ -239,7 +239,7 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error } } -func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.TargetMessage) error { +func (c *HeartBeatCollector) recvMessage(_ context.Context, msg *messaging.TargetMessage) { switch msg.Type { case messaging.TypeHeartBeatResponse: // TODO: Change a more appropriate name for HeartBeatResponse. It should be BlockStatusResponse or something else. @@ -272,7 +272,6 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } - return nil } func (c *HeartBeatCollector) Close() { diff --git a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go index 8298b0768d..327d4952fc 100644 --- a/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go +++ b/downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go @@ -52,7 +52,7 @@ func New() *DispatcherOrchestrator { dispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager), msgChan: make(chan *messaging.TargetMessage, 1024), // buffer size 1024 } - m.mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.RecvMaintainerRequest) + m.mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.recvMessage) return m } @@ -73,20 +73,20 @@ func (m *DispatcherOrchestrator) Run(ctx context.Context) { }() } -// RecvMaintainerRequest is the handler for the maintainer request message. +// recvMessage is the handler for the maintainer request message. // It puts the message into a channel for asynchronous processing to avoid blocking the message center. -func (m *DispatcherOrchestrator) RecvMaintainerRequest( +func (m *DispatcherOrchestrator) recvMessage( _ context.Context, msg *messaging.TargetMessage, -) error { +) { // Put message into channel for asynchronous processing by another goroutine select { case m.msgChan <- msg: - return nil + return default: // Channel is full, log warning and drop the message log.Warn("message channel is full, dropping message", zap.Any("message", msg.Message)) - return nil + return } } diff --git a/downstreamadapter/eventcollector/event_collector.go b/downstreamadapter/eventcollector/event_collector.go index ebad636219..bb4cd0c803 100644 --- a/downstreamadapter/eventcollector/event_collector.go +++ b/downstreamadapter/eventcollector/event_collector.go @@ -137,8 +137,8 @@ func New(serverId node.ID) *EventCollector { eventCollector.logCoordinatorClient = newLogCoordinatorClient(eventCollector) eventCollector.ds = NewEventDynamicStream(eventCollector) eventCollector.redoDs = NewEventDynamicStream(eventCollector) - eventCollector.mc.RegisterHandler(messaging.EventCollectorTopic, eventCollector.MessageCenterHandler) - eventCollector.mc.RegisterHandler(messaging.RedoEventCollectorTopic, eventCollector.RedoMessageCenterHandler) + eventCollector.mc.RegisterHandler(messaging.EventCollectorTopic, eventCollector.recvMessage) + eventCollector.mc.RegisterHandler(messaging.RedoEventCollectorTopic, eventCollector.recvRedoMessage) return eventCollector } @@ -448,8 +448,8 @@ func (c *EventCollector) handleDispatcherHeartbeatResponse(targetMessage *messag } } -// MessageCenterHandler is the handler for the events message from EventService. -func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error { +// recvMessage is the handler for the events message from EventService. +func (c *EventCollector) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { inflightDuration := time.Since(time.UnixMilli(targetMessage.CreateAt)).Seconds() c.metricReceiveEventLagDuration.Observe(inflightDuration) @@ -462,7 +462,7 @@ func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage * // corresponding channel to handle it in multi-thread. if targetMessage.Type.IsLogServiceEvent() { c.receiveChannels[targetMessage.GetGroup()%uint64(len(c.receiveChannels))] <- targetMessage - return nil + return } for _, msg := range targetMessage.Message { @@ -473,19 +473,17 @@ func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage * log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } -// RedoMessageCenterHandler is the handler for the redo events message from EventService. -func (c *EventCollector) RedoMessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error { +// recvRedoMessage is the handler for the redo events message from EventService. +func (c *EventCollector) recvRedoMessage(_ context.Context, targetMessage *messaging.TargetMessage) { // If the message is a log service event, we need to forward it to the // corresponding channel to handle it in multi-thread. if targetMessage.Type.IsLogServiceEvent() { c.redoReceiveChannels[targetMessage.GetGroup()%uint64(len(c.redoReceiveChannels))] <- targetMessage - return nil + return } log.Panic("invalid message type", zap.Any("msg", targetMessage)) - return nil } // runDispatchMessage dispatches messages from the input channel to the dynamic stream. diff --git a/downstreamadapter/eventcollector/log_coordinator_client.go b/downstreamadapter/eventcollector/log_coordinator_client.go index ae6a80f002..9d825a14cf 100644 --- a/downstreamadapter/eventcollector/log_coordinator_client.go +++ b/downstreamadapter/eventcollector/log_coordinator_client.go @@ -50,11 +50,11 @@ func newLogCoordinatorClient(eventCollector *EventCollector) *LogCoordinatorClie logCoordinatorRequestChan: chann.NewAutoDrainChann[*logservicepb.ReusableEventServiceRequest](), enableRemoteEventService: config.GetGlobalServerConfig().Debug.EventService.EnableRemoteEventService, } - client.mc.RegisterHandler(logCoordinatorClientTopic, client.MessageCenterHandler) + client.mc.RegisterHandler(logCoordinatorClientTopic, client.recvMessage) return client } -func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (l *LogCoordinatorClient) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg := msg.(type) { case *common.LogCoordinatorBroadcastRequest: @@ -69,7 +69,6 @@ func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMes log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } func (l *LogCoordinatorClient) run(ctx context.Context) error { diff --git a/logservice/coordinator/coordinator.go b/logservice/coordinator/coordinator.go index cf5b8a66e2..6897307a42 100644 --- a/logservice/coordinator/coordinator.go +++ b/logservice/coordinator/coordinator.go @@ -97,7 +97,7 @@ func New() LogCoordinator { c.changefeedStates.m = make(map[common.GID]*changefeedState) // recv and handle messages - messageCenter.RegisterHandler(logCoordinatorTopic, c.handleMessage) + messageCenter.RegisterHandler(logCoordinatorTopic, c.recvMessage) // watch node changes nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName) nodes := nodeManager.GetAliveNodes() @@ -149,7 +149,7 @@ func (c *logCoordinator) Run(ctx context.Context) error { } } -func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (c *logCoordinator) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg := msg.(type) { case *logservicepb.EventStoreState: @@ -167,7 +167,6 @@ func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messagi log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } func (c *logCoordinator) sendResolvedTsToCoordinator(id node.ID, changefeedID common.ChangeFeedID) { diff --git a/logservice/eventstore/event_store.go b/logservice/eventstore/event_store.go index 529ae666b6..5d63df73d2 100644 --- a/logservice/eventstore/event_store.go +++ b/logservice/eventstore/event_store.go @@ -280,7 +280,7 @@ func New( store.dispatcherMeta.dispatcherStats = make(map[common.DispatcherID]*dispatcherStat) store.dispatcherMeta.tableStats = make(map[int64]subscriptionStats) - store.messageCenter.RegisterHandler(messaging.EventStoreTopic, store.handleMessage) + store.messageCenter.RegisterHandler(messaging.EventStoreTopic, store.recvMessage) return store } @@ -1219,7 +1219,7 @@ func (iter *eventStoreIter) Close() (int64, error) { return iter.rowCount, err } -func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error { +func (e *eventStore) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) { for _, msg := range targetMessage.Message { switch msg.(type) { case *common.LogCoordinatorBroadcastRequest: @@ -1228,7 +1228,6 @@ func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.T log.Panic("invalid message type", zap.Any("msg", msg)) } } - return nil } type SubscriptionChangeType int diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go index b76370cc5e..522cb49800 100644 --- a/maintainer/maintainer_manager.go +++ b/maintainer/maintainer_manager.go @@ -70,17 +70,17 @@ func NewMaintainerManager( taskScheduler: threadpool.NewThreadPoolDefault(), } - mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages) + mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessage) mc.RegisterHandler(messaging.MaintainerTopic, - func(ctx context.Context, msg *messaging.TargetMessage) error { + func(ctx context.Context, msg *messaging.TargetMessage) { req := msg.Message[0].(*heartbeatpb.MaintainerCloseResponse) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) }) return m } -// recvMessages is the message handler for maintainer manager -func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +// recvMessage is the message handler for maintainer manager +func (m *Manager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // Coordinator related messages case messaging.TypeAddMaintainerRequest, @@ -88,31 +88,29 @@ func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage messaging.TypeCoordinatorBootstrapRequest: select { case <-ctx.Done(): - return ctx.Err() + return case m.msgCh <- msg: } - return nil // receive bootstrap response message from the dispatcher manager case messaging.TypeMaintainerBootstrapResponse: req := msg.Message[0].(*heartbeatpb.MaintainerBootstrapResponse) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) case messaging.TypeMaintainerPostBootstrapResponse: req := msg.Message[0].(*heartbeatpb.MaintainerPostBootstrapResponse) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) // receive heartbeat message from dispatchers case messaging.TypeHeartBeatRequest: req := msg.Message[0].(*heartbeatpb.HeartBeatRequest) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) case messaging.TypeBlockStatusRequest: req := msg.Message[0].(*heartbeatpb.BlockStatusRequest) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) case messaging.TypeCheckpointTsMessage: req := msg.Message[0].(*heartbeatpb.CheckpointTsMessage) - return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) + m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg) default: log.Panic("unknown message type", zap.Any("message", msg.Message)) } - return nil } func (m *Manager) Name() string { @@ -348,17 +346,17 @@ func (m *Manager) handleMessage(msg *messaging.TargetMessage) { func (m *Manager) dispatcherMaintainerMessage( ctx context.Context, changefeed common.ChangeFeedID, msg *messaging.TargetMessage, -) error { +) { c, ok := m.maintainers.Load(changefeed) if !ok { log.Warn("maintainer is not found", zap.Stringer("changefeed", changefeed), zap.String("message", msg.String())) - return nil + return } select { case <-ctx.Done(): - return ctx.Err() + return default: maintainer := c.(*Maintainer) maintainer.pushEvent(&Event{ @@ -367,7 +365,6 @@ func (m *Manager) dispatcherMaintainerMessage( message: msg, }) } - return nil } func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) (*Maintainer, bool) { diff --git a/maintainer/maintainer_manager_test.go b/maintainer/maintainer_manager_test.go index c600ce34be..bdd7973104 100644 --- a/maintainer/maintainer_manager_test.go +++ b/maintainer/maintainer_manager_test.go @@ -90,9 +90,7 @@ func TestMaintainerSchedulesNodeChanges(t *testing.T) { startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) // Discard maintainer manager messages, cuz we don't need to handle them in this test - mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { - return nil - }) + mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) {}) schedulerConf := &config.SchedulerConfig{ AddTableBatchSize: 1000, CheckBalanceInterval: 0, @@ -316,9 +314,7 @@ func TestMaintainerBootstrapWithTablesReported(t *testing.T) { startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) // discard maintainer manager messages - mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { - return nil - }) + mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) {}) manager := NewMaintainerManager(selfNode, config.GetGlobalServerConfig().Debug.Scheduler) msg := messaging.NewSingleTargetMessage(selfNode.ID, messaging.MaintainerManagerTopic, @@ -452,9 +448,7 @@ func TestStopNotExistsMaintainer(t *testing.T) { startDispatcherNode(t, ctx, selfNode, mc, nodeManager) nodeManager.RegisterNodeChangeHandler(appcontext.MessageCenter, mc.OnNodeChanges) // discard maintainer manager messages - mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) error { - return nil - }) + mc.RegisterHandler(messaging.CoordinatorTopic, func(ctx context.Context, msg *messaging.TargetMessage) {}) schedulerConf := &config.SchedulerConfig{AddTableBatchSize: 1000} manager := NewMaintainerManager(selfNode, schedulerConf) msg := messaging.NewSingleTargetMessage(selfNode.ID, diff --git a/maintainer/maintainer_test.go b/maintainer/maintainer_test.go index 58f6cf9288..69dc443e5d 100644 --- a/maintainer/maintainer_test.go +++ b/maintainer/maintainer_test.go @@ -62,8 +62,8 @@ func MockDispatcherManager(mc messaging.MessageCenter, self node.ID) *mockDispat dispatchersMap: make(map[heartbeatpb.DispatcherID]*heartbeatpb.TableSpanStatus, 2000001), self: self, } - mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.recvMessages) - mc.RegisterHandler(messaging.HeartbeatCollectorTopic, m.recvMessages) + mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.recvMessage) + mc.RegisterHandler(messaging.HeartbeatCollectorTopic, m.recvMessage) return m } @@ -108,7 +108,7 @@ func (m *mockDispatcherManager) sendMessages(msg *heartbeatpb.HeartBeatRequest) } } -func (m *mockDispatcherManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error { +func (m *mockDispatcherManager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { // receive message from maintainer case messaging.TypeScheduleDispatcherRequest, @@ -117,14 +117,12 @@ func (m *mockDispatcherManager) recvMessages(ctx context.Context, msg *messaging messaging.TypeMaintainerCloseRequest: select { case <-ctx.Done(): - return ctx.Err() + return case m.msgCh <- msg: } - return nil default: log.Panic("unknown message type", zap.Any("message", msg.Message), zap.Any("type", msg.Type)) } - return nil } func (m *mockDispatcherManager) onBootstrapRequest(msg *messaging.TargetMessage) { @@ -332,13 +330,12 @@ func TestMaintainerSchedule(t *testing.T) { }, n, taskScheduler, 10, true, common.DefaultKeyspaceID) mc.RegisterHandler(messaging.MaintainerManagerTopic, - func(ctx context.Context, msg *messaging.TargetMessage) error { + func(ctx context.Context, msg *messaging.TargetMessage) { maintainer.eventCh.In() <- &Event{ changefeedID: cfID, eventType: EventMessage, message: msg, } - return nil }) // send bootstrap message diff --git a/pkg/eventservice/event_service.go b/pkg/eventservice/event_service.go index ec78b6f7d4..a43e70bab2 100644 --- a/pkg/eventservice/event_service.go +++ b/pkg/eventservice/event_service.go @@ -89,7 +89,7 @@ func New(eventStore eventstore.EventStore, schemaStore schemastore.SchemaStore) dispatcherInfoChan: make(chan DispatcherInfo, 32), dispatcherHeartbeat: make(chan *DispatcherHeartBeatWithServerID, 32), } - es.mc.RegisterHandler(messaging.EventServiceTopic, es.handleMessage) + es.mc.RegisterHandler(messaging.EventServiceTopic, es.recvMessage) return es } @@ -140,14 +140,14 @@ func (s *eventService) Close(_ context.Context) error { return nil } -func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetMessage) error { +func (s *eventService) recvMessage(ctx context.Context, msg *messaging.TargetMessage) { switch msg.Type { case messaging.TypeDispatcherRequest: infos := msgToDispatcherInfo(msg) for _, info := range infos { select { case <-ctx.Done(): - return ctx.Err() + return case s.dispatcherInfoChan <- info: } } @@ -158,7 +158,7 @@ func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetM heartbeat := msg.Message[0].(*event.DispatcherHeartbeat) select { case <-ctx.Done(): - return ctx.Err() + return case s.dispatcherHeartbeat <- &DispatcherHeartBeatWithServerID{ serverID: msg.From.String(), heartbeat: heartbeat, @@ -173,7 +173,6 @@ func (s *eventService) handleMessage(ctx context.Context, msg *messaging.TargetM default: log.Panic("unknown message type", zap.String("type", msg.Type.String()), zap.Any("message", msg)) } - return nil } func (s *eventService) registerDispatcher(ctx context.Context, info DispatcherInfo) { diff --git a/pkg/messaging/message_center_integration_test.go b/pkg/messaging/message_center_integration_test.go index 1a2b833352..04c7a53bba 100644 --- a/pkg/messaging/message_center_integration_test.go +++ b/pkg/messaging/message_center_integration_test.go @@ -52,10 +52,9 @@ func setupMessageCenters(t *testing.T) (*messageCenter, *messageCenter, *message func registerHandler(mc *messageCenter, topic string) chan *TargetMessage { ch := make(chan *TargetMessage, 1) - mc.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) error { + mc.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) { ch <- msg log.Info(fmt.Sprintf("%s received message", mc.id), zap.Any("msg", msg)) - return nil }) return ch } @@ -83,9 +82,8 @@ func waitForTargetsReady(mc *messageCenter) { func sendAndReceiveMessage(t *testing.T, sender *messageCenter, receiver *messageCenter, topic string, event *commonEvent.BatchDMLEvent) { targetMsg := NewSingleTargetMessage(receiver.id, topic, event) ch := make(chan *TargetMessage, 1) - receiver.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) error { + receiver.RegisterHandler(topic, func(ctx context.Context, msg *TargetMessage) { ch <- msg - return nil }) timeoutCh := time.After(30 * time.Second) diff --git a/pkg/messaging/remote_target.go b/pkg/messaging/remote_target.go index 6794fd2af8..5bdcf402ff 100644 --- a/pkg/messaging/remote_target.go +++ b/pkg/messaging/remote_target.go @@ -36,15 +36,8 @@ import ( ) const ( - reconnectInterval = 2 * time.Second streamTypeEvent = "event" streamTypeCommand = "command" - - eventRecvCh = "eventRecvCh" - commandRecvCh = "commandRecvCh" - - eventSendCh = "eventSendCh" - commandSendCh = "commandSendCh" ) type streamSession struct { diff --git a/pkg/messaging/router.go b/pkg/messaging/router.go index b5c0a46d23..796cbd5a71 100644 --- a/pkg/messaging/router.go +++ b/pkg/messaging/router.go @@ -23,7 +23,7 @@ import ( "go.uber.org/zap" ) -type MessageHandler func(ctx context.Context, msg *TargetMessage) error +type MessageHandler func(ctx context.Context, msg *TargetMessage) type router struct { mu sync.RWMutex @@ -64,7 +64,7 @@ func (r *router) runDispatch(ctx context.Context, out <-chan *TargetMessage) { continue } start := time.Now() - err := handler(ctx, msg) + handler(ctx, msg) now := time.Now() if now.Sub(start) > 100*time.Millisecond { // Rate limit logging: only log once every 10 seconds @@ -80,9 +80,6 @@ func (r *router) runDispatch(ctx context.Context, out <-chan *TargetMessage) { // Always increment metrics counter for slow message handling metrics.MessagingSlowHandleCounter.WithLabelValues(msg.Type.String()).Inc() } - if err != nil { - log.Error("Handle message failed", zap.Error(err), zap.Any("msg", msg)) - } } } } diff --git a/pkg/messaging/router_test.go b/pkg/messaging/router_test.go index 5adf9fc7b7..e9ba420cb2 100644 --- a/pkg/messaging/router_test.go +++ b/pkg/messaging/router_test.go @@ -15,7 +15,6 @@ package messaging import ( "context" - "errors" "fmt" "sync" "testing" @@ -28,7 +27,7 @@ func TestRegisterAndDeRegisterHandler(t *testing.T) { r := newRouter() testTopic := "test-topic" - handler := func(ctx context.Context, msg *TargetMessage) error { return nil } + handler := func(ctx context.Context, msg *TargetMessage) {} r.registerHandler(testTopic, handler) assert.Len(t, r.handlers, 1) @@ -60,9 +59,8 @@ func TestRunDispatchSuccessful(t *testing.T) { t.Parallel() handledMsg := make([]*TargetMessage, 0) r := newRouter() - r.registerHandler("topic1", func(ctx context.Context, msg *TargetMessage) error { + r.registerHandler("topic1", func(ctx context.Context, msg *TargetMessage) { handledMsg = append(handledMsg, msg) - return nil }) msgChan := make(chan *TargetMessage, 1) @@ -77,22 +75,6 @@ func TestRunDispatchSuccessful(t *testing.T) { assert.Equal(t, "topic1", handledMsg[0].Topic) } -func TestRunDispatchWithError(t *testing.T) { - t.Parallel() - r := newRouter() - r.registerHandler("topic1", func(ctx context.Context, msg *TargetMessage) error { - return errors.New("handler error") - }) - - msgChan := make(chan *TargetMessage, 1) - msgChan <- &TargetMessage{Topic: "topic1", Message: newMockIOTypeT([]byte("test"))} - - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - r.runDispatch(ctx, msgChan) -} - func TestRunDispatchNoHandler(t *testing.T) { t.Parallel() r := newRouter() @@ -115,7 +97,7 @@ func TestConcurrentAccess(t *testing.T) { go func() { defer wg.Done() - handler := func(ctx context.Context, msg *TargetMessage) error { return nil } + handler := func(ctx context.Context, msg *TargetMessage) {} r.registerHandler(topic, handler) }() diff --git a/pkg/messaging/stream.go b/pkg/messaging/stream.go index 17c3771d78..cddbd55e56 100644 --- a/pkg/messaging/stream.go +++ b/pkg/messaging/stream.go @@ -14,8 +14,6 @@ package messaging import ( - "sync/atomic" - "github.com/pingcap/ticdc/pkg/messaging/proto" ) @@ -27,23 +25,12 @@ type grpcStream interface { Recv() (*proto.Message, error) } -var streamGenerator atomic.Uint64 - type streamWrapper struct { grpcStream id uint64 streamType string } -// newStreamWrapper creates a new stream wrapper. -func newStreamWrapper(stream grpcStream, streamType string) *streamWrapper { - return &streamWrapper{ - grpcStream: stream, - id: streamGenerator.Add(1), - streamType: streamType, - } -} - func (s *streamWrapper) Send(msg *proto.Message) error { return s.grpcStream.Send(msg) }