-
Notifications
You must be signed in to change notification settings - Fork 4k
storeliveness: smear storeliveness heartbeat messages to reduce goroutine spikes at heartbeat interval tick #156378
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
storeliveness: smear storeliveness heartbeat messages to reduce goroutine spikes at heartbeat interval tick #156378
Conversation
|
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
e31eb10 to
ee82218
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: After you review the findings, please tag the issue as follows:
|
ee82218 to
005ce2d
Compare
Potential Bug(s) DetectedThe three-stage Claude Code analysis has identified potential bug(s) in this PR that may warrant investigation. Next Steps: After you review the findings, please tag the issue as follows:
|
005ce2d to
4c635d9
Compare
4c635d9 to
4455b08
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally looks good 🔥 I did a first pass, but I will need to look at the tests one more time.
I guess my only general comment is that supporting both old and new logics seem to come with some decent complexity. Deferring to @miraradeva to decide whether this extra complexity is important, or if we can just move to the new logic and let it bake for sometime.
@iskettaneh reviewed 4 of 4 files at r1, 1 of 8 files at r2.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @miraradeva)
-- commits line 23 at r2:
nit: maybe mention that it used to spike the the runnable goroutines
pkg/kv/kvserver/storeliveness/transport_test.go line 698 at r2 (raw file):
testutils.SucceedsSoon( t, func() error { if len(handler2.messages) == 0 || len(handler3.messages) == 0 {
Can you reverse the logic a bit. Basically, only exist the succeedsSoon if you see the exact expected number of messages sent?
pkg/kv/kvserver/storeliveness/transport_test.go line 825 at r2 (raw file):
// Verify message does NOT arrive immediately (wait briefly to be sure). testTimeout := time.NewTimer(20 * time.Millisecond)
This was tested above in TestTransportCoordinatorBatching, right?
pkg/kv/kvserver/storeliveness/support_manager.go line 39 at r2 (raw file):
"if enabled, heartbeat sends are the responsibility of the heartbeat "+ "coordinator which smears them over a certain duration, otherwise the "+ "heartbeat sends are sent immediately",
nit: otherwise the heartbeat sends are sent when they are enqueued at the sendQueue, bypassing the heartbeat coordinator.
pkg/kv/kvserver/storeliveness/support_manager.go line 47 at r2 (raw file):
"kv.store_liveness.heartbeat_coordinator.refresh", "the refresh interval for the heartbeat coordinator pacer, which determines "+ "how frequently the coordinator checks for messages to send",
I am not sure that the description is accurate here.
I think the refresh is the total duration at which messages are going to be paced over, right?
pkg/kv/kvserver/storeliveness/transport.go line 216 at r1 (raw file):
} // EnqueueMessage implements the MessageSender interface. It sends a message to the
nit: maybe change the description of it to say: it enqueues messages instead of it sends messages.
pkg/kv/kvserver/storeliveness/transport.go line 84 at r2 (raw file):
handlers syncutil.Map[roachpb.StoreID, MessageHandler] // Once signaled, we will signal to all sendQueues to send all messages.
nit: usually the comment that describes what a struct member does starts with the name of that member.
In this case, the comment can start with: // sendAllMessages is signalled...etc
nit: I think this field signals to the heartbeat coordinator to start signalling to the queues, right?
pkg/kv/kvserver/storeliveness/transport.go line 97 at r2 (raw file):
} func (c pacerConfig) GetRefresh() time.Duration {
Nit: add description that says:
// GetRefresh implements the taskpacer.Config interface.
Also applicable to GetSmear below
pkg/kv/kvserver/storeliveness/transport.go line 232 at r2 (raw file):
// When coordinator is disabled, wake up all queues so they can switch // to direct message processing. When coordinator is enabled, we don't // need to signal because messages will wait for SendAllEnqueuedMessages.
Maybe add description of the scenario we are trying to avoid here:
- If UseHeartbeatCoordinatorEnabled was true
- the Select statement inside the processQueue will only look at sendMessages channel, not the actual messages channel
- If we then disable the heartbeat coordinator, the select statement inside sendMessages will get stuck (for up to the duration of idleTimer).
To avoid this, we signal the q.sendMessages here so that the processQueue select statement will move on into the next iteration of the for loop, which it will then detect that UseHeartbeatCoordinatorEnabled is false, and then will adjust the select parameters to wait on the messages channel instead of the q.sendMessages channel.
pkg/kv/kvserver/storeliveness/transport.go line 543 at r2 (raw file):
for {
nit: extra line
pkg/kv/kvserver/storeliveness/transport.go line 548 at r2 (raw file):
// If coordinator is enabled, we don't want to process messages directly. // We'll set the messages channel to nil so the select never triggers on it. var directMessages <-chan slpb.Message
Thoughts on adding a TODO to remove this complexity once we are sure that we don't want the old logic?
4455b08 to
291ab1d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @iskettaneh and @miraradeva)
pkg/kv/kvserver/storeliveness/support_manager.go line 47 at r2 (raw file):
Previously, iskettaneh wrote…
I am not sure that the description is accurate here.
I think the refresh is the total duration at which messages are going to be paced over, right?
Done.
pkg/kv/kvserver/storeliveness/transport.go line 232 at r2 (raw file):
Previously, iskettaneh wrote…
Maybe add description of the scenario we are trying to avoid here:
- If UseHeartbeatCoordinatorEnabled was true
- the Select statement inside the processQueue will only look at sendMessages channel, not the actual messages channel
- If we then disable the heartbeat coordinator, the select statement inside sendMessages will get stuck (for up to the duration of idleTimer).
To avoid this, we signal the q.sendMessages here so that the
processQueueselect statement will move on into the next iteration of the for loop, which it will then detect that UseHeartbeatCoordinatorEnabled is false, and then will adjust the select parameters to wait on the messages channel instead of the q.sendMessages channel.
Done.
pkg/kv/kvserver/storeliveness/transport.go line 548 at r2 (raw file):
Previously, iskettaneh wrote…
Thoughts on adding a TODO to remove this complexity once we are sure that we don't want the old logic?
Done.
pkg/kv/kvserver/storeliveness/transport_test.go line 698 at r2 (raw file):
Previously, iskettaneh wrote…
Can you reverse the logic a bit. Basically, only exist the succeedsSoon if you see the exact expected number of messages sent?
Done.
pkg/kv/kvserver/storeliveness/transport_test.go line 825 at r2 (raw file):
Previously, iskettaneh wrote…
This was tested above in TestTransportCoordinatorBatching, right?
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good overall! I think we can still make some simplifications in various places. I haven't looked at the tests yet.
@miraradeva reviewed 8 of 8 files at r5, 7 of 8 files at r6, all commit messages.
Reviewable status:complete! 0 of 0 LGTMs obtained (waiting on @dodeca12 and @iskettaneh)
-- commits line 7 at r5:
nit: I don't think SendAsync implied "immediately"; in fact, it's the opposite. The rename still makes sense though, so maybe just reword this commit message a bit.
pkg/kv/kvserver/storeliveness/support_manager.go line 37 at r6 (raw file):
// heartbeat sends. Once the smearing has battle-tested, remove this and // default to using the smeared heartbeat sends approach (no more fallback). var UseHeartbeatCoordinatorEnabled = settings.RegisterBoolSetting(
nit: these cluster settings probably make more sense in the transport?
pkg/kv/kvserver/storeliveness/support_manager.go line 40 at r6 (raw file):
settings.SystemOnly, "kv.store_liveness.heartbeat_coordinator.enabled", "if enabled, heartbeat sends are the responsibility of the heartbeat "+
nit: maybe say something about smearing in this description?
It might make sense to rename the cluster setting to something like kv.store_liveness.heartbeat_smearing.enabled, and not talk about the heartbeat coordinator, which is no longer a separate component.
pkg/kv/kvserver/storeliveness/transport.go line 153 at r6 (raw file):
// This will hold the channels we need to signal to send messages. toSignal := make([]chan struct{}, 0)
Do we need to accumulate these? Can't we just signal all queues? Presumably, if they are active queues (not deleted), they will have messages to send?
pkg/kv/kvserver/storeliveness/transport.go line 422 at r6 (raw file):
func (t *Transport) SendAllEnqueuedMessages(ctx context.Context) { select { case t.sendAllMessages <- struct{}{}:
This will block if multiple support managers try to call SendAllEnqueuedMessages. I see we consume off of the channel in the new coordinator goroutine, but the fact that the support managers depend on enqueuing on a channel of size 1, and that blocks their critical goroutines, seems error prone.
Do we really need this global signal? Is there a way for the coordinator goroutine to independently realize there are messages to send? Or just attempt smearing on an ongoing basis?
pkg/kv/kvserver/storeliveness/transport.go line 575 at r6 (raw file):
// Signal received - proceed to batching logic below. case msg := <-directMessages:
This seems a bit hacky to pull one message off of the queue and then do the rest in the logic below. Maybe directMessages can be a simple struct channel of size 1, and you can signal it above if !useHeartbeatCoordinator?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dodeca12 and @miraradeva)
pkg/kv/kvserver/storeliveness/transport.go line 422 at r6 (raw file):
Previously, miraradeva (Mira Radeva) wrote…
This will block if multiple support managers try to call
SendAllEnqueuedMessages. I see we consume off of the channel in the new coordinator goroutine, but the fact that the support managers depend on enqueuing on a channel of size 1, and that blocks their critical goroutines, seems error prone.Do we really need this global signal? Is there a way for the coordinator goroutine to independently realize there are messages to send? Or just attempt smearing on an ongoing basis?
Another thing we could do is just to add a default case for when we add to that channel? This way if it was full, we will not add anything.
Previously, the `MessageSender` interface exposed a method called `SendAsync` that queued Store Liveness messages for asynchronous delivery to remote stores. The name `SendAsync` was misleading because it implied messages were sent directly, when in reality they were enqueued to per-node queues and processed in batches before being sent over the network. The rename to `EnqueueMessage` better reflects that the method places messages onto an outgoing queue rather than performing the actual send operation. This patch renames `SendAsync` to `EnqueueMessage` throughout the storeliveness package. This change is also set up for the next commit which enables smearing of the heartbeats. References: cockroachdb#148210 Release note: None
291ab1d to
996ae9c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @iskettaneh and @miraradeva)
Previously, miraradeva (Mira Radeva) wrote…
nit: I don't think SendAsync implied "immediately"; in fact, it's the opposite. The rename still makes sense though, so maybe just reword this commit message a bit.
done.
pkg/kv/kvserver/storeliveness/transport.go line 153 at r6 (raw file):
Previously, miraradeva (Mira Radeva) wrote…
Do we need to accumulate these? Can't we just signal all queues? Presumably, if they are active queues (not deleted), they will have messages to send?
I'm not sure what you mean by signal all the queues (I assume immediately?) - we use toSignal later down in the goroutine to smear the sends via taskpacer
pkg/kv/kvserver/storeliveness/transport.go line 422 at r6 (raw file):
Previously, iskettaneh wrote…
Another thing we could do is just to add a default case for when we add to that channel? This way if it was full, we will not add anything.
I'm confused here, I don't think it's non blocking, since we have the default case. Additionally, if we were to imagine multiple support managers calling SendallEnqueueMessages, the combination of the default + the batching in the goroutine in NewTransport should handle this. Did you mean something else by blocking?
pkg/kv/kvserver/storeliveness/transport.go line 575 at r6 (raw file):
Previously, miraradeva (Mira Radeva) wrote…
This seems a bit hacky to pull one message off of the queue and then do the rest in the logic below. Maybe
directMessagescan be a simple struct channel of size 1, and you can signal it above if!useHeartbeatCoordinator?
Done.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @iskettaneh and @miraradeva)
pkg/kv/kvserver/storeliveness/support_manager.go line 333 at r8 (raw file):
} } if HeartbeatSmearingEnabled.Get(&sm.settings.SV) {
I tried to use the function I've created in transport.go SendHeartbeatsSmeared() here via:
t := sm.sender.(*Transport)
if t.SendHeartbeatsSmeared() {but the issue is we stub Transport in test with testMessageSender, so we get this error:
panic: interface conversion: storeliveness.MessageSender is *storeliveness.testMessageSender, not *storeliveness.Transport [recovered, repanicked]
I've thought about making changes to the MessageSender interface and then in testutils.go and all that but it felt like unneeded complexity just to use a function when we can use HeartbeatSmearingEnabled.Get(&sm.settings.SV) - is this a bad practice?
pkg/kv/kvserver/storeliveness/support_manager.go line 433 at r8 (raw file):
_ = sm.sender.EnqueueMessage(ctx, response) } if HeartbeatSmearingEnabled.Get(&sm.settings.SV) {
Same comment from above:
I tried to use the function I've created in transport.go SendHeartbeatsSmeared() here via:
t := sm.sender.(*Transport)
if t.SendHeartbeatsSmeared() {but the issue is we stub Transport in test with testMessageSender, so we get this error:
panic: interface conversion: storeliveness.MessageSender is *storeliveness.testMessageSender, not *storeliveness.Transport [recovered, repanicked]
I've thought about making changes to the MessageSender interface and then in testutils.go and all that but it felt like unneeded complexity just to use a function when we can use HeartbeatSmearingEnabled.Get(&sm.settings.SV) - is this a bad practice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dodeca12 and @iskettaneh)
pkg/kv/kvserver/storeliveness/transport.go line 153 at r6 (raw file):
Previously, dodeca12 (Swapneeth Gorantla) wrote…
I'm not sure what you mean by signal all the queues (I assume immediately?) - we use
toSignallater down in the goroutine to smear the sends viataskpacer
It seems like we're filtering out the queues that have nothing to send, and adding the rest to this slice. My question was just whether we need to do this filtering. If the queues are in t.queues, presumably they have some messages to send (otherwise they would be removed as inactive).
pkg/kv/kvserver/storeliveness/transport.go line 422 at r6 (raw file):
Previously, dodeca12 (Swapneeth Gorantla) wrote…
I'm confused here, I don't think it's non blocking, since we have the default case. Additionally, if we were to imagine multiple support managers calling
SendallEnqueueMessages, the combination of the default + the batching in the goroutine inNewTransportshould handle this. Did you mean something else by blocking?
Yeah, sorry, I missed the default case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @iskettaneh and @miraradeva)
pkg/kv/kvserver/storeliveness/transport.go line 51 at r8 (raw file):
var HeartbeatSmearingEnabled = settings.RegisterBoolSetting( settings.SystemOnly, "kv.store_liveness.heartbeat_smearing.enabled",
since I've moved the cluster setting here, should the label still be kv.store_liveness.heartbeat_smearing.enabled - I don't think so right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @iskettaneh and @miraradeva)
pkg/kv/kvserver/storeliveness/transport.go line 153 at r6 (raw file):
Previously, miraradeva (Mira Radeva) wrote…
It seems like we're filtering out the queues that have nothing to send, and adding the rest to this slice. My question was just whether we need to do this filtering. If the queues are in t.queues, presumably they have some messages to send (otherwise they would be removed as inactive).
My understanding is that there might be a scenario where after the messages are drained, but before cleanup() is called to remove the queues from t.queues, where we have a queue that is empty. However, I don't think there's any harm in signalling empty queues (though maybe some overhead?), and makes the design a bit more simple so I'll remove the check
Previously, heartbeat messages were sent immediately when enqueued via `EnqueueMessage`. In large clusters, many stores might all send heartbeats simultaneously at the same tick interval, causing a spike in runnable goroutines that caused issues elsewhere in the database process. This patch introduces a heartbeat coordinator that batches and smears heartbeat sends over a configurable duration using a taskpacer. The coordinator is enabled by default via the `kv.store_liveness.use_heartbeat_coordinator` cluster setting (defaults to true), and can be configured via `kv.store_liveness.heartbeat_coordinator.refresh` (default: 10ms) and `kv.store_liveness.heartbeat_coordinator.smear` (default: 1ms) settings. When enabled, messages are enqueued but not sent immediately. Instead, they wait in per-node queues until `SendAllEnqueuedMessages()` is called, which signals the coordinator. The coordinator waits briefly (`batchDuration`) to collect messages from multiple stores, then paces the signaling to each queue's `processQueue` goroutine using the pacer to spread sends over time. Fixes: cockroachdb#148210 Release note: None
996ae9c to
73a00a0
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @miraradeva)
pkg/kv/kvserver/storeliveness/transport.go line 93 at r10 (raw file):
// sendQueue is a queue of outgoing Messages. type sendQueue struct { sendMessages chan struct{}
Can you add a comment for these channel to explain what they do?
pkg/kv/kvserver/storeliveness/transport.go line 220 at r10 (raw file):
// Collect all sendQueues. t.queues.Range(func(nodeID roachpb.NodeID, q *sendQueue) bool { toSignal = append(toSignal, q.sendMessages)
We observed that we need to only signal to the processQueue gotoutines if they have something to send. Otherwise, for every HeartbeatResponse that we send to any node, the heartbeat coordinator would wake up all the goroutines, even if they don't need to do anything.
pkg/kv/kvserver/storeliveness/transport.go line 569 at r10 (raw file):
batch := &slpb.MessageBatch{} sendBatch := func() error {
nit: Given that the sendBatch() function is only used once below. Can you just inline this code where this function is called?
pkg/kv/kvserver/storeliveness/transport.go line 596 at r10 (raw file):
var directSendSignal <-chan struct{} // channel is set to nil so the select never triggers it if !t.SendHeartbeatsSmeared() { directSendSignal = q.directSend
Do we need this here?
if t.SendHeartbeatsSmeared() is false, no one will signal to q.directSend, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! Added some comment about the tests
Reviewable status:
complete! 0 of 0 LGTMs obtained (waiting on @dodeca12 and @miraradeva)
pkg/kv/kvserver/storeliveness/transport_test.go line 651 at r10 (raw file):
} // TestTransportHeartbeatSmearingBatching verifies that heartbeat smearing batches messages
nit: comment is larger than 80 chars
pkg/kv/kvserver/storeliveness/transport_test.go line 664 at r10 (raw file):
node3 := roachpb.NodeID(3) sender := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)}
nit: consider changing variables name from sender, receiver2, receiver3 to: node1Sender, node2Receiver, node3Receiver.
pkg/kv/kvserver/storeliveness/transport_test.go line 676 at r10 (raw file):
// Enqueue messages to multiple destinations. msg2 := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver2}
nit: consider changing msg2 and msg3 to: node2Message, and node3Message
pkg/kv/kvserver/storeliveness/transport_test.go line 709 at r10 (raw file):
) // Verify messages were sent.
nit: Maybe change this comment to:
// Verify that the messages contents are as expected.
pkg/kv/kvserver/storeliveness/transport_test.go line 710 at r10 (raw file):
// Verify messages were sent. select {
I don't think the select statement is needed given that we verified that the channels have a message. Consider just doing:
require.Equal(t, msg2, <-handler2.messages)
pkg/kv/kvserver/storeliveness/transport_test.go line 736 at r10 (raw file):
node1 := roachpb.NodeID(1) sender := slpb.StoreIdent{NodeID: node1, StoreID: roachpb.StoreID(1)}
For this test, consider skipping it under duress, and consider increasing the pacing refresh cluster setting to something like 250ms.
Then, assert that the time between the first and last message is greater than something like 50ms.
Bonus points: after that, you could change the pacing refresh cluster setting to 0, and assert that the time between the first and last sent messages is less than 50ms.
pkg/kv/kvserver/storeliveness/transport_test.go line 861 at r10 (raw file):
// Send a dummy message with signal to ensure processQueue completes at least // one more iteration and picks up the new setting value. dummyMsg := slpb.Message{Type: slpb.MsgHeartbeat, From: sender, To: receiver}
Why are we calling this dummyMsg? Isn't this a normal message that is sent after we enable heartbeat smearing?
pkg/kv/kvserver/storeliveness/transport_test.go line 871 at r10 (raw file):
} return errors.New("waiting for dummy message") })
Should the test end here? I my understanding is that at this point we verified that switching back and forth works as expected?
Maybe just keep the metric checks below?
pkg/kv/kvserver/storeliveness/transport_test.go line 873 at r10 (raw file):
}) // Note: this dummy message is a test test-only synchronization mechanism. // In production, `support_manager.go` will call `SendAllEnqueuedMessages`
I think the SetOnChange is being called even in the test! No need to do an extra synchronization mechanism.
This PR introduces a heartbeat coordinator that batches and smears Store Liveness heartbeat sends across destination nodes to prevent thundering herd of goroutine spikes.
Changes
Core changes are within these files:
Background
Previously, all stores in a cluster sent heartbeats immediately at each heartbeat interval tick. In large clusters with multi-store nodes, this created synchronized bursts of goroutine spikes that caused issues in other parts of the running CRDB process.
Commits
Commit 1: Rename
SendAsynctoEnqueueMessageCommit 2: Introduce heartbeat smearing
transport.gothat batches enqueued messagestaskpacerto spread traffic over timeHow it works:
EnqueueMessage()into per-node queuesSendAllEnqueuedMessages()is called, transport's goroutine waits briefly to batch messagestaskpacerto pace signaling to each queue over a smear durationprocessQueuegoroutine drains its queue and sends when signalledNew Cluster Settings
kv.store_liveness.heartbeat_smearing.enabled(default: true) - Enable/disable smearingkv.store_liveness.heartbeat_coordinator.refresh(default: 10ms) - Batching window durationkv.store_liveness.heartbeat_coordinator.smear(default: 1ms) - Time to spread sends across queuesBackward Compatibility
kv.store_liveness.heartbeat_smearing.enabled=falseTesting
All existing tests updated to call
SendAllEnqueuedMessages()after enqueuing when smearing is enabled.Roachpod testing
SupportManagergoroutines being put to sleep; this current design ensures thatSupportManagergoroutines do not get blocked) on a roachpod with 150 node cluster to verify smearing works.Fixes: #148210
Release note: None