From 6b58206eb0959caf14c69dbb0f142d434d7eda29 Mon Sep 17 00:00:00 2001 From: William Zahary Henderson Date: Thu, 22 Jan 2026 00:33:58 +0000 Subject: [PATCH 1/2] Add metric blocklist logic to pantheon writer --- cmd/thanos/receive.go | 33 +++- pkg/receive/capnproto_writer.go | 15 ++ pkg/receive/handler.go | 1 + pkg/receive/metric_blocklist.go | 271 +++++++++++++++++++++++++++ pkg/receive/metric_blocklist_test.go | 189 +++++++++++++++++++ pkg/receive/writer.go | 15 ++ 6 files changed, 523 insertions(+), 1 deletion(-) create mode 100644 pkg/receive/metric_blocklist.go create mode 100644 pkg/receive/metric_blocklist_test.go diff --git a/cmd/thanos/receive.go b/cmd/thanos/receive.go index 1df000f4436..7f563af0b67 100644 --- a/cmd/thanos/receive.go +++ b/cmd/thanos/receive.go @@ -236,11 +236,15 @@ func runReceive( } relabeller, err := receive.NewRelabeller(conf.relabelConfigPath, reg, logger, conf.relabelConfigReloadTimer) - if err != nil { return errors.Wrap(err, "get content of relabel configuration") } + blocklist, err := receive.NewMetricBlocklist(conf.blocklistConfigPath, reg, logger, conf.blocklistConfigReloadTimer) + if err != nil { + return errors.Wrap(err, "get content of blocklist configuration") + } + var cache = storecache.NoopMatchersCache if conf.matcherCacheSize > 0 { cache, err = storecache.NewMatchersCache(storecache.WithSize(conf.matcherCacheSize), storecache.WithPromRegistry(reg)) @@ -266,6 +270,7 @@ func runReceive( Intern: conf.writerInterning, TooFarInFutureTimeWindow: int64(time.Duration(*conf.tsdbTooFarInFutureTimeWindow)), }) + writer.SetBlocklist(blocklist) var limitsConfig *receive.RootLimitsConfig if conf.writeLimitsConfig != nil { @@ -306,6 +311,7 @@ func runReceive( ReplicaHeader: conf.replicaHeader, ReplicationFactor: conf.replicationFactor, Relabeller: relabeller, + MetricBlocklist: blocklist, ReceiverMode: receiveMode, Tracer: tracer, TLSConfig: rwTLSConfig, @@ -337,6 +343,24 @@ func runReceive( } } + { + if blocklist.CanReload() { + ctx, cancel := context.WithCancel(context.Background()) + g.Add(func() error { + level.Debug(logger).Log("msg", "blocklist config initialized with file watcher.") + if err := blocklist.StartConfigReloader(ctx); err != nil { + level.Error(logger).Log("msg", "initializing blocklist config reloading.", "err", err) + return err + } + level.Info(logger).Log("msg", "blocklist config reloading initialized.") + <-ctx.Done() + return nil + }, func(error) { + cancel() + }) + } + } + grpcProbe := prober.NewGRPC() httpProbe := prober.NewHTTP() statusProber := prober.Combine( @@ -985,6 +1009,9 @@ type receiveConfig struct { relabelConfigPath *extflag.PathOrContent relabelConfigReloadTimer time.Duration + blocklistConfigPath *extflag.PathOrContent + blocklistConfigReloadTimer time.Duration + writeLimitsConfig *extflag.PathOrContent storeRateLimits store.SeriesSelectLimits limitsConfigReloadTimer time.Duration @@ -1086,6 +1113,10 @@ func (rc *receiveConfig) registerFlag(cmd extkingpin.FlagClause) { cmd.Flag("receive.relabel-config-reload-timer", "Minimum amount of time to pass for the relabel configuration to be reloaded. Helps to avoid excessive reloads."). Default("0s").Hidden().DurationVar(&rc.relabelConfigReloadTimer) + rc.blocklistConfigPath = extflag.RegisterPathOrContent(cmd, "receive.blocklist-config", "YAML file that contains metric blocklist configuration. Metrics matching blocklist rules will be dropped during ingestion.", extflag.WithEnvSubstitution()) + cmd.Flag("receive.blocklist-config-reload-timer", "Minimum amount of time to pass for the blocklist configuration to be reloaded. Helps to avoid excessive reloads."). + Default("0s").Hidden().DurationVar(&rc.blocklistConfigReloadTimer) + rc.tsdbMinBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.min-block-duration", "Min duration for local TSDB blocks").Default("2h").Hidden()) rc.tsdbMaxBlockDuration = extkingpin.ModelDuration(cmd.Flag("tsdb.max-block-duration", "Max duration for local TSDB blocks").Default("2h").Hidden()) diff --git a/pkg/receive/capnproto_writer.go b/pkg/receive/capnproto_writer.go index e3f1fd6929f..2547d74689f 100644 --- a/pkg/receive/capnproto_writer.go +++ b/pkg/receive/capnproto_writer.go @@ -27,6 +27,7 @@ type CapNProtoWriter struct { logger log.Logger multiTSDB TenantStorage opts *CapNProtoWriterOptions + blocklist *MetricBlocklist } func NewCapNProtoWriter(logger log.Logger, multiTSDB TenantStorage, opts *CapNProtoWriterOptions) *CapNProtoWriter { @@ -40,6 +41,12 @@ func NewCapNProtoWriter(logger log.Logger, multiTSDB TenantStorage, opts *CapNPr } } +// SetBlocklist sets the metric blocklist for the writer. +// This allows filtering out metrics that match blocklist rules after deserialization. +func (r *CapNProtoWriter) SetBlocklist(blocklist *MetricBlocklist) { + r.blocklist = blocklist +} + func (r *CapNProtoWriter) Write(ctx context.Context, tenantID string, wreq *writecapnp.Request) error { tLogger := log.With(r.logger, "tenant", tenantID) @@ -83,6 +90,14 @@ func (r *CapNProtoWriter) Write(ctx context.Context, tenantID string, wreq *writ continue } + // Check if the time series should be blocked based on blocklist rules. + if r.blocklist != nil { + if blocked, ruleName := r.blocklist.ShouldBlock(series.Labels); blocked { + r.blocklist.RecordBlocked(ruleName, tenantID) + continue + } + } + var lset labels.Labels // Check if the TSDB has cached reference for those labels. ref, lset = getRef.GetRef(series.Labels, series.Labels.Hash()) diff --git a/pkg/receive/handler.go b/pkg/receive/handler.go index cfedb288ae5..fa24b392fd6 100644 --- a/pkg/receive/handler.go +++ b/pkg/receive/handler.go @@ -113,6 +113,7 @@ type Options struct { ForwardTimeout time.Duration MaxBackoff time.Duration Relabeller *Relabeller + MetricBlocklist *MetricBlocklist TSDBStats TSDBStats Limiter *Limiter AsyncForwardWorkerCount uint diff --git a/pkg/receive/metric_blocklist.go b/pkg/receive/metric_blocklist.go new file mode 100644 index 00000000000..e62126aae4f --- /dev/null +++ b/pkg/receive/metric_blocklist.go @@ -0,0 +1,271 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/prometheus/model/labels" + "github.com/thanos-io/thanos/pkg/extkingpin" + "go.uber.org/atomic" + "gopkg.in/yaml.v2" +) + +// BlocklistRule represents a single metric blocking rule. +// This matches the m3 coordinator mapping rule format. +type BlocklistRule struct { + // Name is a human-readable identifier for the rule. + Name string `yaml:"name"` + // Filter is the filter expression in m3 coordinator format. + // Format: "__name__:metric_pattern label1:value1 label2:value2" + // Should support glob patterns with '*' for prefix/suffix matching. + Filter string `yaml:"filter"` + // Drop indicates whether matching metrics should be dropped. + // Only rules with drop=true are considered blocking rules. + Drop bool `yaml:"drop"` +} + +// BlocklistConfig is a collection of blocklist rules. +type BlocklistConfig struct { + Rules []BlocklistRule `yaml:"rules"` +} + +// MetricBlocklist is responsible for managing metric blocklist configuration +// and evaluating whether metrics should be blocked from ingestion. +// The configuration supports hot-reloading via atomic pointer swap. +type MetricBlocklist struct { + configPathOrContent fileContent + config *atomic.Pointer[BlocklistConfig] + logger log.Logger + configReloadCounter prometheus.Counter + configReloadFailedCounter prometheus.Counter + configReloadTimer time.Duration + + // Metrics for blocked writes. + blockedSeriesTotal *prometheus.CounterVec +} + +// MetricBlocklistMetrics holds Prometheus metrics for the blocklist. +type MetricBlocklistMetrics struct { + BlockedSeriesTotal *prometheus.CounterVec +} + +// NewMetricBlocklistMetrics creates metrics for the metric blocklist. +func NewMetricBlocklistMetrics(reg prometheus.Registerer) *MetricBlocklistMetrics { + return &MetricBlocklistMetrics{ + BlockedSeriesTotal: promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "blocked_series_total", + Help: "Total number of series blocked by blocklist rules.", + }, + []string{"rule_name", "tenant"}, + ), + } +} + +// NewMetricBlocklist creates a new MetricBlocklist and loads the configuration. +func NewMetricBlocklist( + configFile fileContent, + reg prometheus.Registerer, + logger log.Logger, + configReloadTimer time.Duration, +) (*MetricBlocklist, error) { + var config atomic.Pointer[BlocklistConfig] + config.Store(&BlocklistConfig{}) + + blocklist := &MetricBlocklist{ + configPathOrContent: configFile, + config: &config, + logger: logger, + configReloadTimer: configReloadTimer, + } + + if reg != nil { + blocklist.configReloadCounter = promauto.With(reg).NewCounter( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "blocklist_config_reload_total", + Help: "Total number of blocklist configuration reloads.", + }, + ) + blocklist.configReloadFailedCounter = promauto.With(reg).NewCounter( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "blocklist_config_reload_err_total", + Help: "Total number of failed blocklist configuration reloads.", + }, + ) + blocklist.blockedSeriesTotal = promauto.With(reg).NewCounterVec( + prometheus.CounterOpts{ + Namespace: "thanos", + Subsystem: "receive", + Name: "blocked_series_total", + Help: "Total number of series blocked by blocklist rules.", + }, + []string{"rule_name", "tenant"}, + ) + } + + if configFile == nil { + return blocklist, nil + } + + if err := blocklist.loadConfig(); err != nil { + return nil, errors.Wrap(err, "load blocklist config") + } + + return blocklist, nil +} + +// newMetricBlocklistWithConstantConfig creates a MetricBlocklist with a constant config. +// This is useful for testing. +func newMetricBlocklistWithConstantConfig(config BlocklistConfig, logger log.Logger) *MetricBlocklist { + var configPtr atomic.Pointer[BlocklistConfig] + configPtr.Store(&config) + return &MetricBlocklist{ + configPathOrContent: nil, + config: &configPtr, + logger: logger, + } +} + +// Config returns the current blocklist configuration. +// This is concurrent safe. +func (b *MetricBlocklist) Config() BlocklistConfig { + if b == nil { + return BlocklistConfig{} + } + return *b.config.Load() +} + +// setConfig sets the blocklist config atomically. +func (b *MetricBlocklist) setConfig(config BlocklistConfig) { + b.config.Store(&config) +} + +// loadConfig loads the blocklist configuration from the configured source. +func (b *MetricBlocklist) loadConfig() error { + content, err := b.configPathOrContent.Content() + if err != nil { + // If file does not exist, set an empty config. + if errors.Is(err, os.ErrNotExist) { + level.Debug(b.logger).Log("msg", "blocklist config file does not exist") + b.setConfig(BlocklistConfig{}) + return nil + } + return errors.Wrap(err, "getting content of blocklist config") + } + + var config BlocklistConfig + if err := yaml.UnmarshalStrict(content, &config); err != nil { + return errors.Wrap(err, "parsing blocklist config") + } + + // Filter to only include drop rules. + dropRules := make([]BlocklistRule, 0, len(config.Rules)) + for _, rule := range config.Rules { + if rule.Drop { + dropRules = append(dropRules, rule) + } + } + config.Rules = dropRules + + level.Info(b.logger).Log("msg", "loaded blocklist config", "num_rules", len(config.Rules)) + b.setConfig(config) + return nil +} + +// StartConfigReloader starts the automatic configuration reloader. +func (b *MetricBlocklist) StartConfigReloader(ctx context.Context) error { + if !b.CanReload() { + return nil + } + + return extkingpin.PathContentReloader(ctx, b.configPathOrContent, b.logger, func() { + level.Info(b.logger).Log("msg", "reloading blocklist config") + + if err := b.loadConfig(); err != nil { + if b.configReloadFailedCounter != nil { + b.configReloadFailedCounter.Inc() + } + errMsg := fmt.Sprintf("error reloading blocklist config from %s", b.configPathOrContent.Path()) + level.Error(b.logger).Log("msg", errMsg, "err", err) + return + } + + if b.configReloadCounter != nil { + b.configReloadCounter.Inc() + } + }, b.configReloadTimer) +} + +// CanReload returns whether the blocklist can be hot-reloaded. +func (b *MetricBlocklist) CanReload() bool { + if b.configReloadTimer == 0 { + return false + } + if b.configPathOrContent == nil { + return false + } + if b.configPathOrContent.Path() == "" { + return false + } + return true +} + +// ShouldBlock evaluates whether a time series should be blocked based on the blocklist rules. +// Returns (shouldBlock, matchedRuleName). +// NOTE: The actual filter matching logic is implemented in a separate PR. +// This method will be updated to use the filter matching implementation. +func (b *MetricBlocklist) ShouldBlock(lset labels.Labels) (bool, string) { + if b == nil { + return false, "" + } + + config := b.Config() + if len(config.Rules) == 0 { + return false, "" + } + + for _, rule := range config.Rules { + if matchesFilter(lset, rule.Filter) { + return true, rule.Name + } + } + + return false, "" +} + +// RecordBlocked records a blocked series metric. +func (b *MetricBlocklist) RecordBlocked(ruleName, tenant string) { + if b == nil || b.blockedSeriesTotal == nil { + return + } + b.blockedSeriesTotal.WithLabelValues(ruleName, tenant).Inc() +} + +// matchesFilter checks if a label set matches the given filter expression. +// NOTE: This is a placeholder. The actual implementation is in a separate PR. +// Filter format: "__name__:metric_pattern label1:value1 label2:*" +func matchesFilter(lset labels.Labels, filter string) bool { + // TODO: Implement filter matching logic in Yuchen's PR. + // The filter matching will parse the filter string and check if the label set matches. + // It should support: + // - Exact matches: "label:value" + // - Glob patterns: "label:prefix*", "label:*suffix", "label:*" + // - Multiple label conditions (AND logic) + return false +} diff --git a/pkg/receive/metric_blocklist_test.go b/pkg/receive/metric_blocklist_test.go new file mode 100644 index 00000000000..d2a2cec6b65 --- /dev/null +++ b/pkg/receive/metric_blocklist_test.go @@ -0,0 +1,189 @@ +// Copyright (c) The Thanos Authors. +// Licensed under the Apache License 2.0. + +package receive + +import ( + "context" + "os" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/efficientgo/core/testutil" + "github.com/go-kit/log" + "github.com/stretchr/testify/require" + "github.com/thanos-io/thanos/pkg/extkingpin" +) + +func TestMetricBlocklist_ConfigReload(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + tempFilePath := filepath.Join(tempDir, "blocklist.yaml") + + initialConfig := `rules: + - name: "block test metric" + filter: "__name__:test_metric_*" + drop: true +` + err := os.WriteFile(tempFilePath, []byte(initialConfig), 0644) + testutil.Ok(t, err) + + configFile, err := extkingpin.NewStaticPathContent(tempFilePath) + testutil.Ok(t, err) + + blocklist, err := NewMetricBlocklist(configFile, nil, log.NewLogfmtLogger(os.Stdout), 1*time.Second) + testutil.Ok(t, err) + + // Verify initial config loaded. + config := blocklist.Config() + testutil.Equals(t, 1, len(config.Rules)) + testutil.Equals(t, "block test metric", config.Rules[0].Name) + testutil.Equals(t, "__name__:test_metric_*", config.Rules[0].Filter) + testutil.Equals(t, true, config.Rules[0].Drop) + + // Start reloader in background. + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + err = blocklist.StartConfigReloader(ctx) + testutil.Ok(t, err) + + // Update config file using Rewrite to properly trigger the file watcher. + updatedConfig := []byte(`rules: + - name: "block new metric" + filter: "__name__:new_metric_*" + drop: true + - name: "block another metric" + filter: "__name__:another_metric" + drop: true +`) + testutil.Ok(t, configFile.Rewrite(updatedConfig)) + + // Wait for reload using Eventually pattern. + expectedConfig := BlocklistConfig{ + Rules: []BlocklistRule{ + {Name: "block new metric", Filter: "__name__:new_metric_*", Drop: true}, + {Name: "block another metric", Filter: "__name__:another_metric", Drop: true}, + }, + } + require.Eventually(t, func() bool { + return reflect.DeepEqual(expectedConfig, blocklist.Config()) + }, 5*time.Second, 100*time.Millisecond) +} + +func TestMetricBlocklist_CanReload(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + tempFilePath := filepath.Join(tempDir, "blocklist.yaml") + + err := os.WriteFile(tempFilePath, []byte("rules: []"), 0644) + testutil.Ok(t, err) + + tests := []struct { + name string + configFile fileContent + reloadTime time.Duration + wantReload bool + }{ + { + name: "can reload with file path and reload timer", + configFile: mustCreateStaticPathContent(t, tempFilePath), + reloadTime: 1 * time.Second, + wantReload: true, + }, + { + name: "cannot reload with zero reload timer", + configFile: mustCreateStaticPathContent(t, tempFilePath), + reloadTime: 0, + wantReload: false, + }, + { + name: "cannot reload with nil config file", + configFile: nil, + reloadTime: 1 * time.Second, + wantReload: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + blocklist, err := NewMetricBlocklist(tt.configFile, nil, log.NewLogfmtLogger(os.Stdout), tt.reloadTime) + testutil.Ok(t, err) + if tt.wantReload { + testutil.Assert(t, blocklist.CanReload(), "expected CanReload to return true") + } else { + testutil.Assert(t, !blocklist.CanReload(), "expected CanReload to return false") + } + }) + } +} + +func TestMetricBlocklist_OnlyDropRulesLoaded(t *testing.T) { + t.Parallel() + + tempDir := t.TempDir() + tempFilePath := filepath.Join(tempDir, "blocklist.yaml") + + // Config with both drop and non-drop rules. + config := `rules: + - name: "drop rule" + filter: "__name__:drop_metric" + drop: true + - name: "keep rule (should be filtered out)" + filter: "__name__:keep_metric" + drop: false + - name: "another drop rule" + filter: "__name__:another_drop" + drop: true +` + err := os.WriteFile(tempFilePath, []byte(config), 0644) + testutil.Ok(t, err) + + configFile, err := extkingpin.NewStaticPathContent(tempFilePath) + testutil.Ok(t, err) + + blocklist, err := NewMetricBlocklist(configFile, nil, log.NewLogfmtLogger(os.Stdout), 0) + testutil.Ok(t, err) + + // Only drop rules should be loaded. + loadedConfig := blocklist.Config() + testutil.Equals(t, 2, len(loadedConfig.Rules)) + testutil.Equals(t, "drop rule", loadedConfig.Rules[0].Name) + testutil.Equals(t, "another drop rule", loadedConfig.Rules[1].Name) +} + +func TestMetricBlocklist_EmptyConfig(t *testing.T) { + t.Parallel() + + blocklist, err := NewMetricBlocklist(nil, nil, log.NewLogfmtLogger(os.Stdout), 0) + testutil.Ok(t, err) + + config := blocklist.Config() + testutil.Equals(t, 0, len(config.Rules)) +} + +func TestMetricBlocklist_NilBlocklist(t *testing.T) { + t.Parallel() + + var blocklist *MetricBlocklist + + // Should not panic and return empty config. + config := blocklist.Config() + testutil.Equals(t, 0, len(config.Rules)) + + // ShouldBlock should return false for nil blocklist. + blocked, ruleName := blocklist.ShouldBlock(nil) + testutil.Assert(t, !blocked, "expected ShouldBlock to return false for nil blocklist") + testutil.Equals(t, "", ruleName) +} + +func mustCreateStaticPathContent(t *testing.T, path string) fileContent { + t.Helper() + content, err := extkingpin.NewStaticPathContent(path) + testutil.Ok(t, err) + return content +} diff --git a/pkg/receive/writer.go b/pkg/receive/writer.go index d17fd6453cc..fde0c6e9ffc 100644 --- a/pkg/receive/writer.go +++ b/pkg/receive/writer.go @@ -59,6 +59,7 @@ type Writer struct { logger log.Logger multiTSDB TenantStorage opts *WriterOptions + blocklist *MetricBlocklist } func NewWriter(logger log.Logger, multiTSDB TenantStorage, opts *WriterOptions) *Writer { @@ -72,6 +73,12 @@ func NewWriter(logger log.Logger, multiTSDB TenantStorage, opts *WriterOptions) } } +// SetBlocklist sets the metric blocklist for the writer. +// This allows filtering out metrics that match blocklist rules after deserialization. +func (r *Writer) SetBlocklist(blocklist *MetricBlocklist) { + r.blocklist = blocklist +} + func (r *Writer) Write(ctx context.Context, tenantID string, wreq []prompb.TimeSeries) error { tLogger := log.With(r.logger, "tenant", tenantID) @@ -109,6 +116,14 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq []prompb.TimeS lset := labelpb.ZLabelsToPromLabels(t.Labels) + // Check if the time series should be blocked based on blocklist rules. + if r.blocklist != nil { + if blocked, ruleName := r.blocklist.ShouldBlock(lset); blocked { + r.blocklist.RecordBlocked(ruleName, tenantID) + continue + } + } + // Check if the TSDB has cached reference for those labels. ref, lset = getRef.GetRef(lset, lset.Hash()) if ref == 0 { From da69a09a41316bfd1e65f5a9e117eff3e0375c4c Mon Sep 17 00:00:00 2001 From: William Zahary Henderson Date: Thu, 22 Jan 2026 01:24:48 +0000 Subject: [PATCH 2/2] linter --- pkg/receive/metric_blocklist.go | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/pkg/receive/metric_blocklist.go b/pkg/receive/metric_blocklist.go index e62126aae4f..4911db6ccc7 100644 --- a/pkg/receive/metric_blocklist.go +++ b/pkg/receive/metric_blocklist.go @@ -130,18 +130,6 @@ func NewMetricBlocklist( return blocklist, nil } -// newMetricBlocklistWithConstantConfig creates a MetricBlocklist with a constant config. -// This is useful for testing. -func newMetricBlocklistWithConstantConfig(config BlocklistConfig, logger log.Logger) *MetricBlocklist { - var configPtr atomic.Pointer[BlocklistConfig] - configPtr.Store(&config) - return &MetricBlocklist{ - configPathOrContent: nil, - config: &configPtr, - logger: logger, - } -} - // Config returns the current blocklist configuration. // This is concurrent safe. func (b *MetricBlocklist) Config() BlocklistConfig { @@ -259,7 +247,7 @@ func (b *MetricBlocklist) RecordBlocked(ruleName, tenant string) { // matchesFilter checks if a label set matches the given filter expression. // NOTE: This is a placeholder. The actual implementation is in a separate PR. -// Filter format: "__name__:metric_pattern label1:value1 label2:*" +// Filter format: "__name__:metric_pattern label1:value1 label2:*". func matchesFilter(lset labels.Labels, filter string) bool { // TODO: Implement filter matching logic in Yuchen's PR. // The filter matching will parse the filter string and check if the label set matches.