Skip to content

Commit 291ab1d

Browse files
committed
storeliveness: smear storeliveness heartbeats via coordinator
Previously, heartbeat messages were sent immediately when enqueued via `EnqueueMessage`. In large clusters, many stores might all send heartbeats simultaneously at the same tick interval, causing a spike in runnable goroutines that caused issues elsewhere in the database process. This patch introduces a heartbeat coordinator that batches and smears heartbeat sends over a configurable duration using a taskpacer. The coordinator is controlled by the `kv.store_liveness.heartbeat_coordinator.enabled` cluster setting (defaults to true), and can be configured via the `kv.store_liveness.heartbeat_coordinator.refresh` (default: 10ms) and `kv.store_liveness.heartbeat_coordinator.smear` (default: 1ms) settings. When enabled, messages are enqueued but not sent immediately. Instead, they wait in per-node queues until `SendAllEnqueuedMessages()` is called, which signals the coordinator. The coordinator waits briefly (`batchDuration`) to collect messages from multiple stores, then paces the signaling to each queue's `processQueue` goroutine using the pacer to spread sends over time. Fixes: #148210 Release note: None
1 parent e5666f1 commit 291ab1d

File tree

8 files changed

+583
-54
lines changed

8 files changed

+583
-54
lines changed

pkg/kv/kvserver/store_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ func createTestStoreWithoutStart(
248248
supportGracePeriod := rpcContext.StoreLivenessWithdrawalGracePeriod()
249249
options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod)
250250
transport, err := storeliveness.NewTransport(
251-
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, drpcServer, nil, /* knobs */
251+
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, drpcServer, cfg.Settings, nil, /* knobs */
252252
)
253253
require.NoError(t, err)
254254
knobs := cfg.TestingKnobs.StoreLivenessKnobs

pkg/kv/kvserver/storeliveness/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ go_library(
3434
"//pkg/util/protoutil",
3535
"//pkg/util/stop",
3636
"//pkg/util/syncutil",
37+
"//pkg/util/taskpacer",
3738
"//pkg/util/timeutil",
3839
"@com_github_cockroachdb_errors//:errors",
3940
"@com_github_cockroachdb_redact//:redact",

pkg/kv/kvserver/storeliveness/support_manager.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,43 @@ var Enabled = settings.RegisterBoolSetting(
3131
true,
3232
)
3333

34+
// TODO(dodeca12): Currently this complexity allows the fallback to immediate
35+
// heartbeat sends. Once the smearing has been battle-tested, remove this and
36+
// default to using the smeared heartbeat sends approach (no more fallback).
37+
var UseHeartbeatCoordinatorEnabled = settings.RegisterBoolSetting(
38+
settings.SystemOnly,
39+
"kv.store_liveness.heartbeat_coordinator.enabled",
40+
"if enabled, heartbeat sends are the responsibility of the heartbeat "+
41+
"coordinator. Otherwise, heartbeat sends are sent when they are enqueued "+
42+
"at the sendQueue, bypassing the heartbeat coordinator",
43+
44+
true,
45+
)
46+
47+
var HeartbeatCoordinatorRefresh = settings.RegisterDurationSetting(
48+
settings.SystemOnly,
49+
"kv.store_liveness.heartbeat_coordinator.refresh",
50+
"the total time window within which all queued heartbeat messages should be "+
51+
"sent; the pacer distributes the work across this duration to complete "+
52+
"by the deadline",
53+
10*time.Millisecond,
54+
settings.PositiveDuration,
55+
)
56+
57+
var HeartbeatCoordinatorSmear = settings.RegisterDurationSetting(
58+
settings.SystemOnly,
59+
"kv.store_liveness.heartbeat_coordinator.smear",
60+
"the interval between sending successive batches of heartbeats across all "+
61+
"send queues",
62+
1*time.Millisecond,
63+
settings.PositiveDuration,
64+
)
65+
3466
// MessageSender is the interface that defines how Store Liveness messages are
3567
// sent. Transport is the production implementation of MessageSender.
3668
type MessageSender interface {
3769
EnqueueMessage(ctx context.Context, msg slpb.Message) (sent bool)
70+
SendAllEnqueuedMessages(ctx context.Context)
3871
}
3972

4073
// SupportManager orchestrates requesting and providing Store Liveness support.
@@ -168,6 +201,14 @@ func (sm *SupportManager) SupportFromEnabled(ctx context.Context) bool {
168201
return Enabled.Get(&sm.settings.SV)
169202
}
170203

204+
// SendViaHeartbeatCoordinator returns true if the cluster setting to send heartbeats
205+
// via the heartbeat coordinator (the goroutine in `transport.go`) is enabled.
206+
// The coordinator is responsible for smearing the heartbeats over
207+
// a certain duration, so expect heartbeat sends to be paced when enabled.
208+
func (sm *SupportManager) SendViaHeartbeatCoordinator(ctx context.Context) bool {
209+
return UseHeartbeatCoordinatorEnabled.Get(&sm.settings.SV)
210+
}
211+
171212
// Start starts the main processing goroutine in startLoop as an async task.
172213
func (sm *SupportManager) Start(ctx context.Context) error {
173214
// onRestart is called synchronously before the start of the main loop in
@@ -329,6 +370,9 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) {
329370
log.KvExec.Warningf(ctx, "failed to send heartbeat to store %+v", msg.To)
330371
}
331372
}
373+
if sm.SendViaHeartbeatCoordinator(ctx) {
374+
sm.sender.SendAllEnqueuedMessages(ctx)
375+
}
332376
sm.metrics.HeartbeatSuccesses.Inc(int64(successes))
333377
sm.metrics.HeartbeatFailures.Inc(int64(len(heartbeats) - successes))
334378
log.KvExec.VInfof(ctx, 2, "sent heartbeats to %d stores", successes)
@@ -426,6 +470,9 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa
426470
for _, response := range responses {
427471
_ = sm.sender.EnqueueMessage(ctx, response)
428472
}
473+
if sm.SendViaHeartbeatCoordinator(ctx) {
474+
sm.sender.SendAllEnqueuedMessages(ctx)
475+
}
429476
log.KvExec.VInfof(ctx, 2, "sent %d heartbeat responses", len(responses))
430477
}
431478

pkg/kv/kvserver/storeliveness/testutils.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ func (tms *testMessageSender) EnqueueMessage(_ context.Context, msg slpb.Message
101101
return true
102102
}
103103

104+
func (tms *testMessageSender) SendAllEnqueuedMessages(_ context.Context) {
105+
// No-op for testing
106+
}
107+
104108
func (tms *testMessageSender) drainSentMessages() []slpb.Message {
105109
tms.mu.Lock()
106110
defer tms.mu.Unlock()

0 commit comments

Comments
 (0)