Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func New(node *node.Info,
backend: backend,
}
// handle messages from message center
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessages)
mc.RegisterHandler(messaging.CoordinatorTopic, c.recvMessage)
c.controller = NewController(
c.version,
c.nodeInfo,
Expand All @@ -145,9 +145,9 @@ func New(node *node.Info,
return c
}

func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
func (c *coordinator) recvMessage(ctx context.Context, msg *messaging.TargetMessage) {
if c.closed.Load() {
return nil
return
}

defer func() {
Expand All @@ -168,12 +168,10 @@ func (c *coordinator) recvMessages(ctx context.Context, msg *messaging.TargetMes

select {
case <-ctx.Done():
return ctx.Err()
return
default:
c.eventCh.In() <- &Event{message: msg}
}

return nil
}

// Run spawns two goroutines to handle messages and run the coordinator.
Expand Down
17 changes: 5 additions & 12 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func NewMaintainerManager(mc messaging.MessageCenter) *mockMaintainerManager {
maintainerMap: make(map[common.ChangeFeedID]*heartbeatpb.MaintainerStatus, 1000000),
msgCh: make(chan *messaging.TargetMessage, 1024),
}
mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessage)
return m
}

Expand Down Expand Up @@ -125,22 +125,20 @@ func (m *mockMaintainerManager) sendMessages(msg *heartbeatpb.MaintainerHeartbea
}
}

func (m *mockMaintainerManager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
func (m *mockMaintainerManager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) {
switch msg.Type {
// receive message from coordinator
case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest:
fallthrough
case messaging.TypeCoordinatorBootstrapRequest:
select {
case <-ctx.Done():
return ctx.Err()
return
case m.msgCh <- msg:
}
return nil
default:
log.Panic("unknown message type", zap.Any("message", msg.Message))
}
return nil
}

func (m *mockMaintainerManager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) {
Expand Down Expand Up @@ -578,11 +576,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) {
}

// Use recvMessages to send event to channel
err := co.recvMessages(ctx, msg)
if err != nil && err != context.Canceled {
t.Logf("Failed to send event in goroutine %d: %v", id, err)
}

co.recvMessage(ctx, msg)
// Small sleep to increase chance of race conditions
time.Sleep(time.Millisecond)
}
Expand Down Expand Up @@ -625,8 +619,7 @@ func TestConcurrentStopAndSendEvents(t *testing.T) {
Type: messaging.TypeMaintainerHeartbeatRequest,
}

err := co.recvMessages(ctx, msg)
require.NoError(t, err)
co.recvMessage(ctx, msg)
require.True(t, co.closed.Load())
}

Expand Down
5 changes: 2 additions & 3 deletions downstreamadapter/dispatchermanager/heartbeat_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func NewHeartBeatCollector(serverId node.ID) *HeartBeatCollector {
mergeDispatcherRequestDynamicStream: newMergeDispatcherRequestDynamicStream(),
mc: appcontext.GetService[messaging.MessageCenter](appcontext.MessageCenter),
}
heartBeatCollector.mc.RegisterHandler(messaging.HeartbeatCollectorTopic, heartBeatCollector.RecvMessages)
heartBeatCollector.mc.RegisterHandler(messaging.HeartbeatCollectorTopic, heartBeatCollector.recvMessage)

return &heartBeatCollector
}
Expand Down Expand Up @@ -239,7 +239,7 @@ func (c *HeartBeatCollector) sendBlockStatusMessages(ctx context.Context) error
}
}

func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.TargetMessage) error {
func (c *HeartBeatCollector) recvMessage(_ context.Context, msg *messaging.TargetMessage) {
switch msg.Type {
case messaging.TypeHeartBeatResponse:
// TODO: Change a more appropriate name for HeartBeatResponse. It should be BlockStatusResponse or something else.
Expand Down Expand Up @@ -272,7 +272,6 @@ func (c *HeartBeatCollector) RecvMessages(_ context.Context, msg *messaging.Targ
default:
log.Panic("unknown message type", zap.Any("message", msg.Message))
}
return nil
}

func (c *HeartBeatCollector) Close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func New() *DispatcherOrchestrator {
dispatcherManagers: make(map[common.ChangeFeedID]*dispatchermanager.DispatcherManager),
msgChan: make(chan *messaging.TargetMessage, 1024), // buffer size 1024
}
m.mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.RecvMaintainerRequest)
m.mc.RegisterHandler(messaging.DispatcherManagerManagerTopic, m.recvMessage)
return m
}

Expand All @@ -73,20 +73,20 @@ func (m *DispatcherOrchestrator) Run(ctx context.Context) {
}()
}

