@@ -14,10 +14,12 @@ import (
1414 "github.com/cockroachdb/cockroach/pkg/roachpb"
1515 "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
1616 "github.com/cockroachdb/cockroach/pkg/rpc/rpcbase"
17+ clustersettings "github.com/cockroachdb/cockroach/pkg/settings/cluster"
1718 "github.com/cockroachdb/cockroach/pkg/util/hlc"
1819 "github.com/cockroachdb/cockroach/pkg/util/log"
1920 "github.com/cockroachdb/cockroach/pkg/util/stop"
2021 "github.com/cockroachdb/cockroach/pkg/util/syncutil"
22+ "github.com/cockroachdb/cockroach/pkg/util/taskpacer"
2123 "github.com/cockroachdb/cockroach/pkg/util/timeutil"
2224 "github.com/cockroachdb/errors"
2325 "google.golang.org/grpc"
@@ -54,7 +56,8 @@ type MessageHandler interface {
5456
5557// sendQueue is a queue of outgoing Messages.
5658type sendQueue struct {
57- messages chan slpb.Message
59+ sendMessages chan struct {}
60+ messages chan slpb.Message
5861}
5962
6063// Transport handles the RPC messages for Store Liveness.
@@ -67,22 +70,44 @@ type sendQueue struct {
6770// delivering them asynchronously.
6871type Transport struct {
6972 log.AmbientContext
70- stopper * stop.Stopper
71- clock * hlc.Clock
72- dialer * nodedialer.Dialer
73- metrics * TransportMetrics
73+ stopper * stop.Stopper
74+ clock * hlc.Clock
75+ dialer * nodedialer.Dialer
76+ metrics * TransportMetrics
77+ settings * clustersettings.Settings
7478
7579 // queues stores outgoing message queues keyed by the destination node ID.
7680 queues syncutil.Map [roachpb.NodeID , sendQueue ]
7781 // handlers stores the MessageHandler for each store on the node.
7882 handlers syncutil.Map [roachpb.StoreID , MessageHandler ]
7983
84+ // Once signaled, we will signal to all sendQueues to send all messages.
85+ sendAllMessages chan struct {}
86+
8087 // TransportKnobs includes all knobs for testing.
8188 knobs * TransportKnobs
8289}
8390
8491var _ MessageSender = (* Transport )(nil )
8592
93+ type pacerConfig struct {
94+ settings * clustersettings.Settings
95+ }
96+
97+ func (c pacerConfig ) GetRefresh () time.Duration {
98+ if c .settings != nil {
99+ return HeartbeatCoordinatorRefresh .Get (& c .settings .SV )
100+ }
101+ return 10 * time .Millisecond
102+ }
103+
104+ func (c pacerConfig ) GetSmear () time.Duration {
105+ if c .settings != nil {
106+ return HeartbeatCoordinatorSmear .Get (& c .settings .SV )
107+ }
108+ return 1 * time .Millisecond
109+ }
110+
86111// NewTransport creates a new Store Liveness Transport.
87112func NewTransport (
88113 ambient log.AmbientContext ,
@@ -91,18 +116,21 @@ func NewTransport(
91116 dialer * nodedialer.Dialer ,
92117 grpcServer * grpc.Server ,
93118 drpcMux drpc.Mux ,
119+ settings * clustersettings.Settings ,
94120 knobs * TransportKnobs ,
95121) (* Transport , error ) {
96122 if knobs == nil {
97123 knobs = & TransportKnobs {}
98124 }
99125 t := & Transport {
100- AmbientContext : ambient ,
101- stopper : stopper ,
102- clock : clock ,
103- dialer : dialer ,
104- metrics : newTransportMetrics (),
105- knobs : knobs ,
126+ AmbientContext : ambient ,
127+ stopper : stopper ,
128+ clock : clock ,
129+ dialer : dialer ,
130+ metrics : newTransportMetrics (),
131+ settings : settings ,
132+ sendAllMessages : make (chan struct {}, 1 ),
133+ knobs : knobs ,
106134 }
107135 if grpcServer != nil {
108136 slpb .RegisterStoreLivenessServer (grpcServer , t )
@@ -112,6 +140,99 @@ func NewTransport(
112140 return nil , err
113141 }
114142 }
143+
144+ // Start background goroutine to act as the transport sender coordinator. It
145+ // is responsible for instructing the sendQueues to send their messages.
146+ if err := stopper .RunAsyncTask (
147+ context .Background (), "storeliveness transport send coordinator" ,
148+ func (ctx context.Context ) {
149+ var batchTimer timeutil.Timer
150+ defer batchTimer .Stop ()
151+
152+ conf := pacerConfig {settings : settings }
153+ pacer := taskpacer .New (conf )
154+
155+ // This will hold the channels we need to signal to send messages.
156+ toSignal := make ([]chan struct {}, 0 )
157+
158+ for {
159+ select {
160+ case <- stopper .ShouldQuiesce ():
161+ return
162+
163+ case <- t .sendAllMessages :
164+ // We received a signal to send all messages. Before we do that, let's
165+ // wait for a short duration to give other stores a chance to
166+ // enqueue messages which will increase batching opportunities.
167+ batchTimer .Reset (batchDuration )
168+ for done := false ; ! done ; {
169+ select {
170+ case <- t .sendAllMessages :
171+ // Consume any additional signals to send all messages.
172+
173+ case <- batchTimer .C :
174+ // We have waited to batch messages
175+ done = true
176+ }
177+ }
178+
179+ // At this point, we have waited for a short duration. We now need
180+ // to signal all queues to send their messages.
181+
182+ // Get all the sendQueues that have messages to send. Note that the
183+ // atomicity here is per sendQueue, and not across all sendQueues.
184+ t .queues .Range (func (nodeID roachpb.NodeID , q * sendQueue ) bool {
185+ if len (q .messages ) == 0 {
186+ // Nothing to send.
187+ return true
188+ }
189+
190+ toSignal = append (toSignal , q .sendMessages )
191+ return true
192+ })
193+
194+ // There is a benign race condition here, and it happens in two cases:
195+ // 1. If after we inserted the toSignal channels, a new message is
196+ // enqueued to a new queue that we haven't added. In this case, the
197+ // t.sendAllMessages should be set, and we will pick it up in the next
198+ // iteration of the for loop.
199+ // 2. If after we inserted the toSignal channels, a new message is
200+ // added to a queue that we have already added. In this case, in the
201+ // next iteration t.sendAllMessages might be valid, but the queues
202+ // could be empty. This is not a problem because we won't wake up
203+ // any sendQueue goroutine unnecessarily.
204+
205+ // Pace the signaling of the channels.
206+ pacer .StartTask (timeutil .Now ())
207+ workLeft := len (toSignal )
208+ for workLeft > 0 {
209+ todo , by := pacer .Pace (timeutil .Now (), workLeft )
210+
211+ // Pop todo items off the toSignal slice and signal them.
212+ for i := 0 ; i < todo && workLeft > 0 ; i ++ {
213+ ch := toSignal [len (toSignal )- 1 ]
214+ toSignal = toSignal [:len (toSignal )- 1 ]
215+ select {
216+ case ch <- struct {}{}:
217+ default :
218+ }
219+ workLeft --
220+ }
221+
222+ if workLeft > 0 && timeutil .Now ().Before (by ) {
223+ time .Sleep (by .Sub (timeutil .Now ()))
224+ }
225+ }
226+
227+ // Clear toSignal for next iteration
228+ toSignal = toSignal [:0 ]
229+ }
230+ }
231+ },
232+ ); err != nil {
233+ return nil , err
234+ }
235+
115236 return t , nil
116237}
117238
@@ -265,12 +386,23 @@ func (t *Transport) EnqueueMessage(ctx context.Context, msg slpb.Message) (enque
265386 }
266387}
267388
389+ // SendAllEnqueuedMessages signals all queues to send all their messages.
390+ func (t * Transport ) SendAllEnqueuedMessages (ctx context.Context ) {
391+ select {
392+ case t .sendAllMessages <- struct {}{}:
393+ default :
394+ }
395+ }
396+
268397// getQueue returns the queue for the specified node ID and a boolean
269398// indicating whether the queue already exists (true) or was created (false).
270399func (t * Transport ) getQueue (nodeID roachpb.NodeID ) (* sendQueue , bool ) {
271400 queue , ok := t .queues .Load (nodeID )
272401 if ! ok {
273- q := sendQueue {messages : make (chan slpb.Message , sendBufferSize )}
402+ q := sendQueue {
403+ sendMessages : make (chan struct {}, 1 ),
404+ messages : make (chan slpb.Message , sendBufferSize ),
405+ }
274406 queue , ok = t .queues .LoadOrStore (nodeID , & q )
275407 }
276408 return queue , ok
@@ -365,6 +497,53 @@ func (t *Transport) processQueue(
365497 var batchTimer timeutil.Timer
366498 defer batchTimer .Stop ()
367499 batch := & slpb.MessageBatch {}
500+
501+ drainQueue := func () {
502+ for {
503+ select {
504+ case msg := <- q .messages :
505+ batch .Messages = append (batch .Messages , msg )
506+ t .metrics .SendQueueSize .Dec (1 )
507+ t .metrics .SendQueueBytes .Dec (int64 (msg .Size ()))
508+ default :
509+ return
510+ }
511+ }
512+ }
513+
514+ sendBatch := func () error {
515+ if len (batch .Messages ) == 0 {
516+ return nil
517+ }
518+ batch .Now = t .clock .NowAsClockTimestamp ()
519+ if err := stream .Send (batch ); err != nil {
520+ t .metrics .MessagesSendDropped .Inc (int64 (len (batch .Messages )))
521+ return err
522+ }
523+ t .metrics .BatchesSent .Inc (1 )
524+ t .metrics .MessagesSent .Inc (int64 (len (batch .Messages )))
525+ // Reuse the Messages slice, but zero out the contents to avoid delaying GC.
526+ for i := range batch .Messages {
527+ batch .Messages [i ] = slpb.Message {}
528+ }
529+ batch .Messages = batch .Messages [:0 ]
530+ batch .Now = hlc.ClockTimestamp {}
531+ return nil
532+ }
533+
534+ // Check if we should use the heartbeat coordinator
535+ var useHeartbeatCoordinator bool
536+ if t .settings != nil {
537+ useHeartbeatCoordinator = UseHeartbeatCoordinator .Get (& t .settings .SV )
538+ }
539+
540+ // If coordinator is enabled, we don't want to process messages directly.
541+ // We'll set the messages channel to nil so the select never triggers on it.
542+ var directMessages <- chan slpb.Message
543+ if ! useHeartbeatCoordinator {
544+ directMessages = q .messages
545+ }
546+
368547 for {
369548 idleTimer .Reset (getIdleTimeout ())
370549 select {
@@ -375,7 +554,13 @@ func (t *Transport) processQueue(
375554 t .metrics .SendQueueIdle .Inc (1 )
376555 return nil
377556
378- case msg := <- q .messages :
557+ case <- q .sendMessages :
558+ drainQueue ()
559+ if err = sendBatch (); err != nil {
560+ return err
561+ }
562+
563+ case msg := <- directMessages :
379564 batch .Messages = append (batch .Messages , msg )
380565 t .metrics .SendQueueSize .Dec (1 )
381566 t .metrics .SendQueueBytes .Dec (int64 (msg .Size ()))
0 commit comments