Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/10514.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
streaming: fix a bug that was preventing streaming from being enabled.
```
1 change: 1 addition & 0 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,7 @@ func New(bd BaseDeps) (*Agent, error) {
Conn: conn,
Logger: bd.Logger.Named("rpcclient.health"),
},
UseStreamingBackend: a.config.UseStreamingBackend,
}

a.serviceManager = NewServiceManager(&a)
Expand Down
106 changes: 0 additions & 106 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"testing"
"time"

Expand All @@ -30,7 +29,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc"
"gopkg.in/square/go-jose.v2/jwt"

Expand Down Expand Up @@ -937,110 +935,6 @@ func testAgent_AddServiceNoRemoteExec(t *testing.T, extraHCL string) {
}
}

func TestCacheRateLimit(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()
tests := []struct {
// count := number of updates performed (1 every 10ms)
count int
// rateLimit rate limiting of cache
rateLimit float64
// Minimum number of updates to see from a cache perspective
// We add a value with tolerance to work even on a loaded CI
minUpdates int
}{
// 250 => we have a test running for at least 2.5s
{250, 0.5, 1},
{250, 1, 1},
{300, 2, 2},
}
for _, currentTest := range tests {
t.Run(fmt.Sprintf("rate_limit_at_%v", currentTest.rateLimit), func(t *testing.T) {
tt := currentTest
t.Parallel()
a := NewTestAgent(t, "cache = { entry_fetch_rate = 1, entry_fetch_max_burst = 100 }")
defer a.Shutdown()
testrpc.WaitForTestAgent(t, a.RPC, "dc1")

cfg := a.config
require.Equal(t, rate.Limit(1), a.config.Cache.EntryFetchRate)
require.Equal(t, 100, a.config.Cache.EntryFetchMaxBurst)
cfg.Cache.EntryFetchRate = rate.Limit(tt.rateLimit)
cfg.Cache.EntryFetchMaxBurst = 1
a.reloadConfigInternal(cfg)
require.Equal(t, rate.Limit(tt.rateLimit), a.config.Cache.EntryFetchRate)
require.Equal(t, 1, a.config.Cache.EntryFetchMaxBurst)
var wg sync.WaitGroup
stillProcessing := true

injectService := func(i int) {
srv := &structs.NodeService{
Service: "redis",
ID: "redis",
Port: 1024 + i,
Address: fmt.Sprintf("10.0.1.%d", i%255),
}

err := a.addServiceFromSource(srv, []*structs.CheckType{}, false, "", ConfigSourceRemote)
require.Nil(t, err)
}

runUpdates := func() {
wg.Add(tt.count)
for i := 0; i < tt.count; i++ {
time.Sleep(10 * time.Millisecond)
injectService(i)
wg.Done()
}
stillProcessing = false
}

getIndex := func(t *testing.T, oldIndex int) int {
req, err := http.NewRequest("GET", fmt.Sprintf("/v1/health/service/redis?cached&wait=5s&index=%d", oldIndex), nil)
require.NoError(t, err)

resp := httptest.NewRecorder()
a.srv.handler(false).ServeHTTP(resp, req)
// Key doesn't actually exist so we should get 404
if got, want := resp.Code, http.StatusOK; got != want {
t.Fatalf("bad response code got %d want %d", got, want)
}
index, err := strconv.Atoi(resp.Header().Get("X-Consul-Index"))
require.NoError(t, err)
return index
}

{
start := time.Now()
injectService(0)
// Get the first index
index := getIndex(t, 0)
require.Greater(t, index, 2)
go runUpdates()
numberOfUpdates := 0
for stillProcessing {
oldIndex := index
index = getIndex(t, oldIndex)
require.GreaterOrEqual(t, index, oldIndex, "index must be increasing only")
numberOfUpdates++
}
elapsed := time.Since(start)
qps := float64(time.Second) * float64(numberOfUpdates) / float64(elapsed)
summary := fmt.Sprintf("received %v updates in %v aka %f qps, target max was: %f qps", numberOfUpdates, elapsed, qps, tt.rateLimit)

// We must never go beyond the limit, we give 10% margin to avoid having values like 1.05 instead of 1 due to precision of clock
require.LessOrEqual(t, qps, 1.1*tt.rateLimit, fmt.Sprintf("it should never get more requests than ratelimit, had: %s", summary))
// We must have at least being notified a few times
require.GreaterOrEqual(t, numberOfUpdates, tt.minUpdates, fmt.Sprintf("It should have received a minimum of %d updates, had: %s", tt.minUpdates, summary))
}
wg.Wait()
})
}
}

func TestAddServiceIPv4TaggedDefault(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down
42 changes: 33 additions & 9 deletions agent/health_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -739,18 +739,24 @@ func TestHealthServiceNodes(t *testing.T) {

func TestHealthServiceNodes_Blocking(t *testing.T) {
cases := []struct {
name string
hcl string
grpcMetrics bool
name string
hcl string
grpcMetrics bool
queryBackend string
}{
{name: "no streaming"},
{
name: "no streaming",
queryBackend: "blocking-query",
hcl: `use_streaming_backend = false`,
},
{
name: "streaming",
grpcMetrics: true,
hcl: `
rpc { enable_streaming = true }
use_streaming_backend = true
`,
queryBackend: "streaming",
},
}

Expand Down Expand Up @@ -856,6 +862,8 @@ use_streaming_backend = true
require.True(t, idx < newIdx, "index should have increased."+
"idx=%d, newIdx=%d", idx, newIdx)

require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))

idx = newIdx

checkErrs()
Expand All @@ -882,6 +890,7 @@ use_streaming_backend = true

newIdx := getIndex(t, resp)
require.Equal(t, idx, newIdx)
require.Equal(t, tc.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
}

if tc.grpcMetrics {
Expand All @@ -905,16 +914,25 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
t.Parallel()

tests := []struct {
name string
config string
name string
config string
queryBackend string
}{
{"normal", ""},
{"cache-with-streaming", `
{
name: "blocking-query",
config: `use_streaming_backend=false`,
queryBackend: "blocking-query",
},
{
name: "cache-with-streaming",
config: `
rpc{
enable_streaming=true
}
use_streaming_backend=true
`},
`,
queryBackend: "streaming",
},
}
for _, tst := range tests {
t.Run(tst.name, func(t *testing.T) {
Expand Down Expand Up @@ -986,6 +1004,8 @@ func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
t.Fatalf("bad: %v", obj)
}

require.Equal(t, tst.queryBackend, resp.Header().Get("X-Consul-Query-Backend"))
})
}
}
Expand Down Expand Up @@ -1511,6 +1531,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {

// Should be a cache miss
require.Equal(t, "MISS", resp.Header().Get("X-Cache"))
// always a blocking query, because the ingress endpoint does not yet support streaming.
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
}))

require.True(t, t.Run("test caching hit", func(t *testing.T) {
Expand All @@ -1525,6 +1547,8 @@ func testHealthIngressServiceNodes(t *testing.T, agentHCL string) {

// Should be a cache HIT now!
require.Equal(t, "HIT", resp.Header().Get("X-Cache"))
// always a blocking query, because the ingress endpoint does not yet support streaming.
require.Equal(t, "blocking-query", resp.Header().Get("X-Consul-Query-Backend"))
}))
}

Expand Down
7 changes: 7 additions & 0 deletions agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,13 @@ func setMeta(resp http.ResponseWriter, m structs.QueryMetaCompat) {
setLastContact(resp, m.GetLastContact())
setKnownLeader(resp, m.GetKnownLeader())
setConsistency(resp, m.GetConsistencyLevel())
setQueryBackend(resp, m.GetBackend())
}

func setQueryBackend(resp http.ResponseWriter, backend structs.QueryBackend) {
if b := backend.String(); b != "" {
resp.Header().Set("X-Consul-Query-Backend", b)
}
}

// setCacheMeta sets http response headers to indicate cache status.
Expand Down
3 changes: 2 additions & 1 deletion agent/rpcclient/health/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (c *Client) ServiceNodes(
if err != nil {
return structs.IndexedCheckServiceNodes{}, cache.ResultMeta{}, err
}
return *result.Value.(*structs.IndexedCheckServiceNodes), cache.ResultMeta{Index: result.Index}, err
meta := cache.ResultMeta{Index: result.Index, Hit: result.Cached}
return *result.Value.(*structs.IndexedCheckServiceNodes), meta, err
}

out, md, err := c.getServiceNodes(ctx, req)
Expand Down
3 changes: 2 additions & 1 deletion agent/rpcclient/health/view.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,8 @@ func (s *healthView) Result(index uint64) interface{} {
result := structs.IndexedCheckServiceNodes{
Nodes: make(structs.CheckServiceNodes, 0, len(s.state)),
QueryMeta: structs.QueryMeta{
Index: index,
Index: index,
Backend: structs.QueryBackendStreaming,
},
}
for _, node := range s.state {
Expand Down
4 changes: 3 additions & 1 deletion agent/rpcclient/health/view_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ func TestHealthView_IntegrationWithStore_WithEmptySnapshot(t *testing.T) {
empty := &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{},
QueryMeta: structs.QueryMeta{
Index: 1,
Index: 1,
Backend: structs.QueryBackendStreaming,
},
}

Expand Down Expand Up @@ -381,6 +382,7 @@ func TestHealthView_IntegrationWithStore_WithFullSnapshot(t *testing.T) {

func newExpectedNodes(nodes ...string) *structs.IndexedCheckServiceNodes {
result := &structs.IndexedCheckServiceNodes{}
result.QueryMeta.Backend = structs.QueryBackendStreaming
for _, node := range nodes {
result.Nodes = append(result.Nodes, structs.CheckServiceNode{
Node: &structs.Node{Node: node},
Expand Down
1 change: 1 addition & 0 deletions agent/streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func testGRPCStreamingWorking(t *testing.T, config string) {

assertIndex(t, resp)
require.NotEmpty(t, resp.Header().Get("X-Consul-Index"))
require.Equal(t, "streaming", resp.Header().Get("X-Consul-Query-Backend"))
}

func TestGRPCWithTLSConfigs(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions agent/structs/protobuf_compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type QueryMetaCompat interface {
SetIndex(uint64)
GetConsistencyLevel() string
SetConsistencyLevel(string)
GetBackend() QueryBackend
}

// GetToken helps implement the QueryOptionsCompat interface
Expand Down Expand Up @@ -269,3 +270,7 @@ func (q *QueryMeta) SetIndex(index uint64) {
func (q *QueryMeta) SetConsistencyLevel(consistencyLevel string) {
q.ConsistencyLevel = consistencyLevel
}

func (q *QueryMeta) GetBackend() QueryBackend {
return q.Backend
}
21 changes: 21 additions & 0 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,24 @@ func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime,
return time.Since(start) > rpcHoldTimeout
}

type QueryBackend int

const (
QueryBackendBlocking QueryBackend = iota
QueryBackendStreaming
)

func (q QueryBackend) String() string {
switch q {
case QueryBackendBlocking:
return "blocking-query"
case QueryBackendStreaming:
return "streaming"
default:
return ""
}
}

// QueryMeta allows a query response to include potentially
// useful metadata about a query
type QueryMeta struct {
Expand All @@ -363,6 +381,9 @@ type QueryMeta struct {
// When NotModified is true, the response will not contain the result of
// the query.
NotModified bool

// Backend used to handle this query, either blocking-query or streaming.
Backend QueryBackend
}

// RegisterRequest is used for the Catalog.Register endpoint
Expand Down
5 changes: 5 additions & 0 deletions agent/submatview/materializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,13 @@ func (m *Materializer) notifyUpdateLocked(err error) {
m.updateCh = make(chan struct{})
}

// Result returned from the View.
type Result struct {
Index uint64
Value interface{}
// Cached is true if the requested value was already available locally. If
// the value is false, it indicates that getFromView had to wait for an update,
Cached bool
}

// getFromView blocks until the index of the View is greater than opts.MinIndex,
Expand All @@ -237,6 +241,7 @@ func (m *Materializer) getFromView(ctx context.Context, minIndex uint64) (Result
// haven't loaded a snapshot at all yet which means we should wait for one on
// the update chan.
if result.Index > 0 && result.Index > minIndex {
result.Cached = true
return result, nil
}

Expand Down
2 changes: 1 addition & 1 deletion agent/submatview/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (s *Store) Notify(
u := cache.UpdateEvent{
CorrelationID: correlationID,
Result: result.Value,
Meta: cache.ResultMeta{Index: result.Index},
Meta: cache.ResultMeta{Index: result.Index, Hit: result.Cached},
}
select {
case updateCh <- u:
Expand Down
Loading