Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ func createTestStoreWithoutStart(
supportGracePeriod := rpcContext.StoreLivenessWithdrawalGracePeriod()
options := storeliveness.NewOptions(livenessInterval, heartbeatInterval, supportGracePeriod)
transport, err := storeliveness.NewTransport(
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, drpcServer, nil, /* knobs */
cfg.AmbientCtx, stopper, cfg.Clock, cfg.NodeDialer, grpcServer, drpcServer, cfg.Settings, nil, /* knobs */
)
require.NoError(t, err)
knobs := cfg.TestingKnobs.StoreLivenessKnobs
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/storeliveness/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ go_library(
"//pkg/util/protoutil",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/taskpacer",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
Expand Down
13 changes: 10 additions & 3 deletions pkg/kv/kvserver/storeliveness/support_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ var Enabled = settings.RegisterBoolSetting(
// MessageSender is the interface that defines how Store Liveness messages are
// sent. Transport is the production implementation of MessageSender.
type MessageSender interface {
SendAsync(ctx context.Context, msg slpb.Message) (sent bool)
EnqueueMessage(ctx context.Context, msg slpb.Message) (sent bool)
SendAllEnqueuedMessages(ctx context.Context)
}

// SupportManager orchestrates requesting and providing Store Liveness support.
Expand Down Expand Up @@ -323,12 +324,15 @@ func (sm *SupportManager) sendHeartbeats(ctx context.Context) {
// Send heartbeats to each remote store.
successes := 0
for _, msg := range heartbeats {
if sent := sm.sender.SendAsync(ctx, msg); sent {
if sent := sm.sender.EnqueueMessage(ctx, msg); sent {
successes++
} else {
log.KvExec.Warningf(ctx, "failed to send heartbeat to store %+v", msg.To)
}
}
if HeartbeatSmearingEnabled.Get(&sm.settings.SV) {
sm.sender.SendAllEnqueuedMessages(ctx)
}
sm.metrics.HeartbeatSuccesses.Inc(int64(successes))
sm.metrics.HeartbeatFailures.Inc(int64(len(heartbeats) - successes))
log.KvExec.VInfof(ctx, 2, "sent heartbeats to %d stores", successes)
Expand Down Expand Up @@ -424,7 +428,10 @@ func (sm *SupportManager) handleMessages(ctx context.Context, msgs []*slpb.Messa
sm.metrics.SupportForStores.Update(int64(sm.supporterStateHandler.getNumSupportFor()))

for _, response := range responses {
_ = sm.sender.SendAsync(ctx, response)
_ = sm.sender.EnqueueMessage(ctx, response)
}
if HeartbeatSmearingEnabled.Get(&sm.settings.SV) {
sm.sender.SendAllEnqueuedMessages(ctx)
}
log.KvExec.VInfof(ctx, 2, "sent %d heartbeat responses", len(responses))
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/storeliveness/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,17 @@ type testMessageSender struct {
messages []slpb.Message
}

func (tms *testMessageSender) SendAsync(_ context.Context, msg slpb.Message) (sent bool) {
func (tms *testMessageSender) EnqueueMessage(_ context.Context, msg slpb.Message) (sent bool) {
tms.mu.Lock()
defer tms.mu.Unlock()
tms.messages = append(tms.messages, msg)
return true
}

func (tms *testMessageSender) SendAllEnqueuedMessages(_ context.Context) {
// No-op for testing
}

func (tms *testMessageSender) drainSentMessages() []slpb.Message {
tms.mu.Lock()
defer tms.mu.Unlock()
Expand Down
Loading