Skip to content

Commit 492cc3b

Browse files
authored
Merge pull request #1748 from adnxn/ha-jitter
Add jitter to HA deduping heartbeats (#1543)
2 parents a8bf91f + 964cc2a commit 492cc3b

File tree

5 files changed

+54
-15
lines changed

5 files changed

+54
-15
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
* [ENHANCEMENT] Allocation improvements in adding samples to Chunk. #1706
1616
* [ENHANCEMENT] Consul client now follows recommended practices for blocking queries wrt returned Index value. #1708
1717
* [ENHANCEMENT] Consul client can optionally rate-limit itself during Watch (used e.g. by ring watchers) and WatchPrefix (used by HA feature) operations. Rate limiting is disabled by default. New flags added: `--consul.watch-rate-limit`, and `--consul.watch-burst-size`. #1708
18+
* [ENHANCEMENT] Added jitter to HA deduping heartbeats, configure using `distributor.ha-tracker.update-timeout-jitter-max` #1534
1819

1920
## 0.3.0 / 2019-10-11
2021

pkg/cortex/cortex.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ func (c *Config) Validate() error {
134134
if err := c.LimitsConfig.Validate(c.Distributor.ShardByAllLabels); err != nil {
135135
return errors.Wrap(err, "invalid limits config")
136136
}
137+
if err := c.Distributor.Validate(); err != nil {
138+
return errors.Wrap(err, "invalid distributor config")
139+
}
137140
return nil
138141
}
139142

pkg/distributor/distributor.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
144144
f.BoolVar(&cfg.ShardByAllLabels, "distributor.shard-by-all-labels", false, "Distribute samples based on all labels, as opposed to solely by user and metric name.")
145145
}
146146

147+
// Validate config and returns error on failure
148+
func (cfg *Config) Validate() error {
149+
return cfg.HATrackerConfig.Validate()
150+
}
151+
147152
// New constructs a new Distributor
148153
func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Overrides, ring ring.ReadRing) (*Distributor, error) {
149154
if cfg.ingesterClientFactory == nil {

pkg/distributor/ha_tracker.go

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"flag"
66
"fmt"
7+
"math/rand"
78
"net/http"
89
"strings"
910
"sync"
@@ -62,9 +63,10 @@ func NewReplicaDesc() *ReplicaDesc {
6263
// Track the replica we're accepting samples from
6364
// for each HA cluster we know about.
6465
type haTracker struct {
65-
logger log.Logger
66-
cfg HATrackerConfig
67-
client kv.Client
66+
logger log.Logger
67+
cfg HATrackerConfig
68+
client kv.Client
69+
updateTimeoutJitter time.Duration
6870

6971
// Replicas we are accepting samples from.
7072
electedLock sync.RWMutex
@@ -80,7 +82,8 @@ type HATrackerConfig struct {
8082
// We should only update the timestamp if the difference
8183
// between the stored timestamp and the time we received a sample at
8284
// is more than this duration.
83-
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
85+
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
86+
UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
8487
// We should only failover to accepting samples from a replica
8588
// other than the replica written in the KVStore if the difference
8689
// between the stored timestamp and the time we received a sample is
@@ -100,6 +103,10 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
100103
"distributor.ha-tracker.update-timeout",
101104
15*time.Second,
102105
"Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp.")
106+
f.DurationVar(&cfg.UpdateTimeoutJitterMax,
107+
"distributor.ha-tracker.update-timeout-jitter-max",
108+
5*time.Second,
109+
"To spread the HA deduping heartbeats out over time.")
103110
f.DurationVar(&cfg.FailoverTimeout,
104111
"distributor.ha-tracker.failover-timeout",
105112
30*time.Second,
@@ -108,21 +115,33 @@ func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
108115
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ha-tracker.", f)
109116
}
110117

118+
// Validate config and returns error on failure
119+
func (cfg *HATrackerConfig) Validate() error {
120+
if cfg.FailoverTimeout < cfg.UpdateTimeout+cfg.UpdateTimeoutJitterMax+time.Second {
121+
return fmt.Errorf("HA Tracker failover timeout (%v) must be at least 1s greater than update timeout (%v)",
122+
cfg.FailoverTimeout, cfg.UpdateTimeout+cfg.UpdateTimeoutJitterMax+time.Second)
123+
}
124+
return nil
125+
}
126+
111127
// NewClusterTracker returns a new HA cluster tracker using either Consul
112128
// or in-memory KV store.
113129
func newClusterTracker(cfg HATrackerConfig) (*haTracker, error) {
114130
codec := codec.Proto{Factory: ProtoReplicaDescFactory}
115131

116-
if cfg.FailoverTimeout <= cfg.UpdateTimeout {
117-
return nil, fmt.Errorf("HA Tracker failover timeout must be greater than update timeout, %d is <= %d", cfg.FailoverTimeout, cfg.UpdateTimeout)
132+
var jitter time.Duration
133+
if cfg.UpdateTimeoutJitterMax > 0 {
134+
jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
118135
}
136+
119137
ctx, cancel := context.WithCancel(context.Background())
120138
t := haTracker{
121-
logger: util.Logger,
122-
cfg: cfg,
123-
done: make(chan struct{}),
124-
elected: map[string]ReplicaDesc{},
125-
cancel: cancel,
139+
logger: util.Logger,
140+
cfg: cfg,
141+
updateTimeoutJitter: jitter,
142+
done: make(chan struct{}),
143+
elected: map[string]ReplicaDesc{},
144+
cancel: cancel,
126145
}
127146

128147
if cfg.EnableHATracker {
@@ -213,7 +232,7 @@ func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now t
213232

214233
// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
215234
// this sample at is less than updateTimeout amount of time since the timestamp in the KV store.
216-
if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout {
235+
if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
217236
return nil, false, nil
218237
}
219238

pkg/distributor/ha_tracker_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func TestFailoverGreaterUpdate(t *testing.T) {
7373
in: HATrackerConfig{
7474
EnableHATracker: true,
7575
UpdateTimeout: time.Second,
76-
FailoverTimeout: 999 * time.Millisecond,
76+
FailoverTimeout: 1999 * time.Millisecond,
7777
KVStore: kv.Config{
7878
Store: "inmemory",
7979
},
@@ -84,7 +84,18 @@ func TestFailoverGreaterUpdate(t *testing.T) {
8484
in: HATrackerConfig{
8585
EnableHATracker: true,
8686
UpdateTimeout: time.Second,
87-
FailoverTimeout: 1001 * time.Millisecond,
87+
FailoverTimeout: 2000 * time.Millisecond,
88+
KVStore: kv.Config{
89+
Store: "inmemory",
90+
},
91+
},
92+
fail: false,
93+
},
94+
{
95+
in: HATrackerConfig{
96+
EnableHATracker: true,
97+
UpdateTimeout: time.Second,
98+
FailoverTimeout: 2001 * time.Millisecond,
8899
KVStore: kv.Config{
89100
Store: "inmemory",
90101
},
@@ -94,7 +105,7 @@ func TestFailoverGreaterUpdate(t *testing.T) {
94105
}
95106

96107
for _, c := range cases {
97-
_, err := newClusterTracker(c.in)
108+
err := c.in.Validate()
98109
fail := err != nil
99110
assert.Equal(t, c.fail, fail, "unexpected result: %s", err)
100111
}

0 commit comments

Comments
 (0)