From 52cdedb6032aaae288ddef6fb5920af8f5da9dc2 Mon Sep 17 00:00:00 2001 From: Swapneeth Gorantla Date: Mon, 27 Oct 2025 18:18:15 -0400 Subject: [PATCH 1/2] storeliveness: rename `SendAsync` to `EnqueueMessage` Previously, the `MessageSender` interface exposed a method called `SendAsync` that queued Store Liveness messages for asynchronous delivery to remote stores. The name `SendAsync` was misleading because it implied messages were sent directly, when in reality they were enqueued to per-node queues and processed in batches before being sent over the network. The rename to `EnqueueMessage` better reflects that the method places messages onto an outgoing queue rather than performing the actual send operation. This patch renames `SendAsync` to `EnqueueMessage` throughout the storeliveness package. This change also sets the stage for the next commit, which enables smearing of the heartbeats. References: #148210 Release note: None --- .../kvserver/storeliveness/support_manager.go | 6 ++-- pkg/kv/kvserver/storeliveness/testutils.go | 2 +- pkg/kv/kvserver/storeliveness/transport.go | 6 ++-- .../kvserver/storeliveness/transport_test.go | 36 +++++++++---------- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/pkg/kv/kvserver/storeliveness/support_manager.go b/pkg/kv/kvserver/storeliveness/support_manager.go index 3b0521b91d70..a1932b08d483 100644 --- a/pkg/kv/kvserver/storeliveness/support_manager.go +++ b/pkg/kv/kvserver/storeliveness/support_manager.go @@ -34,7 +34,7 @@ var Enabled = settings.RegisterBoolSetting( // MessageSender is the interface that defines how Store Liveness messages are // sent. Transport is the production implementation of MessageSender. type MessageSender interface { - SendAsync(ctx context.Context, msg slpb.Message) (sent bool) + EnqueueMessage(ctx context.Context, msg slpb.Message) (sent bool) } // SupportManager orchestrates requesting and providing Store Liveness support. @@ -323,7 +323,7 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) { // Send heartbeats to each remote store.
successes := 0 for _, msg := range heartbeats { - if sent := sm.sender.SendAsync(ctx, msg); sent { + if sent := sm.sender.EnqueueMessage(ctx, msg); sent { successes++ } else { log.KvExec.Warningf(ctx, "failed to send heartbeat to store %+v", msg.To) @@ -424,7 +424,7 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa sm.metrics.SupportForStores.Update(int64(sm.supporterStateHandler.getNumSupportFor())) for _, response := range responses { - _ = sm.sender.SendAsync(ctx, response) + _ = sm.sender.EnqueueMessage(ctx, response) } log.KvExec.VInfof(ctx, 2, "sent %d heartbeat responses", len(responses)) } diff --git a/pkg/kv/kvserver/storeliveness/testutils.go b/pkg/kv/kvserver/storeliveness/testutils.go index 5b989e9249f1..ca071bbdac46 100644 --- a/pkg/kv/kvserver/storeliveness/testutils.go +++ b/pkg/kv/kvserver/storeliveness/testutils.go @@ -94,7 +94,7 @@ type testMessageSender struct { messages []slpb.Message } -func (tms *testMessageSender) SendAsync(_ context.Context, msg slpb.Message) (sent bool) { +func (tms *testMessageSender) EnqueueMessage(_ context.Context, msg slpb.Message) (sent bool) { tms.mu.Lock() defer tms.mu.Unlock() tms.messages = append(tms.messages, msg) diff --git a/pkg/kv/kvserver/storeliveness/transport.go b/pkg/kv/kvserver/storeliveness/transport.go index 56064d7be9ed..3bcb33e43741 100644 --- a/pkg/kv/kvserver/storeliveness/transport.go +++ b/pkg/kv/kvserver/storeliveness/transport.go @@ -213,14 +213,14 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) { t.metrics.MessagesReceived.Inc(1) } -// SendAsync implements the MessageSender interface. It sends a message to the +// EnqueueMessage implements the MessageSender interface. It sends a message to the // recipient specified in the request, and returns false if the outgoing queue // is full or the node dialer's circuit breaker has tripped. // // The returned bool may be a false positive but will never be a false negative; // if sent is true the message may or may not actually be sent but if it's false // the message definitely was not sent. -func (t *Transport) SendAsync(ctx context.Context, msg slpb.Message) (enqueued bool) { +func (t *Transport) EnqueueMessage(ctx context.Context, msg slpb.Message) (enqueued bool) { toNodeID := msg.To.NodeID fromNodeID := msg.From.NodeID // If this is a message from one local store to another local store, do not @@ -288,7 +288,7 @@ func (t *Transport) startProcessNewQueue( cleanup := func() { q, ok := t.getQueue(toNodeID) t.queues.Delete(toNodeID) - // Account for all remaining messages in the queue. SendAsync may be + // Account for all remaining messages in the queue. EnqueueMessage may be // writing to the queue concurrently, so it's possible that we won't // account for a few messages below. if ok { diff --git a/pkg/kv/kvserver/storeliveness/transport_test.go b/pkg/kv/kvserver/storeliveness/transport_test.go index ec3a939142f8..7cda2afc1739 100644 --- a/pkg/kv/kvserver/storeliveness/transport_test.go +++ b/pkg/kv/kvserver/storeliveness/transport_test.go @@ -66,7 +66,7 @@ type clockWithManualSource struct { // transportTester contains objects needed to test the Store Liveness Transport. // Typical usage will add multiple nodes with AddNode, add multiple stores with -// AddStore, and send messages with SendAsync. +// AddStore, and send messages with EnqueueMessage. 
type transportTester struct { t testing.TB st *cluster.Settings @@ -199,7 +199,7 @@ func TestTransportSendAndReceive(t *testing.T) { // Send messages between each pair of stores. for _, from := range stores { for _, to := range stores { - tt.transports[from.NodeID].SendAsync(ctx, makeMsg(from, to)) + tt.transports[from.NodeID].EnqueueMessage(ctx, makeMsg(from, to)) } } @@ -261,7 +261,7 @@ func TestTransportRestartedNode(t *testing.T) { checkEnqueued := func(expectedEnqueued bool) { testutils.SucceedsSoon( t, func() error { - enqueued := tt.transports[sender.NodeID].SendAsync(ctx, msg) + enqueued := tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) if enqueued != expectedEnqueued { return errors.Newf("enqueue success is still %v", enqueued) } @@ -274,7 +274,7 @@ func TestTransportRestartedNode(t *testing.T) { initialSent := tt.transports[sender.NodeID].metrics.MessagesSent.Count() testutils.SucceedsSoon( t, func() error { - tt.transports[sender.NodeID].SendAsync(ctx, msg) + tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) sent := tt.transports[sender.NodeID].metrics.MessagesSent.Count() if initialSent >= sent { return errors.Newf("message not sent yet; initial %d, current %d", initialSent, sent) @@ -288,7 +288,7 @@ func TestTransportRestartedNode(t *testing.T) { initialDropped := tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count() testutils.SucceedsSoon( t, func() error { - tt.transports[sender.NodeID].SendAsync(ctx, msg) + tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) dropped := tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count() if initialDropped >= dropped { return errors.Newf( @@ -309,9 +309,9 @@ func TestTransportRestartedNode(t *testing.T) { return nil default: // To ensure messages start getting delivered, keep sending messages - // out. Even after SendAsync returns true, messages may still not be + // out. Even after EnqueueMessage returns true, messages may still not be // delivered (e.g. if the receiver node is not up yet). - tt.transports[sender.NodeID].SendAsync(ctx, msg) + tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) } return errors.New("still waiting to receive message") }, @@ -322,7 +322,7 @@ func TestTransportRestartedNode(t *testing.T) { // The message is sent out successfully. checkEnqueued(true /* expectedEnqueued */) // The message sent as part of checkSend above will likely be dropped it's - // also possible that the SendAsync races with the deletion of the send queue + // also possible that the EnqueueMessage races with the deletion of the send queue // (due to the failed node dial), in which case a dropped message will not be // recorded. checkDropped() @@ -338,7 +338,7 @@ func TestTransportRestartedNode(t *testing.T) { // fails after the circuit breaker kicks in. receiverStopper.Stop(context.Background()) checkEnqueued(false /* expectedEnqueued */) - // Subsequent calls to SendAsync are expected to result in messages being + // Subsequent calls to EnqueueMessage are expected to result in messages being // dropped due to the tripped circuit breaker. checkDropped() @@ -380,8 +380,8 @@ func TestTransportSendToMissingStore(t *testing.T) { // Send the message to the missing store first to ensure it doesn't affect the // receipt of the message to the existing store. 
- require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, missingMsg)) - require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, existingMsg)) + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, missingMsg)) + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, existingMsg)) // Wait for the message to the existing store to be received. testutils.SucceedsSoon( @@ -438,7 +438,7 @@ func TestTransportClockPropagation(t *testing.T) { // Send a message from the sender to the receiver. msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver} - require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg)) + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)) // Wait for the message to be received. testutils.SucceedsSoon( @@ -480,12 +480,12 @@ func TestTransportShortCircuit(t *testing.T) { handler := tt.AddStore(store2) tt.AddStore(store3) - // Reach in and set node 1's dialer to nil. If SendAsync attempts to dial a + // Reach in and set node 1's dialer to nil. If EnqueueMessage attempts to dial a // node, it will panic. tt.transports[node1].dialer = nil // Send messages between two stores on the same node. - tt.transports[store1.NodeID].SendAsync( + tt.transports[store1.NodeID].EnqueueMessage( ctx, slpb.Message{Type: slpb.MsgHeartbeat, From: store1, To: store2}, ) // The message is received. @@ -506,7 +506,7 @@ func TestTransportShortCircuit(t *testing.T) { // we expect a panic. require.Panics( t, func() { - tt.transports[store1.NodeID].SendAsync( + tt.transports[store1.NodeID].EnqueueMessage( ctx, slpb.Message{Type: slpb.MsgHeartbeat, From: store1, To: store3}, ) }, "sending message to a remote store with a nil dialer", @@ -536,7 +536,7 @@ func TestTransportIdleSendQueue(t *testing.T) { } // Send and receive a message. - require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg)) + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)) testutils.SucceedsSoon( t, func() error { select { @@ -585,7 +585,7 @@ func TestTransportFullReceiveQueue(t *testing.T) { testutils.SucceedsSoon( t, func() error { // The message enqueue can fail temporarily if the sender queue fills up. - if !tt.transports[sender.NodeID].SendAsync(ctx, msg) { + if !tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) { sendDropped++ return errors.New("still waiting to enqueue message") } @@ -614,7 +614,7 @@ func TestTransportFullReceiveQueue(t *testing.T) { }, ) // The receiver queue is full but the enqueue to the sender queue succeeds. - require.True(t, tt.transports[sender.NodeID].SendAsync(ctx, msg)) + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)) testutils.SucceedsSoon( t, func() error { if tt.transports[receiver.NodeID].metrics.MessagesReceiveDropped.Count() != int64(1) { From 131a3d916073c4d15fb8962c564f72c593161f6c Mon Sep 17 00:00:00 2001 From: Swapneeth Gorantla Date: Tue, 28 Oct 2025 10:01:33 -0400 Subject: [PATCH 2/2] storeliveness: smear storeliveness heartbeats via coordinator Previously, heartbeat messages were sent immediately when enqueued via `EnqueueMessage`. In large clusters, many stores might all send heartbeats simultaneously at the same tick interval, causing a spike in runnable goroutines that caused issues elsewhere in the database process. This patch introduces a heartbeat coordinator that batches and smears heartbeat sends over a configurable duration using a taskpacer. 
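A minimal standalone sketch of the pacing idea, assuming a simplified stand-in for `util/taskpacer` (all names below are illustrative, not code from this patch): spread N queue signals across a refresh window in smear-sized steps rather than signalling everything at once.

// pacing_sketch.go: hypothetical stand-in for the pacing the coordinator
// delegates to util/taskpacer; the batching arithmetic is simplified.
package main

import (
	"fmt"
	"time"
)

// smearSignals signals every channel in queues, a proportional share per
// smear interval, so all signals complete roughly by the end of the
// refresh window instead of all at once.
func smearSignals(queues []chan struct{}, refresh, smear time.Duration) {
	deadline := time.Now().Add(refresh)
	for len(queues) > 0 {
		intervalsLeft := int(time.Until(deadline) / smear)
		if intervalsLeft < 1 {
			intervalsLeft = 1
		}
		// Signal an even share of the remaining queues in this interval.
		todo := (len(queues) + intervalsLeft - 1) / intervalsLeft
		for i := 0; i < todo && len(queues) > 0; i++ {
			ch := queues[len(queues)-1]
			queues = queues[:len(queues)-1]
			select {
			case ch <- struct{}{}: // wake this queue's sender goroutine
			default: // already signaled; never block
			}
		}
		if len(queues) > 0 {
			time.Sleep(smear)
		}
	}
}

func main() {
	// Ten fake per-node send queues, each with a one-slot signal channel.
	queues := make([]chan struct{}, 10)
	for i := range queues {
		queues[i] = make(chan struct{}, 1)
	}
	start := time.Now()
	smearSignals(queues, 10*time.Millisecond, time.Millisecond)
	fmt.Printf("signaled 10 queues over %s\n", time.Since(start))
}

The coordinator in this patch follows the same shape: it collects the non-empty send queues and then signals them a few at a time, with the refresh and smear durations taken from cluster settings.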
The coordinator is enabled by default via the `kv.store_liveness.heartbeat_smearing.enabled` cluster setting, and its pacing can be configured via the `kv.store_liveness.heartbeat_coordinator.refresh` (default: 10ms) and `kv.store_liveness.heartbeat_coordinator.smear` (default: 1ms) settings. When enabled, messages are enqueued but not sent immediately. Instead, they wait in per-node queues until `SendAllEnqueuedMessages()` is called, which signals the coordinator. The coordinator waits briefly (`batchDuration`) to collect messages from multiple stores, then uses the pacer to signal each queue's `processQueue` goroutine, spreading the sends over time. Fixes: #148210 Release note: None --- pkg/kv/kvserver/store_test.go | 2 +- pkg/kv/kvserver/storeliveness/BUILD.bazel | 1 + .../kvserver/storeliveness/support_manager.go | 7 + pkg/kv/kvserver/storeliveness/testutils.go | 4 + pkg/kv/kvserver/storeliveness/transport.go | 249 ++++++++++-- .../kvserver/storeliveness/transport_test.go | 377 ++++++++++++++++-- pkg/server/server.go | 2 +- .../localtestcluster/local_test_cluster.go | 2 +- 8 files changed, 583 insertions(+), 61 deletions(-) diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 59051f8a2161..ea9ee3bccfa5 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -248,7 +248,7 @@ func createTestStoreWithoutStart( supportGracePeriod := rpcContext.StoreLivenessWithdrawalGracePeriod() options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod) transport, err := storeliveness.NewTransport( - cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, drpcServer, nil, /* knobs */ + cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, drpcServer, cfg.Settings, nil, /* knobs */ ) require.NoError(t, err) knobs := cfg.TestingKnobs.StoreLivenessKnobs diff --git a/pkg/kv/kvserver/storeliveness/BUILD.bazel b/pkg/kv/kvserver/storeliveness/BUILD.bazel index 93b8584cc348..99e0d7f15820 100644 --- a/pkg/kv/kvserver/storeliveness/BUILD.bazel +++ b/pkg/kv/kvserver/storeliveness/BUILD.bazel @@ -34,6 +34,7 @@ go_library( "//pkg/util/protoutil", "//pkg/util/stop", "//pkg/util/syncutil", + "//pkg/util/taskpacer", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_redact//:redact", diff --git a/pkg/kv/kvserver/storeliveness/support_manager.go b/pkg/kv/kvserver/storeliveness/support_manager.go index a1932b08d483..c5ec73c2e040 100644 --- a/pkg/kv/kvserver/storeliveness/support_manager.go +++ b/pkg/kv/kvserver/storeliveness/support_manager.go @@ -35,6 +35,7 @@ var Enabled = settings.RegisterBoolSetting( // sent. Transport is the production implementation of MessageSender. type MessageSender interface { EnqueueMessage(ctx context.Context, msg slpb.Message) (sent bool) + SendAllEnqueuedMessages(ctx context.Context) } // SupportManager orchestrates requesting and providing Store Liveness support.
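To make the extended contract concrete, here is a minimal standalone sketch, assuming simplified stand-ins for slpb.Message and the real Transport: callers enqueue any number of messages and then flush them with a single SendAllEnqueuedMessages call. The hunk that follows adds exactly this enqueue-then-flush pattern to sendHeartbeats.

// Toy illustration of the extended MessageSender contract: EnqueueMessage
// only buffers, and nothing is put on the wire until
// SendAllEnqueuedMessages is called. Simplified stand-ins, not CockroachDB code.
package main

import (
	"context"
	"fmt"
)

// message is a stand-in for slpb.Message.
type message struct{ from, to string }

// messageSender mirrors the shape of the storeliveness MessageSender interface.
type messageSender interface {
	EnqueueMessage(ctx context.Context, msg message) (enqueued bool)
	SendAllEnqueuedMessages(ctx context.Context)
}

// bufferingSender buffers enqueued messages and "sends" (prints) them only
// when flushed, mimicking the enqueue/flush split the patch introduces.
type bufferingSender struct{ pending []message }

func (s *bufferingSender) EnqueueMessage(_ context.Context, msg message) bool {
	s.pending = append(s.pending, msg)
	return true
}

func (s *bufferingSender) SendAllEnqueuedMessages(_ context.Context) {
	for _, m := range s.pending {
		fmt.Printf("heartbeat %s -> %s\n", m.from, m.to)
	}
	s.pending = nil
}

func main() {
	ctx := context.Background()
	var sender messageSender = &bufferingSender{}
	// Enqueue one heartbeat per remote store, then flush once so the
	// transport can batch and smear the actual sends.
	for _, to := range []string{"s2", "s3", "s4"} {
		sender.EnqueueMessage(ctx, message{from: "s1", to: to})
	}
	sender.SendAllEnqueuedMessages(ctx)
}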
@@ -329,6 +330,9 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) { log.KvExec.Warningf(ctx, "failed to send heartbeat to store %+v", msg.To) } } + if HeartbeatSmearingEnabled.Get(&sm.settings.SV) { + sm.sender.SendAllEnqueuedMessages(ctx) + } sm.metrics.HeartbeatSuccesses.Inc(int64(successes)) sm.metrics.HeartbeatFailures.Inc(int64(len(heartbeats) - successes)) log.KvExec.VInfof(ctx, 2, "sent heartbeats to %d stores", successes) @@ -426,6 +430,9 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa for _, response := range responses { _ = sm.sender.EnqueueMessage(ctx, response) } + if HeartbeatSmearingEnabled.Get(&sm.settings.SV) { + sm.sender.SendAllEnqueuedMessages(ctx) + } log.KvExec.VInfof(ctx, 2, "sent %d heartbeat responses", len(responses)) } diff --git a/pkg/kv/kvserver/storeliveness/testutils.go b/pkg/kv/kvserver/storeliveness/testutils.go index ca071bbdac46..1de36ed2e92d 100644 --- a/pkg/kv/kvserver/storeliveness/testutils.go +++ b/pkg/kv/kvserver/storeliveness/testutils.go @@ -101,6 +101,10 @@ func (tms *testMessageSender) EnqueueMessage(_ context.Context, msg slpb.Message return true } +func (tms *testMessageSender) SendAllEnqueuedMessages(_ context.Context) { + // No-op for testing +} + func (tms *testMessageSender) drainSentMessages() []slpb.Message { tms.mu.Lock() defer tms.mu.Unlock() diff --git a/pkg/kv/kvserver/storeliveness/transport.go b/pkg/kv/kvserver/storeliveness/transport.go index 3bcb33e43741..25e29a0747a1 100644 --- a/pkg/kv/kvserver/storeliveness/transport.go +++ b/pkg/kv/kvserver/storeliveness/transport.go @@ -14,10 +14,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/rpc/rpcbase" + "github.com/cockroachdb/cockroach/pkg/settings" + clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/taskpacer" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "google.golang.org/grpc" @@ -40,6 +43,39 @@ const ( connClass = rpcbase.SystemClass ) +// TODO(dodeca12): Currently this complexity allows the fallback to immediate +// heartbeat sends. Once the smearing has been battle-tested, remove this and +// default to using the smeared heartbeat sends approach (no more fallback). 
+var HeartbeatSmearingEnabled = settings.RegisterBoolSetting( + settings.SystemOnly, + "kv.store_liveness.heartbeat_smearing.enabled", + "if enabled, heartbeat sends are smeared across a certain duration, "+ + "via the transport goroutine, "+ + "otherwise heartbeat sends are sent when they are enqueued "+ + "at the sendQueue, bypassing heartbeat smearing", + + true, +) + +var HeartbeatSmearingRefreshInterval = settings.RegisterDurationSetting( + settings.SystemOnly, + "kv.store_liveness.heartbeat_coordinator.refresh", + "the total time window within which all queued heartbeat messages should be "+ + "sent; the pacer distributes the work across this duration to complete "+ + "by the deadline", + 10*time.Millisecond, + settings.PositiveDuration, +) + +var HeartbeatSmearingInterval = settings.RegisterDurationSetting( + settings.SystemOnly, + "kv.store_liveness.heartbeat_coordinator.smear", + "the interval between sending successive batches of heartbeats across all "+ + "send queues", + 1*time.Millisecond, + settings.PositiveDuration, +) + var logQueueFullEvery = log.Every(1 * time.Second) // MessageHandler is the interface that must be implemented by @@ -54,6 +90,14 @@ type MessageHandler interface { // sendQueue is a queue of outgoing Messages. type sendQueue struct { + // sendMessages is signaled by the transport'smearing sender goroutine + // to tell processQueue to send messages (smearing mechanism). + sendMessages chan struct{} + // directSend is signaled when a message is enqueued (while smearing is + // disabled) to tell processQueue to send immediately, bypassing the + // smearing sender goroutine. + directSend chan struct{} + // messages is a channel of outgoing Messages. messages chan slpb.Message } @@ -67,22 +111,49 @@ type sendQueue struct { // delivering them asynchronously. type Transport struct { log.AmbientContext - stopper *stop.Stopper - clock *hlc.Clock - dialer *nodedialer.Dialer - metrics *TransportMetrics + stopper *stop.Stopper + clock *hlc.Clock + dialer *nodedialer.Dialer + metrics *TransportMetrics + settings *clustersettings.Settings // queues stores outgoing message queues keyed by the destination node ID. queues syncutil.Map[roachpb.NodeID, sendQueue] // handlers stores the MessageHandler for each store on the node. handlers syncutil.Map[roachpb.StoreID, MessageHandler] + // sendAllMessages is signaled to instruct the heartbeat smearing sender to + // signal all sendQueues to send their enqueued messages. + sendAllMessages chan struct{} + // TransportKnobs includes all knobs for testing. knobs *TransportKnobs } var _ MessageSender = (*Transport)(nil) +type pacerConfig struct { + settings *clustersettings.Settings +} + +// GetRefresh implements the taskpacer.Config interface. +func (c pacerConfig) GetRefresh() time.Duration { + return HeartbeatSmearingRefreshInterval.Get(&c.settings.SV) +} + +// GetSmear implements the taskpacer.Config interface. +func (c pacerConfig) GetSmear() time.Duration { + return HeartbeatSmearingInterval.Get(&c.settings.SV) +} + +// SendHeartbeatsSmeared returns true if the cluster setting to send heartbeats +// via the heartbeat smearing sender (the goroutine in `transport.go`) is enabled. +// The smearing sender is responsible for smearing the heartbeats over +// a certain duration, so expect heartbeats sends to be paced when enabled. +func (t *Transport) SendHeartbeatsSmeared() bool { + return HeartbeatSmearingEnabled.Get(&t.settings.SV) +} + // NewTransport creates a new Store Liveness Transport. 
func NewTransport( ambient log.AmbientContext, @@ -91,18 +162,21 @@ func NewTransport( dialer *nodedialer.Dialer, grpcServer *grpc.Server, drpcMux drpc.Mux, + settings *clustersettings.Settings, knobs *TransportKnobs, ) (*Transport, error) { if knobs == nil { knobs = &TransportKnobs{} } t := &Transport{ - AmbientContext: ambient, - stopper: stopper, - clock: clock, - dialer: dialer, - metrics: newTransportMetrics(), - knobs: knobs, + AmbientContext: ambient, + stopper: stopper, + clock: clock, + dialer: dialer, + metrics: newTransportMetrics(), + settings: settings, + sendAllMessages: make(chan struct{}, 1), + knobs: knobs, } if grpcServer != nil { slpb.RegisterStoreLivenessServer(grpcServer, t) @@ -112,6 +186,90 @@ func NewTransport( return nil, err } } + + // Start background goroutine to act as the transport smearing sender. + // It is responsible for smearing the heartbeat sends across a certain duration. + if err := stopper.RunAsyncTask( + context.Background(), "storeliveness transport smearing sender", + func(ctx context.Context) { + var batchTimer timeutil.Timer + defer batchTimer.Stop() + + conf := pacerConfig{settings: settings} + pacer := taskpacer.New(conf) + + // This will hold the channels we need to signal to send messages. + toSignal := make([]chan struct{}, 0) + + for { + select { + case <-stopper.ShouldQuiesce(): + return + case <-t.sendAllMessages: + // We received a signal to send all messages. + // Wait for a short duration to give other stores a chance to + // enqueue messages which will increase batching opportunities. + batchTimer.Reset(batchDuration) + for done := false; !done; { + select { + case <-t.sendAllMessages: + // Consume any additional signals to send all messages. + + case <-batchTimer.C: + // We have waited to batch messages. + done = true + } + } + + // Collect all sendQueues. + t.queues.Range(func(nodeID roachpb.NodeID, q *sendQueue) bool { + // This check is vital to avoid unnecessary signaling + // on empty queues - otherwise, especially on + // heartbeat responses, we'd wake up N processesQueue + // goroutines where N is the number of destination + // nodes. + if len(q.messages) == 0 { // no messages to send + return true + } + toSignal = append(toSignal, q.sendMessages) + return true + }) + + // There is a benign race condition where a new message may be + // enqueued to a new queue after we collect the toSignal channels. + // In this case, t.sendAllMessages will be set again, and we will + // pick it up in the next iteration of the for loop. + + // Pace the signalling of the channels. + pacer.StartTask(timeutil.Now()) + workLeft := len(toSignal) + for workLeft > 0 { + todo, by := pacer.Pace(timeutil.Now(), workLeft) + + // Pop todo items off the toSignal slice and signal them. + for i := 0; i < todo && workLeft > 0; i++ { + ch := toSignal[len(toSignal)-1] + toSignal = toSignal[:len(toSignal)-1] + select { + case ch <- struct{}{}: + default: + } + workLeft-- + } + + if workLeft > 0 { + if wait := timeutil.Until(by); wait > 0 { + time.Sleep(wait) + } + } + } + } + } + }, + ); err != nil { + return nil, err + } + return t, nil } @@ -213,13 +371,13 @@ func (t *Transport) handleMessage(ctx context.Context, msg *slpb.Message) { t.metrics.MessagesReceived.Inc(1) } -// EnqueueMessage implements the MessageSender interface. It sends a message to the -// recipient specified in the request, and returns false if the outgoing queue -// is full or the node dialer's circuit breaker has tripped. +// EnqueueMessage implements the MessageSender interface. 
It enqueues a message +// to the sendQueue for the recipient node, and returns false if the outgoing +// queue is full or the node dialer's circuit breaker has tripped. // // The returned bool may be a false positive but will never be a false negative; -// if sent is true the message may or may not actually be sent but if it's false -// the message definitely was not sent. +// if enqueued is true the message may or may not actually be sent but if it's +// false the message definitely was not enqueued. func (t *Transport) EnqueueMessage(ctx context.Context, msg slpb.Message) (enqueued bool) { toNodeID := msg.To.NodeID fromNodeID := msg.From.NodeID @@ -252,6 +410,13 @@ func (t *Transport) EnqueueMessage(ctx context.Context, msg slpb.Message) (enque case q.messages <- msg: t.metrics.SendQueueSize.Inc(1) t.metrics.SendQueueBytes.Inc(int64(msg.Size())) + // Signal the processQueue goroutine if in direct mode (smearing disabled). + if !t.SendHeartbeatsSmeared() { + select { + case q.directSend <- struct{}{}: + default: + } + } return true default: if logQueueFullEvery.ShouldLog() { @@ -265,12 +430,24 @@ func (t *Transport) EnqueueMessage(ctx context.Context, msg slpb.Message) (enque } } +// SendAllEnqueuedMessages signals all queues to send all their messages. +func (t *Transport) SendAllEnqueuedMessages(ctx context.Context) { + select { + case t.sendAllMessages <- struct{}{}: + default: + } +} + // getQueue returns the queue for the specified node ID and a boolean // indicating whether the queue already exists (true) or was created (false). func (t *Transport) getQueue(nodeID roachpb.NodeID) (*sendQueue, bool) { queue, ok := t.queues.Load(nodeID) if !ok { - q := sendQueue{messages: make(chan slpb.Message, sendBufferSize)} + q := sendQueue{ + sendMessages: make(chan struct{}, 1), + directSend: make(chan struct{}, 1), + messages: make(chan slpb.Message, sendBufferSize), + } queue, ok = t.queues.LoadOrStore(nodeID, &q) } return queue, ok @@ -365,6 +542,7 @@ func (t *Transport) processQueue( var batchTimer timeutil.Timer defer batchTimer.Stop() batch := &slpb.MessageBatch{} + for { idleTimer.Reset(getIdleTimeout()) select { @@ -375,35 +553,36 @@ func (t *Transport) processQueue( t.metrics.SendQueueIdle.Inc(1) return nil - case msg := <-q.messages: - batch.Messages = append(batch.Messages, msg) - t.metrics.SendQueueSize.Dec(1) - t.metrics.SendQueueBytes.Dec(int64(msg.Size())) + case <-q.sendMessages: + // Signal received - proceed to batching logic below. - // Pull off as many queued requests as possible within batchDuration. - batchTimer.Reset(batchDuration) - for done := false; !done; { - select { - case msg = <-q.messages: - batch.Messages = append(batch.Messages, msg) - t.metrics.SendQueueSize.Dec(1) - t.metrics.SendQueueBytes.Dec(int64(msg.Size())) - case <-batchTimer.C: - done = true - } + case <-q.directSend: + // Signal received - proceed to batching logic below. + } + + // Common batching logic: pull off as many queued messages as possible + // within batchDuration, then send the batch. 
+ batchTimer.Reset(batchDuration) + for done := false; !done; { + select { + case msg := <-q.messages: + batch.Messages = append(batch.Messages, msg) + t.metrics.SendQueueSize.Dec(1) + t.metrics.SendQueueBytes.Dec(int64(msg.Size())) + case <-batchTimer.C: + done = true } + } + if len(batch.Messages) > 0 { batch.Now = t.clock.NowAsClockTimestamp() if err = stream.Send(batch); err != nil { t.metrics.MessagesSendDropped.Inc(int64(len(batch.Messages))) return err } - t.metrics.BatchesSent.Inc(1) t.metrics.MessagesSent.Inc(int64(len(batch.Messages))) - - // Reuse the Messages slice, but zero out the contents to avoid delaying - // GC of memory referenced from within. + // Reuse the Messages slice, but zero out the contents to avoid delaying GC. for i := range batch.Messages { batch.Messages[i] = slpb.Message{} } diff --git a/pkg/kv/kvserver/storeliveness/transport_test.go b/pkg/kv/kvserver/storeliveness/transport_test.go index 7cda2afc1739..020785b77b82 100644 --- a/pkg/kv/kvserver/storeliveness/transport_test.go +++ b/pkg/kv/kvserver/storeliveness/transport_test.go @@ -9,6 +9,7 @@ import ( "context" "math/rand" "net" + "sort" "testing" "time" @@ -19,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -26,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/netutil" "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) @@ -34,21 +37,27 @@ import ( var maxDelay = 10 * time.Millisecond // testMessageHandler stores all received messages in a channel. +type receivedMessage struct { + msg *slpb.Message + receivedAt time.Time +} + type testMessageHandler struct { - messages chan *slpb.Message + messages chan receivedMessage } func newMessageHandler(size int) testMessageHandler { return testMessageHandler{ - messages: make(chan *slpb.Message, size), + messages: make(chan receivedMessage, size), } } func (tmh *testMessageHandler) HandleMessage(msg *slpb.Message) error { // Simulate a message handling delay. time.Sleep(time.Duration(rand.Int63n(int64(maxDelay)))) + received := receivedMessage{msg: msg, receivedAt: timeutil.Now()} select { - case tmh.messages <- msg: + case tmh.messages <- received: return nil default: return receiveQueueSizeLimitReachedErr @@ -128,7 +137,8 @@ func (tt *transportTester) AddNodeWithoutGossip( nodedialer.New(tt.nodeRPCContext, gossip.AddressResolver(tt.gossip)), grpcServer, drpcServer, - nil, /* knobs */ + tt.st, /* settings */ + nil, /* knobs */ ) require.NoError(tt.t, err) tt.transports[nodeID] = transport @@ -176,8 +186,8 @@ func TestTransportSendAndReceive(t *testing.T) { tt := newTransportTester(t, cluster.MakeTestingClusterSettings()) defer tt.Stop() - // Node 1: stores 1, 2 - // Node 2: stores 3, 4 + // Node 1: stores 1, 2. + // Node 2: stores 3, 4. 
node1, node2 := roachpb.NodeID(1), roachpb.NodeID(2) store1 := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)} store2 := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(2)} @@ -203,6 +213,10 @@ func TestTransportSendAndReceive(t *testing.T) { } } + for _, from := range stores { + tt.transports[from.NodeID].SendAllEnqueuedMessages(ctx) + } + // Assert that each store received messages from all other stores. for recipient, handler := range handlers { var senders []slpb.StoreIdent @@ -210,9 +224,9 @@ func TestTransportSendAndReceive(t *testing.T) { testutils.SucceedsSoon( t, func() error { select { - case msg := <-handler.messages: - senders = append(senders, msg.From) - require.Equal(t, recipient, msg.To) + case received := <-handler.messages: + senders = append(senders, received.msg.From) + require.Equal(t, recipient, received.msg.To) return nil default: } @@ -275,6 +289,7 @@ func TestTransportRestartedNode(t *testing.T) { testutils.SucceedsSoon( t, func() error { tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) sent := tt.transports[sender.NodeID].metrics.MessagesSent.Count() if initialSent >= sent { return errors.Newf("message not sent yet; initial %d, current %d", initialSent, sent) @@ -289,6 +304,7 @@ func TestTransportRestartedNode(t *testing.T) { testutils.SucceedsSoon( t, func() error { tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) dropped := tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count() if initialDropped >= dropped { return errors.Newf( @@ -305,13 +321,14 @@ func TestTransportRestartedNode(t *testing.T) { t, func() error { select { case received := <-handler.messages: - require.Equal(t, msg, *received) + require.Equal(t, msg, *received.msg) return nil default: // To ensure messages start getting delivered, keep sending messages // out. Even after EnqueueMessage returns true, messages may still not be // delivered (e.g. if the receiver node is not up yet). tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) } return errors.New("still waiting to receive message") }, @@ -382,13 +399,14 @@ func TestTransportSendToMissingStore(t *testing.T) { // receipt of the message to the existing store. require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, missingMsg)) require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, existingMsg)) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) // Wait for the message to the existing store to be received. testutils.SucceedsSoon( t, func() error { select { case received := <-handler.messages: - require.Equal(t, existingMsg, *received) + require.Equal(t, existingMsg, *received.msg) require.Equal( t, int64(1), tt.transports[existingRcv.NodeID].metrics.MessagesReceived.Count(), ) @@ -439,13 +457,14 @@ func TestTransportClockPropagation(t *testing.T) { // Send a message from the sender to the receiver. msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver} require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) // Wait for the message to be received. 
testutils.SucceedsSoon( t, func() error { select { case received := <-handler.messages: - require.Equal(t, msg, *received) + require.Equal(t, msg, *received.msg) return nil default: } @@ -467,8 +486,8 @@ func TestTransportShortCircuit(t *testing.T) { tt := newTransportTester(t, cluster.MakeTestingClusterSettings()) defer tt.Stop() - // Node 1: stores 1, 2 - // Node 2: store 3 + // Node 1: stores 1, 2. + // Node 2: store 3. node1, node2 := roachpb.NodeID(1), roachpb.NodeID(2) store1 := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)} store2 := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(2)} @@ -488,13 +507,14 @@ func TestTransportShortCircuit(t *testing.T) { tt.transports[store1.NodeID].EnqueueMessage( ctx, slpb.Message{Type: slpb.MsgHeartbeat, From: store1, To: store2}, ) + tt.transports[store1.NodeID].SendAllEnqueuedMessages(ctx) // The message is received. testutils.SucceedsSoon( t, func() error { select { - case msg := <-handler.messages: - require.Equal(t, store1, msg.From) - require.Equal(t, store2, msg.To) + case received := <-handler.messages: + require.Equal(t, store1, received.msg.From) + require.Equal(t, store2, received.msg.To) return nil default: } @@ -509,6 +529,7 @@ func TestTransportShortCircuit(t *testing.T) { tt.transports[store1.NodeID].EnqueueMessage( ctx, slpb.Message{Type: slpb.MsgHeartbeat, From: store1, To: store3}, ) + tt.transports[store1.NodeID].SendAllEnqueuedMessages(ctx) }, "sending message to a remote store with a nil dialer", ) } @@ -532,16 +553,19 @@ func TestTransportIdleSendQueue(t *testing.T) { handler := tt.AddStore(receiver) tt.transports[sender.NodeID].knobs.OverrideIdleTimeout = func() time.Duration { - return time.Millisecond + // Set the idle timeout larger than the batch wait. Otherwise, we won't + // be able to send any message. + return 100 * time.Millisecond } // Send and receive a message. require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) testutils.SucceedsSoon( t, func() error { select { case received := <-handler.messages: - require.Equal(t, msg, *received) + require.Equal(t, msg, *received.msg) return nil default: } @@ -589,17 +613,20 @@ func TestTransportFullReceiveQueue(t *testing.T) { sendDropped++ return errors.New("still waiting to enqueue message") } + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) return nil }, ) } require.Equal( - t, int64(sendDropped), tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count(), + t, int64(sendDropped), + tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count(), ) testutils.SucceedsSoon( t, func() error { - if tt.transports[sender.NodeID].metrics.MessagesSent.Count() != int64(tt.maxHandlerSize) { + if tt.transports[sender.NodeID].metrics.MessagesSent.Count() != + int64(tt.maxHandlerSize) { return errors.New("not all messages are sent yet") } return nil @@ -607,7 +634,8 @@ func TestTransportFullReceiveQueue(t *testing.T) { ) testutils.SucceedsSoon( t, func() error { - if tt.transports[receiver.NodeID].metrics.MessagesReceived.Count() != int64(tt.maxHandlerSize) { + if tt.transports[receiver.NodeID].metrics.MessagesReceived.Count() != + int64(tt.maxHandlerSize) { return errors.New("not all messages are received yet") } return nil @@ -615,12 +643,315 @@ func TestTransportFullReceiveQueue(t *testing.T) { ) // The receiver queue is full but the enqueue to the sender queue succeeds. 
require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg)) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) testutils.SucceedsSoon( t, func() error { - if tt.transports[receiver.NodeID].metrics.MessagesReceiveDropped.Count() != int64(1) { + if tt.transports[receiver.NodeID].metrics.MessagesReceiveDropped.Count() != + int64(1) { return errors.New("message not dropped yet") } return nil }, ) } + +// TestTransportHeartbeatSmearingBatching verifies that heartbeat smearing +// batches messages from multiple queues before signalling them to send. +func TestTransportHeartbeatSmearingBatching(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + tt := newTransportTester(t, cluster.MakeTestingClusterSettings()) + defer tt.Stop() + + node1 := roachpb.NodeID(1) + node2 := roachpb.NodeID(2) + node3 := roachpb.NodeID(3) + + node1Sender := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)} + node2Receiver := slpb.StoreIdent{NodeID: node2, StoreID: roachpb.StoreID(2)} + node3Receiver := slpb.StoreIdent{NodeID: node3, StoreID: roachpb.StoreID(3)} + + tt.AddNode(node1) + tt.AddNode(node2) + tt.AddNode(node3) + tt.AddStore(node1Sender) + handler2 := tt.AddStore(node2Receiver) + handler3 := tt.AddStore(node3Receiver) + + // Enqueue messages to multiple destinations. + node2Message := slpb.Message{Type: slpb.MsgHeartbeat, From: node1Sender, To: node2Receiver} + node3Message := slpb.Message{Type: slpb.MsgHeartbeat, From: node1Sender, To: node3Receiver} + + tt.transports[node1Sender.NodeID].EnqueueMessage(ctx, node2Message) + tt.transports[node1Sender.NodeID].EnqueueMessage(ctx, node3Message) + + // Verify messages should NOT be sent yet (no SendAllEnqueuedMessages call). + // Wait briefly to ensure messages don't arrive. + testTimeout := time.NewTimer(20 * time.Millisecond) + defer testTimeout.Stop() + select { + case <-handler2.messages: + require.Fail(t, "message should not have been sent yet") + case <-handler3.messages: + require.Fail(t, "message should not have been sent yet") + case <-testTimeout.C: + // Success - no messages arrived. + } + + // Now trigger SendAllEnqueuedMessages - heartbeat smearing should batch and send. + tt.transports[node1Sender.NodeID].SendAllEnqueuedMessages(ctx) + + // Wait for messages to be received. + testutils.SucceedsSoon( + t, func() error { + if len(handler2.messages) != 1 || len(handler3.messages) != 1 { + return errors.Newf("expected 1 message in each handler, got %d and %d", + len(handler2.messages), len(handler3.messages)) + } + return nil + }, + ) + + // Verify contents were as expected. + received2 := <-handler2.messages + require.Equal(t, node2Message, *received2.msg) + received3 := <-handler3.messages + require.Equal(t, node3Message, *received3.msg) +} + +// TestTransportSmearingLogicalBehaviour verifies that message smearing causes +// messages to arrive in a staggered manner over time, rather than all at once. +// This demonstrates that the smearing mechanism is working to avoid thundering herd. +func TestTransportSmearingLogicalBehaviour(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + skip.UnderDuress(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + HeartbeatSmearingEnabled.Override(ctx, &st.SV, true) + // Override the smearing settings to ensure that the test is not flaky. 
+ HeartbeatSmearingRefreshInterval.Override(ctx, &st.SV, 250*time.Millisecond) + HeartbeatSmearingInterval.Override(ctx, &st.SV, 50*time.Millisecond) + tt := newTransportTester(t, st) + defer tt.Stop() + + node1 := roachpb.NodeID(1) + sender := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)} + + numReceivers := 10 + receivers := make([]slpb.StoreIdent, numReceivers) + handlers := make([]testMessageHandler, numReceivers) + + tt.AddNode(node1) + tt.AddStore(sender) + + for i := 0; i < numReceivers; i++ { + nodeID := roachpb.NodeID(i + 2) + receivers[i] = slpb.StoreIdent{NodeID: nodeID, StoreID: roachpb.StoreID(i + 2)} + tt.AddNode(nodeID) + handlers[i] = tt.AddStore(receivers[i]) + } + + // Enqueue messages to all receivers. + for i := 0; i < numReceivers; i++ { + msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receivers[i]} + tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) + } + + receiveTimes := make([]time.Time, numReceivers) + start := timeutil.Now() + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) + + // Wait for all messages using SucceedsSoon pattern. + for i := 0; i < numReceivers; i++ { + idx := i + testutils.SucceedsSoon(t, func() error { + select { + case received := <-handlers[idx].messages: + require.NotNil(t, received.msg) + receiveTimes[idx] = received.receivedAt + return nil + default: + return errors.Newf("message to receiver %d not yet delivered", idx) + } + }) + } + + // Verify that not all messages arrived at exactly the same time. + // Sort receive times. + sort.Slice(receiveTimes, func(i, j int) bool { + return receiveTimes[i].Before(receiveTimes[j]) + }) + + // The time between first and last arrival should be larger than the smear interval, + // indicating staggered delivery. + timeSpan := receiveTimes[len(receiveTimes)-1].Sub(receiveTimes[0]) + require.Greater(t, timeSpan, 50*time.Millisecond, + "messages should be smeared across multiple batches") + + // Verify all messages arrived relatively quickly from start. + totalElapsed := receiveTimes[len(receiveTimes)-1].Sub(start) + require.Less(t, totalElapsed, 500*time.Millisecond, + "all messages should be delivered within reasonable time") + + // Disable smearing by setting the pacing refresh interval to 0 and ensure messages + // are delivered in a tight window. + HeartbeatSmearingRefreshInterval.Override(ctx, &st.SV, 0) + + for i := 0; i < numReceivers; i++ { + msg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receivers[i]} + tt.transports[sender.NodeID].EnqueueMessage(ctx, msg) + } + + receiveTimes2 := make([]time.Time, numReceivers) + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) + + for i := 0; i < numReceivers; i++ { + idx := i + testutils.SucceedsSoon(t, func() error { + select { + case received := <-handlers[idx].messages: + require.NotNil(t, received.msg) + receiveTimes2[idx] = received.receivedAt + return nil + default: + return errors.Newf("message to receiver %d not yet delivered", idx) + } + }) + } + + sort.Slice(receiveTimes2, func(i, j int) bool { + return receiveTimes2[i].Before(receiveTimes2[j]) + }) + + timeSpan2 := receiveTimes2[len(receiveTimes2)-1].Sub(receiveTimes2[0]) + require.Less(t, timeSpan2, 50*time.Millisecond, + "messages should arrive nearly simultaneously when smearing is disabled") +} + +// TestTransportClusterSettingToggle verifies that messages are not lost when +// the kv.store_liveness.heartbeat_smearing.enabled cluster setting is toggled +// during active operation. 
This is a regression test for a race condition where +// messages could be lost if the setting changed while the processQueue goroutine +// was running. +func TestTransportClusterSettingToggle(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + st := cluster.MakeTestingClusterSettings() + tt := newTransportTester(t, st) + defer tt.Stop() + + node1, node2 := roachpb.NodeID(1), roachpb.NodeID(2) + sender := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)} + receiver := slpb.StoreIdent{NodeID: node2, StoreID: roachpb.StoreID(2)} + + tt.AddNode(node1) + tt.AddNode(node2) + tt.AddStore(sender) + handler := tt.AddStore(receiver) + + // Start with heartbeat smearing enabled (messages wait for + // SendAllEnqueuedMessages). + HeartbeatSmearingEnabled.Override(ctx, &st.SV, true) + + // Enqueue a message while heartbeat smearing is enabled. + msg1 := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver} + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg1)) + + // Toggle the setting to disabled (direct send mode). + // With both channels always active, processQueue will check the setting + // when processing messages, so messages will be sent directly when + // smearing is disabled. + HeartbeatSmearingEnabled.Override(ctx, &st.SV, false) + + // Enqueue another message after the toggle. This message should be sent + // directly without waiting for SendAllEnqueuedMessages. + msg2 := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver} + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg2)) + + // Verify both messages are received (msg1 should switch to direct mode, + // msg2 should be sent directly). No SendAllEnqueuedMessages call needed. + receivedMsgs := 0 + testutils.SucceedsSoon( + t, func() error { + select { + case received := <-handler.messages: + require.NotNil(t, received.msg) + receivedMsgs++ + default: + } + if receivedMsgs < 2 { + return errors.Newf( + "only received %d messages so far, expecting 2", + receivedMsgs) + } + return nil + }, + ) + require.Equal(t, 2, receivedMsgs) + + // Now toggle back to heartbeat smearing mode. + HeartbeatSmearingEnabled.Override(ctx, &st.SV, true) + + // Wait for the queue to be fully drained and processQueue to return to + // waiting state. We verify this by checking that the send queue is empty. + // With both channels always active, processQueue will automatically re-read + // the setting on the next iteration when it processes a message, so no + // dummy message is needed to synchronize. + testutils.SucceedsSoon(t, func() error { + if tt.transports[sender.NodeID].metrics.SendQueueSize.Value() != 0 { + return errors.New("queue not empty yet") + } + return nil + }) + + // Now enqueue msg3 - processQueue should have re-read the setting and be in + // heartbeat smearing mode. + msg3 := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver} + require.True(t, tt.transports[sender.NodeID].EnqueueMessage(ctx, msg3)) + + // Verify the message is enqueued but NOT delivered (waiting in smearing mode). + // Use SucceedsSoon to give the system time to stabilize in the correct state. + testutils.SucceedsSoon(t, func() error { + // Message should be in the queue. + if tt.transports[sender.NodeID].metrics.SendQueueSize.Value() != 1 { + return errors.Newf("expected queue size 1, got %d", + tt.transports[sender.NodeID].metrics.SendQueueSize.Value()) + } + // But should NOT have been received yet. 
+ select { + case <-handler.messages: + return errors.New( + "message should not have been sent yet in heartbeat smearing mode") + default: + } + return nil + }) + + // Signal to send messages. + tt.transports[sender.NodeID].SendAllEnqueuedMessages(ctx) + + // Verify the message is received. + testutils.SucceedsSoon( + t, func() error { + select { + case received := <-handler.messages: + require.Equal(t, msg3, *received.msg) + return nil + default: + } + return errors.New("message not received yet") + }, + ) + + // Verify no messages were dropped during any of the toggles. + // Total messages sent: msg1, msg2, msg3 = 3. + require.Equal(t, int64(0), + tt.transports[sender.NodeID].metrics.MessagesSendDropped.Count()) + require.Equal(t, int64(3), + tt.transports[sender.NodeID].metrics.MessagesSent.Count()) + require.Equal(t, int64(3), + tt.transports[receiver.NodeID].metrics.MessagesReceived.Count()) +} diff --git a/pkg/server/server.go b/pkg/server/server.go index 5a5137ca0f4c..87e37090fcd3 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -671,7 +671,7 @@ func NewServer(cfg Config, stopper *stop.Stopper) (serverctl.ServerStartupInterf options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod) transport, err := storeliveness.NewTransport( cfg.AmbientCtx, stopper, clock, kvNodeDialer, - grpcServer.Server, drpcServer.DRPCServer, nil, /* knobs */ + grpcServer.Server, drpcServer.DRPCServer, st, nil, /* knobs */ ) if err != nil { return nil, err diff --git a/pkg/testutils/localtestcluster/local_test_cluster.go b/pkg/testutils/localtestcluster/local_test_cluster.go index a89c2be29278..41bdb23fb37e 100644 --- a/pkg/testutils/localtestcluster/local_test_cluster.go +++ b/pkg/testutils/localtestcluster/local_test_cluster.go @@ -219,7 +219,7 @@ func (ltc *LocalTestCluster) Start(t testing.TB, initFactory InitFactoryFn) { options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod) transport, err := storeliveness.NewTransport( cfg.AmbientCtx, ltc.stopper, ltc.Clock, - nil /* dialer */, nil /* grpcServer */, nil /* drpcServer */, nil, /* knobs */ + nil /* dialer */, nil /* grpcServer */, nil /* drpcServer */, cfg.Settings, nil, /* knobs */ ) if err != nil { t.Fatal(err)