// RecvMaintainerRequest is the handler for the maintainer request message.
// recvMessage is the handler for the maintainer request message.
// It puts the message into a channel for asynchronous processing to avoid blocking the message center.
func (m *DispatcherOrchestrator) RecvMaintainerRequest(
func (m *DispatcherOrchestrator) recvMessage(
_ context.Context,
msg *messaging.TargetMessage,
) error {
) {
// Put message into channel for asynchronous processing by another goroutine
select {
case m.msgChan <- msg:
return nil
return
default:
// Channel is full, log warning and drop the message
log.Warn("message channel is full, dropping message", zap.Any("message", msg.Message))
return nil
return
}
}

Expand Down
18 changes: 8 additions & 10 deletions downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func New(serverId node.ID) *EventCollector {
eventCollector.logCoordinatorClient = newLogCoordinatorClient(eventCollector)
eventCollector.ds = NewEventDynamicStream(eventCollector)
eventCollector.redoDs = NewEventDynamicStream(eventCollector)
eventCollector.mc.RegisterHandler(messaging.EventCollectorTopic, eventCollector.MessageCenterHandler)
eventCollector.mc.RegisterHandler(messaging.RedoEventCollectorTopic, eventCollector.RedoMessageCenterHandler)
eventCollector.mc.RegisterHandler(messaging.EventCollectorTopic, eventCollector.recvMessage)
eventCollector.mc.RegisterHandler(messaging.RedoEventCollectorTopic, eventCollector.recvRedoMessage)

return eventCollector
}
Expand Down Expand Up @@ -448,8 +448,8 @@ func (c *EventCollector) handleDispatcherHeartbeatResponse(targetMessage *messag
}
}

// MessageCenterHandler is the handler for the events message from EventService.
func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error {
// recvMessage is the handler for the events message from EventService.
func (c *EventCollector) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) {
inflightDuration := time.Since(time.UnixMilli(targetMessage.CreateAt)).Seconds()
c.metricReceiveEventLagDuration.Observe(inflightDuration)

Expand All @@ -462,7 +462,7 @@ func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage *
// corresponding channel to handle it in multi-thread.
if targetMessage.Type.IsLogServiceEvent() {
c.receiveChannels[targetMessage.GetGroup()%uint64(len(c.receiveChannels))] <- targetMessage
return nil
return
}

for _, msg := range targetMessage.Message {
Expand All @@ -473,19 +473,17 @@ func (c *EventCollector) MessageCenterHandler(_ context.Context, targetMessage *
log.Panic("invalid message type", zap.Any("msg", msg))
}
}
return nil
}

// RedoMessageCenterHandler is the handler for the redo events message from EventService.
func (c *EventCollector) RedoMessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error {
// recvRedoMessage is the handler for the redo events message from EventService.
func (c *EventCollector) recvRedoMessage(_ context.Context, targetMessage *messaging.TargetMessage) {
// If the message is a log service event, we need to forward it to the
// corresponding channel to handle it in multi-thread.
if targetMessage.Type.IsLogServiceEvent() {
c.redoReceiveChannels[targetMessage.GetGroup()%uint64(len(c.redoReceiveChannels))] <- targetMessage
return nil
return
}
log.Panic("invalid message type", zap.Any("msg", targetMessage))
return nil
}

// runDispatchMessage dispatches messages from the input channel to the dynamic stream.
Expand Down
5 changes: 2 additions & 3 deletions downstreamadapter/eventcollector/log_coordinator_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ func newLogCoordinatorClient(eventCollector *EventCollector) *LogCoordinatorClie
logCoordinatorRequestChan: chann.NewAutoDrainChann[*logservicepb.ReusableEventServiceRequest](),
enableRemoteEventService: config.GetGlobalServerConfig().Debug.EventService.EnableRemoteEventService,
}
client.mc.RegisterHandler(logCoordinatorClientTopic, client.MessageCenterHandler)
client.mc.RegisterHandler(logCoordinatorClientTopic, client.recvMessage)
return client
}

