Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/user_guide/targets/target_discovery/consul_discovery.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ loader:
type: consul
services:
- name: cluster1-gnmi-server
filter: Service.Meta.environment == "production"
config:
insecure: true
username: admin
Expand Down Expand Up @@ -71,6 +72,8 @@ loader:
- name:
# a list of strings to further filter the service instances
tags:
# optional go-bexpr filter evaluated by Consul in addition to tags
filter:
# configuration map to apply to target discovered from this service
config:
# list of actions to run on target discovery
Expand All @@ -82,4 +85,19 @@ loader:
# path to variable file, the variables defined will be passed to the actions to be run
# values in this file will be overwritten by the ones defined in `vars`
vars-file:

### Filtering services

Each service entry can also define a [`filter`](https://developer.hashicorp.com/consul/api-docs/features/filtering) that Consul evaluates before returning results. Filters use [go-bexpr syntax](https://github.com/HashiCorp/go-bexpr) and are applied in addition to the `tags` list (both conditions must match).

```yaml
loader:
type: consul
services:
- name: cluster1-gnmi-server
tags: ["gnmic", "network-device"]
filter: Service.Meta.environment == "production" && Node.Datacenter == "dc1"
```

Use filters to keep existing tag-based configs working while narrowing the results with metadata such as health status, service meta fields, or node attributes.
```
162 changes: 82 additions & 80 deletions pkg/loaders/consul_loader/consul_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ type consulLoader struct {
numActions int
}

type serviceWatchResult struct {
def *serviceDef
entries []*api.ServiceEntry
}

type cfg struct {
// Consul server address
Address string `mapstructure:"address,omitempty" json:"address,omitempty"`
Expand Down Expand Up @@ -113,6 +118,7 @@ type cfg struct {
type serviceDef struct {
Name string `mapstructure:"name,omitempty" json:"name,omitempty"`
Tags []string `mapstructure:"tags,omitempty" json:"tags,omitempty"`
Filter string `mapstructure:"filter,omitempty" json:"filter,omitempty"`
Config map[string]interface{} `mapstructure:"config,omitempty" json:"config,omitempty"`

tags map[string]struct{}
Expand Down Expand Up @@ -210,25 +216,27 @@ CLIENT:
time.Sleep(2 * time.Second)
goto CLIENT
}
sChan := make(chan []*api.ServiceEntry)
sChan := make(chan *serviceWatchResult)
go func() {
for {
select {
case <-ctx.Done():
return
case ses, ok := <-sChan:
case res, ok := <-sChan:
if !ok {
return
}
tcs := make(map[string]*types.TargetConfig)
srvName := ""
for _, se := range ses {
srvName = se.Service.Service
tc, err := c.serviceEntryToTargetConfig(se)
srvName := res.def.Name
for _, se := range res.entries {
tc, err := c.serviceEntryToTargetConfig(res.def, se)
if err != nil {
c.logger.Printf("Failed to convert service entry %+v to a target config: %v", se, err)
continue
}
if tc == nil {
continue
}
tcs[tc.Name] = tc
}

Expand All @@ -238,7 +246,7 @@ CLIENT:
}()
for _, s := range c.cfg.Services {
go func(s *serviceDef) {
err := c.startServicesWatch(ctx, s.Name, s.Tags, sChan, time.Minute)
err := c.startServicesWatch(ctx, s, sChan, time.Minute)
if err != nil {
c.logger.Printf("service %q watch stopped: %v", s.Name, err)
}
Expand All @@ -252,25 +260,27 @@ func (c *consulLoader) RunOnce(ctx context.Context) (map[string]*types.TargetCon
return nil, err
}
result := make(map[string]*types.TargetConfig)
rsChan := make(chan *api.ServiceEntry)
rsChan := make(chan *serviceWatchResult)
wg := new(sync.WaitGroup)

// fan-out queries
for _, s := range c.cfg.Services {
wg.Add(1)
go func(s *serviceDef) {
defer wg.Done()
ses, _, err := c.client.Health().ServiceMultipleTags(s.Name, s.Tags, true, &api.QueryOptions{})
var qOpts *api.QueryOptions
if s.Filter != "" {
qOpts = &api.QueryOptions{Filter: s.Filter}
}
ses, _, err := c.client.Health().ServiceMultipleTags(s.Name, s.Tags, true, qOpts)
if err != nil {
c.logger.Printf("failed to get service %q instances: %v", s.Name, err)
return
}
for _, se := range ses {
select {
case rsChan <- se:
case <-ctx.Done():
return
}
select {
case rsChan <- &serviceWatchResult{def: s, entries: ses}:
case <-ctx.Done():
return
}
}(s)
}
Expand All @@ -283,17 +293,19 @@ func (c *consulLoader) RunOnce(ctx context.Context) (map[string]*types.TargetCon

for {
select {
case se, ok := <-rsChan:
case res, ok := <-rsChan:
if !ok {
return result, nil
}
tc, err := c.serviceEntryToTargetConfig(se)
if err != nil {
c.logger.Printf("failed to convert service %+v to target config: %v", se, err)
continue
}
if tc != nil {
result[tc.Name] = tc
for _, se := range res.entries {
tc, err := c.serviceEntryToTargetConfig(res.def, se)
if err != nil {
c.logger.Printf("failed to convert service %+v to target config: %v", se, err)
continue
}
if tc != nil {
result[tc.Name] = tc
}
}
case <-ctx.Done():
return result, ctx.Err()
Expand Down Expand Up @@ -344,14 +356,15 @@ func (c *consulLoader) setDefaults() error {
return nil
}

func (c *consulLoader) startServicesWatch(ctx context.Context, serviceName string, tags []string, sChan chan<- []*api.ServiceEntry, watchTimeout time.Duration) error {
func (c *consulLoader) startServicesWatch(ctx context.Context, sd *serviceDef, sChan chan<- *serviceWatchResult, watchTimeout time.Duration) error {
if watchTimeout <= 0 {
watchTimeout = defaultWatchTimeout
}
var index uint64
qOpts := &api.QueryOptions{
WaitIndex: index,
WaitTime: watchTimeout,
Filter: sd.Filter,
}
var err error
// long blocking watch
Expand All @@ -361,11 +374,11 @@ func (c *consulLoader) startServicesWatch(ctx context.Context, serviceName strin
return ctx.Err()
default:
if c.cfg.Debug {
c.logger.Printf("(re)starting watch service=%q, index=%d", serviceName, qOpts.WaitIndex)
c.logger.Printf("(re)starting watch service=%q, index=%d", sd.Name, qOpts.WaitIndex)
}
index, err = c.watch(qOpts.WithContext(ctx), serviceName, tags, sChan)
index, err = c.watch(qOpts.WithContext(ctx), sd, sChan)
if err != nil {
c.logger.Printf("service %q watch failed: %v", serviceName, err)
c.logger.Printf("service %q watch failed: %v", sd.Name, err)
}
if index == 1 {
qOpts.WaitIndex = index
Expand All @@ -384,90 +397,79 @@ func (c *consulLoader) startServicesWatch(ctx context.Context, serviceName strin
}
}

func (c *consulLoader) watch(qOpts *api.QueryOptions, serviceName string, tags []string, sChan chan<- []*api.ServiceEntry) (uint64, error) {
se, meta, err := c.client.Health().ServiceMultipleTags(serviceName, tags, true, qOpts)
func (c *consulLoader) watch(qOpts *api.QueryOptions, sd *serviceDef, sChan chan<- *serviceWatchResult) (uint64, error) {
se, meta, err := c.client.Health().ServiceMultipleTags(sd.Name, sd.Tags, true, qOpts)
if err != nil {
return 0, err
}
if meta.LastIndex == qOpts.WaitIndex {
c.logger.Printf("service=%q did not change", serviceName)
c.logger.Printf("service=%q did not change", sd.Name)
return meta.LastIndex, nil
}
if len(se) == 0 {
return 1, nil
}
sChan <- se
sChan <- &serviceWatchResult{def: sd, entries: se}
return meta.LastIndex, nil
}

func (c *consulLoader) serviceEntryToTargetConfig(se *api.ServiceEntry) (*types.TargetConfig, error) {
func (c *consulLoader) serviceEntryToTargetConfig(sd *serviceDef, se *api.ServiceEntry) (*types.TargetConfig, error) {
tc := new(types.TargetConfig)
if se.Service == nil {
return tc, nil
}

SRV:
for _, sd := range c.cfg.Services {
// match service name
if se.Service.Service != sd.Name {
continue
}
if se.Service.Service != sd.Name {
return nil, fmt.Errorf("service entry name %q mismatches definition %q", se.Service.Service, sd.Name)
}
Comment on lines -409 to +424
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@amuntean-godaddy I think this is where you should handle the case of same serviceName with different filters.
If the service name does not match continue with the next service, if they match compare the filters. If the filters match continue with building the target config, otherwise continue with the next service.


// match service tags
if len(sd.tags) > 0 {
for requiredTag := range sd.tags {
if !slices.Contains(se.Service.Tags, requiredTag) {
goto SRV
}
if len(sd.tags) > 0 {
for requiredTag := range sd.tags {
if !slices.Contains(se.Service.Tags, requiredTag) {
return nil, fmt.Errorf("service entry %q missing required tag %q", se.Service.ID, requiredTag)
}
}
}

// decode config if present
if sd.Config != nil {
err := mapstructure.Decode(sd.Config, tc)
if err != nil {
return nil, err
}
if sd.Config != nil {
err := mapstructure.Decode(sd.Config, tc)
if err != nil {
return nil, err
}
}

tc.Address = se.Service.Address
if tc.Address == "" {
tc.Address = se.Node.Address
}
tc.Address = net.JoinHostPort(tc.Address, strconv.Itoa(se.Service.Port))
tc.Address = se.Service.Address
if tc.Address == "" {
tc.Address = se.Node.Address
}
tc.Address = net.JoinHostPort(tc.Address, strconv.Itoa(se.Service.Port))

var buffer bytes.Buffer
var buffer bytes.Buffer

tc.Name = se.Service.ID
tc.Name = se.Service.ID

if sd.targetNameTemplate != nil {
buffer.Reset()
err := sd.targetNameTemplate.Execute(&buffer, se.Service)
if err != nil {
c.logger.Println("Could not execute nameTemplate")
continue
}
tc.Name = buffer.String()
if sd.targetNameTemplate != nil {
buffer.Reset()
err := sd.targetNameTemplate.Execute(&buffer, se.Service)
if err != nil {
return nil, fmt.Errorf("execute name template: %w", err)
}
tc.Name = buffer.String()
}

// Create Event tags from Consul via templates
if len(sd.targetTagsTemplate) > 0 {
eventTags := make(map[string]string)
for tagName, tagTemplate := range sd.targetTagsTemplate {
buffer.Reset()
err := tagTemplate.Execute(&buffer, se.Service)
if err != nil {
c.logger.Println("Could not execute tagTemplate:", tagName)
return nil, err
}
eventTags[tagName] = buffer.String()
if len(sd.targetTagsTemplate) > 0 {
eventTags := make(map[string]string)
for tagName, tagTemplate := range sd.targetTagsTemplate {
buffer.Reset()
err := tagTemplate.Execute(&buffer, se.Service)
if err != nil {
return nil, fmt.Errorf("execute tag template %q: %w", tagName, err)
}
tc.EventTags = eventTags
eventTags[tagName] = buffer.String()
}
return tc, nil
tc.EventTags = eventTags
}

return nil, errors.New("unable to find a match in Consul service(s)")
return tc, nil
}

func (c *consulLoader) updateTargets(ctx context.Context, srvName string, tcs map[string]*types.TargetConfig, opChan chan *loaders.TargetOperation) {
Expand Down
50 changes: 49 additions & 1 deletion pkg/loaders/consul_loader/consul_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ package consul_loader

import (
"context"
"fmt"
"io"
"log"
"net/http"
"net/http/httptest"
"strings"
"sync"
"testing"

"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -59,7 +64,7 @@ func TestIssue706_ServicesWithExtraTagsFiltered(t *testing.T) {
},
}

result, err := cl.serviceEntryToTargetConfig(serviceEntry)
result, err := cl.serviceEntryToTargetConfig(cl.cfg.Services[0], serviceEntry)

if err != nil {
t.Fatalf("Expected service with extra tags to be accepted, but got error: %v", err)
Expand Down Expand Up @@ -132,3 +137,46 @@ func TestOldBuggyLogicWouldReject(t *testing.T) {
t.Logf("✓ Old logic would incorrectly reject: %v", oldLogicWouldReject)
t.Logf("✓ New logic correctly accepts: %v", newLogicShouldAccept)
}

func TestRunOnceAppliesServiceFilter(t *testing.T) {
filterExpr := `Service.Meta.profile == "arista"`
var filterChecked bool
hs := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
switch {
case r.URL.Path == "/v1/agent/self":
fmt.Fprint(w, `{"Member":{"Tags":{}}}`)
case strings.HasPrefix(r.URL.Path, "/v1/health/service/gnmi"):
if got := r.URL.Query().Get("filter"); got != filterExpr {
t.Fatalf("expected filter %q, got %q", filterExpr, got)
}
filterChecked = true
fmt.Fprint(w, `[{"Node":{"Address":"10.0.0.1"},"Service":{"ID":"target-1","Service":"gnmi","Address":"10.0.0.1","Port":6030}}]`)
default:
t.Fatalf("unexpected path: %s", r.URL.Path)
}
}))
defer hs.Close()
addr := strings.TrimPrefix(hs.URL, "http://")
cl := &consulLoader{
cfg: &cfg{
Address: addr,
Datacenter: "dc1",
Services: []*serviceDef{
{Name: "gnmi", Filter: filterExpr},
},
},
logger: log.New(io.Discard, loggingPrefix, utils.DefaultLoggingFlags),
m: new(sync.Mutex),
}
res, err := cl.RunOnce(context.Background())
if err != nil {
t.Fatalf("RunOnce returned error: %v", err)
}
if !filterChecked {
t.Fatalf("expected health query to include filter parameter")
}
if _, ok := res["target-1"]; !ok {
t.Fatalf("expected target-1 in results, got %v", res)
}
}
Loading