func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMessage *messaging.TargetMessage) error {
func (l *LogCoordinatorClient) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) {
for _, msg := range targetMessage.Message {
switch msg := msg.(type) {
case *common.LogCoordinatorBroadcastRequest:
Expand All @@ -69,7 +69,6 @@ func (l *LogCoordinatorClient) MessageCenterHandler(_ context.Context, targetMes
log.Panic("invalid message type", zap.Any("msg", msg))
}
}
return nil
}

func (l *LogCoordinatorClient) run(ctx context.Context) error {
Expand Down
5 changes: 2 additions & 3 deletions logservice/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func New() LogCoordinator {
c.changefeedStates.m = make(map[common.GID]*changefeedState)

// recv and handle messages
messageCenter.RegisterHandler(logCoordinatorTopic, c.handleMessage)
messageCenter.RegisterHandler(logCoordinatorTopic, c.recvMessage)
// watch node changes
nodeManager := appcontext.GetService[*watcher.NodeManager](watcher.NodeManagerName)
nodes := nodeManager.GetAliveNodes()
Expand Down Expand Up @@ -149,7 +149,7 @@ func (c *logCoordinator) Run(ctx context.Context) error {
}
}

func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error {
func (c *logCoordinator) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) {
for _, msg := range targetMessage.Message {
switch msg := msg.(type) {
case *logservicepb.EventStoreState:
Expand All @@ -167,7 +167,6 @@ func (c *logCoordinator) handleMessage(_ context.Context, targetMessage *messagi
log.Panic("invalid message type", zap.Any("msg", msg))
}
}
return nil
}

func (c *logCoordinator) sendResolvedTsToCoordinator(id node.ID, changefeedID common.ChangeFeedID) {
Expand Down
5 changes: 2 additions & 3 deletions logservice/eventstore/event_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ func New(
store.dispatcherMeta.dispatcherStats = make(map[common.DispatcherID]*dispatcherStat)
store.dispatcherMeta.tableStats = make(map[int64]subscriptionStats)

store.messageCenter.RegisterHandler(messaging.EventStoreTopic, store.handleMessage)
store.messageCenter.RegisterHandler(messaging.EventStoreTopic, store.recvMessage)
return store
}

Expand Down Expand Up @@ -1219,7 +1219,7 @@ func (iter *eventStoreIter) Close() (int64, error) {
return iter.rowCount, err
}

func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.TargetMessage) error {
func (e *eventStore) recvMessage(_ context.Context, targetMessage *messaging.TargetMessage) {
for _, msg := range targetMessage.Message {
switch msg.(type) {
case *common.LogCoordinatorBroadcastRequest:
Expand All @@ -1228,7 +1228,6 @@ func (e *eventStore) handleMessage(_ context.Context, targetMessage *messaging.T
log.Panic("invalid message type", zap.Any("msg", msg))
}
}
return nil
}

type SubscriptionChangeType int
Expand Down
31 changes: 14 additions & 17 deletions maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,49 +70,47 @@ func NewMaintainerManager(
taskScheduler: threadpool.NewThreadPoolDefault(),
}

mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessage)
mc.RegisterHandler(messaging.MaintainerTopic,
func(ctx context.Context, msg *messaging.TargetMessage) error {
func(ctx context.Context, msg *messaging.TargetMessage) {
req := msg.Message[0].(*heartbeatpb.MaintainerCloseResponse)
return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
})
return m
}

// recvMessages is the message handler for maintainer manager
func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
// recvMessage is the message handler for maintainer manager
func (m *Manager) recvMessage(ctx context.Context, msg *messaging.TargetMessage) {
switch msg.Type {
// Coordinator related messages
case messaging.TypeAddMaintainerRequest,
messaging.TypeRemoveMaintainerRequest,
messaging.TypeCoordinatorBootstrapRequest:
select {
case <-ctx.Done():
return ctx.Err()
return
case m.msgCh <- msg:
}
return nil
// receive bootstrap response message from the dispatcher manager
case messaging.TypeMaintainerBootstrapResponse:
req := msg.Message[0].(*heartbeatpb.MaintainerBootstrapResponse)
return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
case messaging.TypeMaintainerPostBootstrapResponse:
req := msg.Message[0].(*heartbeatpb.MaintainerPostBootstrapResponse)
return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
// receive heartbeat message from dispatchers
case messaging.TypeHeartBeatRequest:
req := msg.Message[0].(*heartbeatpb.HeartBeatRequest)
return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
case messaging.TypeBlockStatusRequest:
req := msg.Message[0].(*heartbeatpb.BlockStatusRequest)
return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
case messaging.TypeCheckpointTsMessage:
req := msg.Message[0].(*heartbeatpb.CheckpointTsMessage)
return m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
m.dispatcherMaintainerMessage(ctx, common.NewChangefeedIDFromPB(req.ChangefeedID), msg)
default:
log.Panic("unknown message type", zap.Any("message", msg.Message))
}
return nil
}

func (m *Manager) Name() string {
Expand Down Expand Up @@ -348,17 +346,17 @@ func (m *Manager) handleMessage(msg *messaging.TargetMessage) {

func (m *Manager) dispatcherMaintainerMessage(
ctx context.Context, changefeed common.ChangeFeedID, msg *messaging.TargetMessage,
) error {
) {
c, ok := m.maintainers.Load(changefeed)
if !ok {
log.Warn("maintainer is not found",
zap.Stringer("changefeed", changefeed),
zap.String("message", msg.String()))
return nil
return
}
select {
case <-ctx.Done():
return ctx.Err()
return
default:
maintainer := c.(*Maintainer)
maintainer.pushEvent(&Event{
Expand All @@ -367,7 +365,6 @@ func (m *Manager) dispatcherMaintainerMessage(
message: msg,
})
}
return nil
}

func (m *Manager) GetMaintainerForChangefeed(changefeedID common.ChangeFeedID) (*Maintainer, bool) {
Expand Down
Loading