diff --git a/Insanedocfile b/Insanedocfile
index 65db2b5f8..45cc77686 100644
--- a/Insanedocfile
+++ b/Insanedocfile
@@ -3,21 +3,17 @@ extractors:
fn-list: '"fn-list" #4 /Plugin\)\s(.+)\s{/'
match-modes: '"match-modes" /MatchMode(.*),/ /\"(.*)\"/'
do-if-node: '"do-if-node" /Node(\w+)\s/'
- do-if-field-op: '"do-if-field-op" /field(\w+)OpTag\s/'
- do-if-logical-op: '"do-if-logical-op" /logical(\w+)Tag\s/'
decorators:
config-params: '_ _ /*`%s`* / /*`default=%s`* / /*`%s`* / /*`options=%s`* /'
fn-list: '_ _ /`%s`/'
match-modes: '_ /%s/ /`match_mode: %s`/'
do-if-node: '_ /%s/'
- do-if-field-op: '_ /%s/'
- do-if-logical-op: '_ /%s/'
templates:
- template: docs/*.idoc.md
files: ["../pipeline/*.go"]
- template: pipeline/*.idoc.md
files: ["*.go"]
- - template: pipeline/doif/*.idoc.md
+ - template: pipeline/do_if/*.idoc.md
files: ["*.go"]
- template: plugin/*/*/README.idoc.md
files: ["*.go"]
diff --git a/cfg/config.go b/cfg/config.go
index 4c106b928..b6ca25250 100644
--- a/cfg/config.go
+++ b/cfg/config.go
@@ -731,3 +731,20 @@ func mergeYAMLs(a, b map[interface{}]interface{}) map[interface{}]interface{} {
}
return merged
}
+
+func AnyToInt(v any) (int, error) {
+ switch vNum := v.(type) {
+ case int:
+ return vNum, nil
+ case float64:
+ return int(vNum), nil
+ case json.Number:
+ vInt64, err := vNum.Int64()
+ if err != nil {
+ return 0, err
+ }
+ return int(vInt64), nil
+ default:
+ return 0, fmt.Errorf("not convertible to int: value=%v type=%T", v, v)
+ }
+}
diff --git a/cfg/matchrule/matchrule.go b/cfg/matchrule/matchrule.go
index b49f3dc4e..4e123fb4b 100644
--- a/cfg/matchrule/matchrule.go
+++ b/cfg/matchrule/matchrule.go
@@ -27,6 +27,19 @@ func (m *Mode) UnmarshalJSON(i []byte) error {
return nil
}
+func (m *Mode) ToString() string {
+ switch *m {
+ case ModeContains:
+ return "contains"
+ case ModePrefix:
+ return "prefix"
+ case ModeSuffix:
+ return "suffix"
+ default:
+ panic("unreachable")
+ }
+}
+
const (
ModePrefix Mode = iota
ModeContains
@@ -66,6 +79,14 @@ type Rule struct {
prepared bool
}
+func (r *Rule) GetMinValueSize() int {
+ return r.minValueSize
+}
+
+func (r *Rule) GetMaxValueSize() int {
+ return r.maxValueSize
+}
+
func (r *Rule) Prepare() {
if len(r.Values) == 0 {
return
@@ -186,6 +207,17 @@ var (
condOrBytes = []byte(`"or"`)
)
+func (c *Cond) ToString() string {
+ switch *c {
+ case CondAnd:
+ return "and"
+ case CondOr:
+ return "or"
+ default:
+ panic("unreachable")
+ }
+}
+
type RuleSet struct {
// > @3@4@5@6
// >
diff --git a/decoder/common.go b/decoder/common.go
index 01867661a..16f6ef1e9 100644
--- a/decoder/common.go
+++ b/decoder/common.go
@@ -1,27 +1,5 @@
package decoder
-import (
- "encoding/json"
- "errors"
-)
-
-func anyToInt(v any) (int, error) {
- switch vNum := v.(type) {
- case int:
- return vNum, nil
- case float64:
- return int(vNum), nil
- case json.Number:
- vInt64, err := vNum.Int64()
- if err != nil {
- return 0, err
- }
- return int(vInt64), nil
- default:
- return 0, errors.New("value is not convertable to int")
- }
-}
-
// atoi is allocation free ASCII number to integer conversion
func atoi(b []byte) (int, bool) {
if len(b) == 0 {
diff --git a/decoder/json.go b/decoder/json.go
index 492cd2169..8a3d945c2 100644
--- a/decoder/json.go
+++ b/decoder/json.go
@@ -6,6 +6,7 @@ import (
"slices"
"sync"
+ "github.com/ozontech/file.d/cfg"
insaneJSON "github.com/ozontech/insane-json"
"github.com/tidwall/gjson"
)
@@ -140,7 +141,7 @@ func extractJsonParams(params map[string]any) (jsonParams, error) {
return jsonParams{}, fmt.Errorf("%q must be map", jsonMaxFieldsSizeParam)
}
for k, v := range maxFieldsSizeMap {
- vInt, err := anyToInt(v)
+ vInt, err := cfg.AnyToInt(v)
if err != nil {
return jsonParams{}, fmt.Errorf("each value in %q must be int", jsonMaxFieldsSizeParam)
}
diff --git a/fd/util.go b/fd/util.go
index 38c3ecccd..928219678 100644
--- a/fd/util.go
+++ b/fd/util.go
@@ -11,13 +11,14 @@ import (
"github.com/ozontech/file.d/logger"
"github.com/ozontech/file.d/pipeline"
"github.com/ozontech/file.d/pipeline/antispam"
- "github.com/ozontech/file.d/pipeline/doif"
+ "github.com/ozontech/file.d/pipeline/do_if"
)
func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
capacity := pipeline.DefaultCapacity
antispamThreshold := pipeline.DefaultAntispamThreshold
var antispamExceptions antispam.Exceptions
+ var antispamCfg map[string]any
sourceNameMetaField := pipeline.DefaultSourceNameMetaField
avgInputEventSize := pipeline.DefaultAvgInputEventSize
maxInputEventSize := pipeline.DefaultMaxInputEventSize
@@ -101,6 +102,8 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
}
antispamExceptions.Prepare()
+ antispamCfg = settings.Get("antispam").MustMap()
+
sourceNameMetaField = settings.Get("source_name_meta_field").MustString()
isStrict = settings.Get("is_strict").MustBool()
@@ -129,6 +132,7 @@ func extractPipelineParams(settings *simplejson.Json) *pipeline.Settings {
CutOffEventByLimitField: cutOffEventByLimitField,
AntispamThreshold: antispamThreshold,
AntispamExceptions: antispamExceptions,
+ Antispam: antispamCfg,
SourceNameMetaField: sourceNameMetaField,
MaintenanceInterval: maintenanceInterval,
EventTimeout: eventTimeout,
@@ -219,13 +223,13 @@ func extractMetrics(actionJSON *simplejson.Json) (string, []string, bool) {
return metricName, metricLabels, skipStatus
}
-func extractDoIfChecker(actionJSON *simplejson.Json) (*doif.Checker, error) {
+func extractDoIfChecker(actionJSON *simplejson.Json) (*do_if.Checker, error) {
m := actionJSON.MustMap()
if m == nil {
return nil, nil
}
- return doif.NewFromMap(m)
+ return do_if.NewFromMap(m)
}
func makeActionJSON(actionJSON *simplejson.Json) []byte {
diff --git a/pipeline/antispam/README.md b/pipeline/antispam/README.md
index 64d1a033f..cde2aa4ea 100644
--- a/pipeline/antispam/README.md
+++ b/pipeline/antispam/README.md
@@ -6,7 +6,77 @@ In some systems services might explode with logs due to different circumstances.
The main entity is `Antispammer`. It counts input data from the sources (e.g. if data comes from [file input plugin](/plugin/input/file/README.md), source can be filename) and decides whether to ban it or not. For each source it counts how many logs it has got, in other words the counter for the source is incremented for each incoming log. When the counter is greater or equal to the threshold value the source is banned until its counter is less than the threshold value. The counter value is decremented once in maintenance interval by the threshold value. The maintenance interval for antispam is the same as for the pipeline (see `maintenance_interval` in [pipeline settings](/pipeline/README.md#settings)).
-## Exceptions
+## Antispam config
+
+Example:
+
+```
+antispam:
+ threshold: 3000
+ rules:
+ - name: alert_agent
+ if:
+ op: and
+ operands:
+ - op: contains
+ data: meta.service
+ values:
+ - alerts-agent
+ - op: prefix
+ data: event
+ values:
+ - '{"level":"debug"'
+ threshold: -1
+ - name: viewer
+ if:
+ op: and
+ operands:
+ - op: contains
+ data: source_name
+ values:
+ - viewer
+ threshold: 5000
+```
+
+The antispammer iterates over the rules, checks the event and applies the first matching rule.
+If the event does not match any rule, it is limited by the common threshold.
+
+### Antispam fields
+
+**`threshold`** **`int`**
+
+Common threshold applied to events that don't match any rule.
+Values:
+- `-1` - no limit;
+- `0` - discard all logs;
+- `> 0` - normal threshold value.
+
+**`rules`**
+
+Antispam rules array
+
+### Rule fields
+
+**`name`** **`string`**
+
+Name of the rule. If set to a nonempty string, it is used as the value of the `name` label in the `antispam_exceptions` metric.
+
+**`threshold`** **`int`**
+
+Rule threshold. Has the same value meanings as the common threshold.
+
+**`if`**
+
+`do_if`-like condition tree (see [doc](../do_if/README.md)).
+The difference is that only logical and data operations are allowed.
+The `data` field specifies the data to check (instead of `field`).
+Values:
+- `event`
+- `source_name`
+- `meta.name` - get data to check from metadata by key `name`
+
+
+## Exceptions [deprecated: use rules instead]
Antispammer has some exception rules which can be applied by checking source name or log as raw bytes contents. If the log is matched by the rules it is not accounted for in the antispammer. It might be helpful for the logs from critical infrastructure services which must not be banned at all.
diff --git a/pipeline/antispam/antispammer.go b/pipeline/antispam/antispammer.go
index 4a5cbff16..74d9ed442 100644
--- a/pipeline/antispam/antispammer.go
+++ b/pipeline/antispam/antispammer.go
@@ -13,6 +13,11 @@ import (
"go.uber.org/zap"
)
+const (
+ thresholdUnlimited = -1
+ thresholdBlocked = 0
+)
+
// Antispammer makes a decision on the need to parse the input log.
// It can be useful when any application writes logs at speed faster than File.d can read it.
//
@@ -23,7 +28,8 @@ type Antispammer struct {
maintenanceInterval time.Duration
mu sync.RWMutex
sources map[string]source
- exceptions Exceptions
+ rules Rules
+ enabled bool
logger *zap.Logger
@@ -44,6 +50,7 @@ type Options struct {
Threshold int
UnbanIterations int
Exceptions Exceptions
+ Config map[string]any
Logger *zap.Logger
MetricsController *metric.Ctl
@@ -51,18 +58,10 @@ type Options struct {
}
func NewAntispammer(o *Options) *Antispammer {
- if o.Threshold > 0 {
- o.Logger.Info("antispam enabled",
- zap.Int("threshold", o.Threshold),
- zap.Duration("maintenance", o.MaintenanceInterval))
- }
-
a := &Antispammer{
unbanIterations: o.UnbanIterations,
- threshold: o.Threshold,
maintenanceInterval: o.MaintenanceInterval,
sources: make(map[string]source),
- exceptions: o.Exceptions,
logger: o.Logger,
activeMetric: o.MetricsController.RegisterGauge("antispam_active",
"Gauge indicates whether the antispam is enabled",
@@ -77,40 +76,94 @@ func NewAntispammer(o *Options) *Antispammer {
),
}
+ var err error
+
+ if o.Config != nil {
+ a.rules, a.threshold, err = extractAntispam(o.Config)
+ if err != nil {
+ o.Logger.Fatal("can't extract antispam", zap.Error(err))
+ }
+ } else {
+ a.rules, err = exceptionsToRules(o.Exceptions)
+ if err != nil {
+ o.Logger.Fatal("can't convert exceptions to rules", zap.Error(err))
+ }
+
+ if o.Threshold > 0 {
+ a.threshold = o.Threshold
+ } else {
+ a.threshold = thresholdUnlimited
+ }
+ }
+
+ a.enabled = a.threshold != thresholdUnlimited
+
+ for i := range a.rules {
+ a.rules[i].Prepare(i)
+ a.enabled = a.enabled || a.rules[i].Threshold != thresholdUnlimited
+ }
+
// not enabled by default
a.activeMetric.Set(0)
return a
}
-func (a *Antispammer) IsSpam(id string, name string, isNewSource bool, event []byte, timeEvent time.Time) bool {
- if a.threshold <= 0 {
+func (a *Antispammer) IsSpam(
+ id string,
+ name string,
+ isNewSource bool,
+ event []byte,
+ timeEvent time.Time,
+ meta map[string]string,
+) bool {
+ if !a.enabled {
return false
}
- for i := 0; i < len(a.exceptions); i++ {
- e := &a.exceptions[i]
- checkData := event
- if e.CheckSourceName {
- checkData = []byte(name)
- }
- if e.Match(checkData) {
- if e.Name != "" {
- a.exceptionMetric.WithLabelValues(e.Name).Inc()
+ rlMapKey := id
+ threshold := a.threshold
+
+ for i := range a.rules {
+ rule := &a.rules[i]
+ if rule.Condition.CheckRaw(event, []byte(name), meta) {
+ switch rule.Threshold {
+ case thresholdUnlimited:
+ if rule.Name != "" {
+ a.exceptionMetric.WithLabelValues(rule.Name).Inc()
+ }
+ return false
+ case thresholdBlocked:
+ return true
}
- return false
+
+ if rule.UniteSources {
+ rlMapKey = fmt.Sprintf("==%d==", rule.RuleID)
+ } else {
+ rlMapKey = fmt.Sprintf("==%d==%s==", rule.RuleID, id)
+ }
+
+ threshold = rule.Threshold
+ break
}
}
+ switch threshold {
+ case thresholdUnlimited:
+ return false
+ case thresholdBlocked:
+ return true
+ }
+
a.mu.RLock()
- src, has := a.sources[id]
+ src, has := a.sources[rlMapKey]
a.mu.RUnlock()
timeEventSeconds := timeEvent.UnixNano()
if !has {
a.mu.Lock()
- if newSrc, has := a.sources[id]; has {
+ if newSrc, has := a.sources[rlMapKey]; has {
src = newSrc
} else {
src = source{
@@ -119,7 +172,7 @@ func (a *Antispammer) IsSpam(id string, name string, isNewSource bool, event []b
timestamp: &atomic.Int64{},
}
src.timestamp.Add(timeEventSeconds)
- a.sources[id] = src
+ a.sources[rlMapKey] = src
}
a.mu.Unlock()
}
@@ -134,8 +187,8 @@ func (a *Antispammer) IsSpam(id string, name string, isNewSource bool, event []b
if diff < a.maintenanceInterval.Nanoseconds() {
x = src.counter.Inc()
}
- if x == int32(a.threshold) {
- src.counter.Swap(int32(a.unbanIterations * a.threshold))
+ if x == int32(threshold) {
+ src.counter.Swap(int32(a.unbanIterations * threshold))
a.activeMetric.Set(1)
a.banMetric.WithLabelValues(name).Inc()
a.logger.Warn("source has been banned",
@@ -146,7 +199,7 @@ func (a *Antispammer) IsSpam(id string, name string, isNewSource bool, event []b
)
}
- return x >= int32(a.threshold)
+ return x >= int32(threshold)
}
func (a *Antispammer) Maintenance() {
diff --git a/pipeline/antispam/antispammer_test.go b/pipeline/antispam/antispammer_test.go
index eb1a1a81e..6dfef5284 100644
--- a/pipeline/antispam/antispammer_test.go
+++ b/pipeline/antispam/antispammer_test.go
@@ -12,11 +12,12 @@ import (
"github.com/stretchr/testify/require"
)
-func newAntispammer(threshold, unbanIterations int, maintenanceInterval time.Duration) *Antispammer {
+func newAntispammer(threshold, unbanIterations int, maintenanceInterval time.Duration, exceptions Exceptions) *Antispammer {
holder := metric.NewHolder(time.Minute)
return NewAntispammer(&Options{
MaintenanceInterval: maintenanceInterval,
Threshold: threshold,
+ Exceptions: exceptions,
UnbanIterations: unbanIterations,
Logger: logger.Instance.Named("antispam").Desugar(),
MetricsController: metric.NewCtl("test", prometheus.NewRegistry()),
@@ -31,12 +32,12 @@ func TestAntispam(t *testing.T) {
unbanIterations := 2
maintenanceInterval := time.Second * 1
- antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval)
+ antispammer := newAntispammer(threshold, unbanIterations, maintenanceInterval, nil)
startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval / 2)
- return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime)
+ return antispammer.IsSpam("1", "test", false, []byte(`{}`), eventTime, nil)
}
for i := 1; i < threshold; i++ {
@@ -47,7 +48,7 @@ func TestAntispam(t *testing.T) {
for i := 0; i <= unbanIterations-1; i++ {
result := checkSpam(threshold + i)
r.True(result)
- antispamer.Maintenance()
+ antispammer.Maintenance()
}
result := checkSpam(threshold + 1)
@@ -61,12 +62,12 @@ func TestAntispamAfterRestart(t *testing.T) {
unbanIterations := 2
maintenanceInterval := time.Second * 1
- antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval)
+ antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval, nil)
startTime := time.Now()
checkSpam := func(i int) bool {
eventTime := startTime.Add(time.Duration(i) * maintenanceInterval)
- return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime)
+ return antispamer.IsSpam("1", "test", false, []byte(`{}`), eventTime, nil)
}
for i := 1; i < threshold; i++ {
@@ -86,12 +87,10 @@ func TestAntispamExceptions(t *testing.T) {
unbanIterations := 2
maintenanceInterval := time.Second * 1
- antispamer := newAntispammer(threshold, unbanIterations, maintenanceInterval)
-
eventRulesetName := "test_event"
sourceRulesetName := "test_sourcename"
- antispamer.exceptions = Exceptions{
+ exceptions := Exceptions{
{
RuleSet: matchrule.RuleSet{
Name: eventRulesetName,
@@ -125,12 +124,13 @@ func TestAntispamExceptions(t *testing.T) {
},
},
}
- antispamer.exceptions.Prepare()
+
+ antispammer := newAntispammer(threshold, unbanIterations, maintenanceInterval, exceptions)
checkSpam := func(source, event string, wantMetric map[string]float64) {
- antispamer.IsSpam("1", source, true, []byte(event), now)
+ antispammer.IsSpam("1", source, true, []byte(event), now, nil)
for k, v := range wantMetric {
- r.Equal(v, testutil.ToFloat64(antispamer.exceptionMetric.WithLabelValues(k)))
+ r.Equal(v, testutil.ToFloat64(antispammer.exceptionMetric.WithLabelValues(k)))
}
}
diff --git a/pipeline/antispam/ctor.go b/pipeline/antispam/ctor.go
new file mode 100644
index 000000000..aa23afdbe
--- /dev/null
+++ b/pipeline/antispam/ctor.go
@@ -0,0 +1,101 @@
+package antispam
+
+import (
+ "fmt"
+
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/pipeline/ctor"
+ "github.com/ozontech/file.d/pipeline/do_if"
+)
+
+const (
+ fieldNameRules = "rules"
+
+ fieldNameThreshold = "threshold"
+
+ fieldNameName = "name"
+ fieldNameIf = "if"
+ fieldNameUniteSources = "unite_sources"
+)
+
+func extractAntispam(node map[string]any) ([]Rule, int, error) {
+ thresholdNode, err := ctor.GetAny(node, fieldNameThreshold)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ threshold, err := cfg.AnyToInt(thresholdNode)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ var rules []Rule
+
+ rawRules, err := ctor.Get[[]any](node, fieldNameRules, nil)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ rules, err = extractRules(rawRules)
+ if err != nil {
+ return nil, 0, err
+ }
+
+ return rules, threshold, nil
+}
+
+func extractRules(rawRules []any) ([]Rule, error) {
+ rules := make([]Rule, 0, len(rawRules))
+
+ for _, rawRule := range rawRules {
+ ruleNode, err := ctor.Must[map[string]any](rawRule)
+ if err != nil {
+ return nil, fmt.Errorf("rule type mismatch: %w", err)
+ }
+
+ rule, err := extractRule(ruleNode)
+ if err != nil {
+ return nil, err
+ }
+
+ rules = append(rules, rule)
+ }
+
+ return rules, nil
+}
+
+func extractRule(node map[string]any) (Rule, error) {
+ def := Rule{}
+
+ name, err := ctor.Get[string](node, fieldNameName, "")
+ if err != nil {
+ return def, err
+ }
+
+ uniteSources, err := ctor.Get[bool](node, fieldNameUniteSources, false)
+ if err != nil {
+ return def, err
+ }
+
+ condNode, err := ctor.Get[map[string]any](node, fieldNameIf)
+ if err != nil {
+ return def, err
+ }
+
+ cond, err := do_if.ExtractNode(condNode)
+ if err != nil {
+ return def, err
+ }
+
+ thresholdRaw, err := ctor.GetAny(node, fieldNameThreshold)
+ if err != nil {
+ return def, err
+ }
+
+ threshold, err := cfg.AnyToInt(thresholdRaw)
+ if err != nil {
+ return def, err
+ }
+
+ return newRule(name, cond, threshold, uniteSources)
+}
diff --git a/pipeline/antispam/rule.go b/pipeline/antispam/rule.go
new file mode 100644
index 000000000..56d288be7
--- /dev/null
+++ b/pipeline/antispam/rule.go
@@ -0,0 +1,70 @@
+package antispam
+
+import (
+ "fmt"
+
+ "github.com/ozontech/file.d/pipeline/do_if"
+)
+
+type Rule struct {
+ Name string
+ Condition do_if.Node
+ Threshold int
+ RuleID int
+ UniteSources bool
+}
+
+type Rules []Rule
+
+func (r *Rule) Prepare(id int) {
+ r.RuleID = id
+}
+
+func checkThreshold(threshold int) error {
+ if threshold < -1 {
+ return fmt.Errorf("invalid threshold: expected=(non-negative or -1) got=%d", threshold)
+ }
+
+ return nil
+}
+
+func newRule(name string, condition do_if.Node, threshold int, uniteSources bool) (Rule, error) {
+ if err := checkThreshold(threshold); err != nil {
+ return Rule{}, err
+ }
+
+ return Rule{
+ Name: name,
+ Condition: condition,
+ Threshold: threshold,
+ UniteSources: uniteSources,
+ }, nil
+}
+
+func exceptionToNode(exception Exception) (do_if.Node, error) {
+ dataTypeTag := do_if.DataTypeEventTag
+ if exception.CheckSourceName {
+ dataTypeTag = do_if.DataTypeSourceNameTag
+ }
+
+ return do_if.RuleSetToNode(exception.RuleSet, dataTypeTag)
+}
+
+func exceptionsToRules(exceptions Exceptions) (Rules, error) {
+ rules := make(Rules, 0, len(exceptions))
+ for _, e := range exceptions {
+ node, err := exceptionToNode(e)
+ if err != nil {
+ return nil, err
+ }
+
+ rule, err := newRule(e.RuleSet.Name, node, -1, false)
+ if err != nil {
+ return nil, err
+ }
+
+ rules = append(rules, rule)
+ }
+
+ return rules, nil
+}
diff --git a/pipeline/ctor/utils.go b/pipeline/ctor/utils.go
new file mode 100644
index 000000000..9329ced97
--- /dev/null
+++ b/pipeline/ctor/utils.go
@@ -0,0 +1,60 @@
+package ctor
+
+import (
+ "errors"
+ "fmt"
+)
+
+type Node = map[string]any
+
+var (
+ ErrFieldNotFound = errors.New("field not found")
+ ErrTypeMismatch = errors.New("type mismatch")
+)
+
+func GetAny(node Node, field string) (any, error) {
+ res, has := node[field]
+ if !has {
+ return nil, fmt.Errorf("field=%q: %w", field, ErrFieldNotFound)
+ }
+
+ return res, nil
+}
+
+func Must[T any](v any) (T, error) {
+ var def T
+
+ result, ok := v.(T)
+ if !ok {
+ return def, fmt.Errorf("%w: expected=%T got=%T", ErrTypeMismatch, def, v)
+ }
+
+ return result, nil
+}
+
+func Get[T any](node Node, field string, defValues ...T) (T, error) {
+ if len(defValues) > 1 {
+ panic("too many default values")
+ }
+
+ var def T
+
+ fieldNode, err := GetAny(node, field)
+ if err != nil {
+ if len(defValues) == 1 {
+ return defValues[0], nil
+ }
+
+ return def, err
+ }
+
+ result, ok := fieldNode.(T)
+ if !ok {
+ return def, fmt.Errorf(
+ "%w: field=%q expected=%T got=%T",
+ ErrTypeMismatch, field, def, fieldNode,
+ )
+ }
+
+ return result, nil
+}
diff --git a/pipeline/doif/README.idoc.md b/pipeline/do_if/README.idoc.md
similarity index 84%
rename from pipeline/doif/README.idoc.md
rename to pipeline/do_if/README.idoc.md
index aea515be0..d332934c3 100644
--- a/pipeline/doif/README.idoc.md
+++ b/pipeline/do_if/README.idoc.md
@@ -8,17 +8,17 @@ the chain of Match func calls are performed across the whole tree.
## Node types
@do-if-node|description
-## Field op node
-@do-if-field-op-node
+## String op node
+@do-if-string-op-node
-## Field operations
-@do-if-field-op|description
+## String operations
+@do-if-string-op
## Logical op node
@do-if-logical-op-node
## Logical operations
-@do-if-logical-op|description
+@do-if-logical-op
## Length comparison op node
@do-if-len-cmp-op-node
diff --git a/pipeline/doif/README.md b/pipeline/do_if/README.md
similarity index 89%
rename from pipeline/doif/README.md
rename to pipeline/do_if/README.md
index 759545365..acef657b7 100755
--- a/pipeline/doif/README.md
+++ b/pipeline/do_if/README.md
@@ -6,7 +6,7 @@ When Do If Checker's Match func is called it calls to the root Match func and th
the chain of Match func calls are performed across the whole tree.
## Node types
-**`FieldOp`** Type of node where matching rules for fields are stored.
+**`StringOp`** Type of node where string checks for fields are stored.
@@ -27,16 +27,16 @@ the chain of Match func calls are performed across the whole tree.
-## Field op node
-DoIf field op node is considered to always be a leaf in the DoIf tree. It checks byte representation of the value by the given field path.
+## String op node
+DoIf string op node is considered to always be a leaf in the DoIf tree. It checks byte representation of the value by the given field path.
Array and object values are considered as not matched since encoding them to bytes leads towards large CPU and memory consumption.
Params:
- - `op` - value from field operations list. Required.
+ - `op` - value from string operations list. Required.
- `field` - path to field in JSON tree. If empty, root value is checked. Path to nested fields is delimited by dots `"."`, e.g. `"field.subfield"` for `{"field": {"subfield": "val"}}`.
If the field name contains dots in it they should be shielded with `"\"`, e.g. `"exception\.type"` for `{"exception.type": "example"}`. Default empty.
- `values` - list of values to check field. Required non-empty.
- - `case_sensitive` - flag indicating whether checks are performed in case sensitive way. Default `true`.
+ - `case_sensitive` - flag indicating whether checks are performed in case-sensitive way. Default `true`.
Note: case insensitive checks can cause CPU and memory overhead since every field value will be converted to lower letters.
Example:
@@ -53,8 +53,8 @@ pipelines:
```
-## Field operations
-**`Equal`** checks whether the field value is equal to one of the elements in the values list.
+## String operations
+Operation `equal` checks whether the field value is equal to one of the elements in the values list.
Example:
```yaml
@@ -68,7 +68,7 @@ pipelines:
values: [test-pod-1, test-pod-2]
```
-result:
+Result:
```
{"pod":"test-pod-1","service":"test-service"} # discarded
{"pod":"test-pod-2","service":"test-service-2"} # discarded
@@ -78,7 +78,7 @@ result:
-**`Contains`** checks whether the field value contains one of the elements the in values list.
+Operation `contains` checks whether the field value contains one of the elements in the values list.
Example:
```yaml
@@ -92,7 +92,7 @@ pipelines:
values: [my-pod, my-test]
```
-result:
+Result:
```
{"pod":"test-my-pod-1","service":"test-service"} # discarded
{"pod":"test-not-my-pod","service":"test-service-2"} # discarded
@@ -102,7 +102,7 @@ result:
-**`Prefix`** checks whether the field value has prefix equal to one of the elements in the values list.
+Operation `prefix` checks whether the field value has prefix equal to one of the elements in the values list.
Example:
```yaml
@@ -116,7 +116,7 @@ pipelines:
values: [test-1, test-2]
```
-result:
+Result:
```
{"pod":"test-1-pod-1","service":"test-service"} # discarded
{"pod":"test-2-pod-2","service":"test-service-2"} # discarded
@@ -126,7 +126,7 @@ result:
-**`Suffix`** checks whether the field value has suffix equal to one of the elements in the values list.
+Operation `suffix` checks whether the field value has suffix equal to one of the elements in the values list.
Example:
```yaml
@@ -140,7 +140,7 @@ pipelines:
values: [pod-1, pod-2]
```
-result:
+Result:
```
{"pod":"test-1-pod-1","service":"test-service"} # discarded
{"pod":"test-2-pod-2","service":"test-service-2"} # discarded
@@ -150,7 +150,7 @@ result:
-**`Regex`** checks whether the field matches any regex from the values list.
+Operation `regex` checks whether the field matches any regex from the values list.
Example:
```yaml
@@ -164,7 +164,7 @@ pipelines:
values: [pod-\d, my-test.*]
```
-result:
+Result:
```
{"pod":"test-1-pod-1","service":"test-service"} # discarded
{"pod":"test-2-pod-2","service":"test-service-2"} # discarded
@@ -207,7 +207,7 @@ pipelines:
## Logical operations
-**`Or`** accepts at least one operand and returns true on the first returned true from its operands.
+Operation `or` accepts at least one operand and returns true on the first returned true from its operands.
Example:
```yaml
@@ -226,7 +226,7 @@ pipelines:
values: [test-service]
```
-result:
+Result:
```
{"pod":"test-pod-1","service":"test-service"} # discarded
{"pod":"test-pod-2","service":"test-service-2"} # discarded
@@ -236,7 +236,7 @@ result:
-**`And`** accepts at least one operand and returns true if all operands return true
+Operation `and` accepts at least one operand and returns true if all operands return true
(in other words returns false on the first returned false from its operands).
Example:
@@ -256,7 +256,7 @@ pipelines:
values: [test-service]
```
-result:
+Result:
```
{"pod":"test-pod-1","service":"test-service"} # discarded
{"pod":"test-pod-2","service":"test-service-2"} # not discarded
@@ -266,7 +266,7 @@ result:
-**`Not`** accepts exactly one operand and returns inverted result of its operand.
+Operation `not` accepts exactly one operand and returns inverted result of its operand.
Example:
```yaml
@@ -282,7 +282,7 @@ pipelines:
values: [test-service]
```
-result:
+Result:
```
{"pod":"test-pod-1","service":"test-service"} # not discarded
{"pod":"test-pod-2","service":"test-service-2"} # discarded
@@ -294,7 +294,7 @@ result:
## Length comparison op node
-DoIf length comparison op node is considered to always be a leaf in the DoIf tree like DoIf field op node.
+DoIf length comparison op node is considered to always be a leaf in the DoIf tree like DoIf string op node.
It contains operation that compares field length in bytes or array length (for array fields) with certain value.
Params:
@@ -361,7 +361,7 @@ They denote corresponding comparison operations.
| `ne` | `!=` |
## Timestamp comparison op node
-DoIf timestamp comparison op node is considered to always be a leaf in the DoIf tree like DoIf field op node.
+DoIf timestamp comparison op node is considered to always be a leaf in the DoIf tree like DoIf string op node.
It contains operation that compares timestamps with certain value.
Params:
diff --git a/pipeline/doif/check_type_op.go b/pipeline/do_if/check_type_op.go
similarity index 94%
rename from pipeline/doif/check_type_op.go
rename to pipeline/do_if/check_type_op.go
index fff31eec9..7adcce9b6 100644
--- a/pipeline/doif/check_type_op.go
+++ b/pipeline/do_if/check_type_op.go
@@ -1,4 +1,4 @@
-package doif
+package do_if
import (
"errors"
@@ -79,7 +79,7 @@ type checkTypeOpNode struct {
checkTypeFns []checkTypeFn
}
-func NewCheckTypeOpNode(field string, values [][]byte) (Node, error) {
+func newCheckTypeOpNode(field string, values [][]byte) (Node, error) {
if len(values) == 0 {
return nil, errors.New("values are not provided")
}
@@ -150,11 +150,11 @@ func NewCheckTypeOpNode(field string, values [][]byte) (Node, error) {
}, nil
}
-func (n *checkTypeOpNode) Type() NodeType {
+func (n *checkTypeOpNode) Type() nodeType {
return NodeCheckTypeOp
}
-func (n *checkTypeOpNode) Check(eventRoot *insaneJSON.Root) bool {
+func (n *checkTypeOpNode) checkEvent(eventRoot *insaneJSON.Root) bool {
node := eventRoot.Dig(n.fieldPath...)
for _, checkFn := range n.checkTypeFns {
if checkFn(node) {
@@ -164,6 +164,10 @@ func (n *checkTypeOpNode) Check(eventRoot *insaneJSON.Root) bool {
return false
}
+func (n *checkTypeOpNode) CheckRaw([]byte, []byte, map[string]string) bool {
+ panic("not impl")
+}
+
func (n *checkTypeOpNode) isEqualTo(n2 Node, _ int) error {
n2f, ok := n2.(*checkTypeOpNode)
if !ok {
diff --git a/pipeline/doif/check_type_test.go b/pipeline/do_if/check_type_test.go
similarity index 96%
rename from pipeline/doif/check_type_test.go
rename to pipeline/do_if/check_type_test.go
index f4dd33c34..b9a3cb118 100644
--- a/pipeline/doif/check_type_test.go
+++ b/pipeline/do_if/check_type_test.go
@@ -1,4 +1,4 @@
-package doif
+package do_if
import (
"testing"
@@ -204,7 +204,7 @@ func TestCheckType(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
var eventRoot *insaneJSON.Root
- node, err := NewCheckTypeOpNode(tt.node.field, tt.node.values)
+ node, err := newCheckTypeOpNode(tt.node.field, tt.node.values)
require.NoError(t, err)
for _, d := range tt.data {
if d.eventStr == "" {
@@ -213,7 +213,7 @@ func TestCheckType(t *testing.T) {
eventRoot, err = insaneJSON.DecodeString(d.eventStr)
require.NoError(t, err)
}
- got := node.Check(eventRoot)
+ got := node.checkEvent(eventRoot)
assert.Equal(t, d.want, got, "invalid result for event %q", d.eventStr)
}
})
@@ -357,8 +357,8 @@ func TestCheckTypeDuplicateValues(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
- node, err := NewCheckTypeOpNode(tt.node.field, tt.node.values)
- require.NoError(t, err, "must be no error on NewCheckTypeOpNode")
+ node, err := newCheckTypeOpNode(tt.node.field, tt.node.values)
+ require.NoError(t, err, "must be no error on newCheckTypeOpNode")
ctnode, ok := node.(*checkTypeOpNode)
require.True(t, ok, "must be *checkTypeOpNode type")
assert.Equal(t, tt.expectedVals, len(ctnode.checkTypeFns))
@@ -366,7 +366,7 @@ func TestCheckTypeDuplicateValues(t *testing.T) {
eventStr := logsMap[d.checkType]
eventRoot, err := insaneJSON.DecodeString(eventStr)
require.NoError(t, err, "must be no error on decode checkEvent")
- got := ctnode.Check(eventRoot)
+ got := ctnode.checkEvent(eventRoot)
assert.Equal(t, d.want, got, "invalid result for check %d of type %q", i, d.checkType)
}
})
diff --git a/pipeline/doif/comparator.go b/pipeline/do_if/comparator.go
similarity index 98%
rename from pipeline/doif/comparator.go
rename to pipeline/do_if/comparator.go
index a16e90844..7a35a4e5b 100644
--- a/pipeline/doif/comparator.go
+++ b/pipeline/do_if/comparator.go
@@ -1,4 +1,4 @@
-package doif
+package do_if
import "fmt"
diff --git a/pipeline/do_if/converter.go b/pipeline/do_if/converter.go
new file mode 100644
index 000000000..b79d6128b
--- /dev/null
+++ b/pipeline/do_if/converter.go
@@ -0,0 +1,50 @@
+package do_if
+
+import (
+ "strings"
+
+ "github.com/ozontech/file.d/cfg/matchrule"
+ "github.com/ozontech/file.d/pipeline/do_if/logic"
+)
+
+func RuleToNode(rule matchrule.Rule, dataTypeTag string) (Node, error) {
+ values := arrStringToArrBytes(rule.Values)
+ node, err := newStringOpNode(
+ rule.Mode.ToString(),
+ !rule.CaseInsensitive,
+ values,
+ "",
+ dataTypeTag,
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ if !rule.Invert {
+ return node, nil
+ }
+
+ return newLogicalNode(logic.NotTag, []Node{node})
+}
+
+func arrStringToArrBytes(a []string) [][]byte {
+ res := make([][]byte, 0, len(a))
+ for _, s := range a {
+ res = append(res, []byte(strings.Clone(s)))
+ }
+
+ return res
+}
+
+func RuleSetToNode(ruleSet matchrule.RuleSet, dataTypeTag string) (Node, error) {
+ operands := make([]Node, 0, len(ruleSet.Rules))
+ for _, r := range ruleSet.Rules {
+ operand, err := RuleToNode(r, dataTypeTag)
+ if err != nil {
+ return nil, err
+ }
+ operands = append(operands, operand)
+ }
+
+ return newLogicalNode(ruleSet.Cond.ToString(), operands)
+}
diff --git a/pipeline/do_if/converter_test.go b/pipeline/do_if/converter_test.go
new file mode 100644
index 000000000..0f36cdb09
--- /dev/null
+++ b/pipeline/do_if/converter_test.go
@@ -0,0 +1,104 @@
+package do_if
+
+import (
+ "testing"
+
+ "github.com/ozontech/file.d/cfg/matchrule"
+ "github.com/ozontech/file.d/pipeline/do_if/logic"
+ "github.com/stretchr/testify/require"
+)
+
+func TestRuleSetToNode(t *testing.T) {
+ ruleSets := []matchrule.RuleSet{
+ {
+ Name: "sample_or",
+ Cond: matchrule.CondOr,
+ Rules: genAllRules(),
+ },
+ {
+ Name: "sample_and",
+ Cond: matchrule.CondAnd,
+ Rules: genAllRules(),
+ },
+ }
+
+ for _, ruleSet := range ruleSets {
+ ruleSet.Prepare()
+
+ rawNode, err := RuleSetToNode(ruleSet, DataTypeEventTag)
+ require.NoError(t, err)
+
+ logicNode := rawNode.(*logicalNode)
+ require.Equal(t, logicNode.op.String(), ruleSet.Cond.ToString())
+ require.Equal(t, len(logicNode.operands), len(ruleSet.Rules))
+
+ for i := range len(logicNode.operands) {
+ cmpRuleAndNode(t, ruleSet.Rules[i], logicNode.operands[i])
+ }
+ }
+}
+
+func TestRuleToNode(t *testing.T) {
+ for _, rule := range genAllRules() {
+ rule.Prepare()
+
+ node, err := RuleToNode(rule, DataTypeEventTag)
+ require.NoError(t, err)
+
+ cmpRuleAndNode(t, rule, node)
+ }
+}
+
+func cmpRuleAndNode(t *testing.T, rule matchrule.Rule, node Node) {
+ if rule.Invert {
+ nLogic := node.(*logicalNode)
+ require.Equal(t, nLogic.op, logic.Not)
+ require.Equal(t, len(nLogic.operands), 1)
+
+ node = nLogic.operands[0]
+ }
+
+ nStr := node.(*stringOpNode)
+ require.Equal(t, nStr.dataType, dataTypeEvent)
+
+ c := nStr.checker
+ require.Equal(t, c.MinValLen, rule.GetMinValueSize())
+	require.Equal(t, c.MaxValLen, rule.GetMaxValueSize())
+ require.True(t, c.ValuesBySize == nil)
+ require.Equal(t, c.Op.String(), rule.Mode.ToString())
+ require.Equal(t, c.CaseSensitive, !rule.CaseInsensitive)
+ require.Equal(t, c.Values, arrStringToArrBytes(rule.Values))
+}
+
+func genAllRules() []matchrule.Rule {
+ arrValues := [][]string{
+ {"val1", "val2", "val3", "val4"},
+ {"a", "bb", "ccc"},
+ }
+ modes := []matchrule.Mode{
+ matchrule.ModePrefix,
+ matchrule.ModeContains,
+ matchrule.ModeSuffix,
+ }
+ boolVals := []bool{false, true}
+
+ var rules []matchrule.Rule
+
+ for _, values := range arrValues {
+ for _, mode := range modes {
+ for _, caseSensitive := range boolVals {
+ for _, invert := range boolVals {
+ rule := matchrule.Rule{
+ Values: values,
+ Mode: mode,
+ CaseInsensitive: caseSensitive,
+ Invert: invert,
+ }
+ rules = append(rules, rule)
+ }
+ }
+ }
+ }
+
+ return rules
+}
diff --git a/pipeline/doif/ctor.go b/pipeline/do_if/ctor.go
similarity index 53%
rename from pipeline/doif/ctor.go
rename to pipeline/do_if/ctor.go
index 56f08f36b..e3b2fcd6c 100644
--- a/pipeline/doif/ctor.go
+++ b/pipeline/do_if/ctor.go
@@ -1,14 +1,21 @@
-package doif
+package do_if
import (
"errors"
"fmt"
"time"
+
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/pipeline/ctor"
+ "github.com/ozontech/file.d/pipeline/do_if/logic"
+ "github.com/ozontech/file.d/pipeline/do_if/str_checker"
)
const (
- fieldNameOp = "op"
+ fieldNameOp = "op"
+
fieldNameField = "field"
+ fieldNameData = "data"
fieldNameCaseSensitive = "case_sensitive"
@@ -27,6 +34,7 @@ const (
tsCmpValueNowTag = "now"
tsCmpValueStartTag = "file_d_start"
+ defaultTsCmpValueShift = 0 * time.Second
defaultTsCmpValUpdateInterval = 10 * time.Second
defaultTsFormat = "rfc3339nano"
@@ -34,7 +42,7 @@ const (
)
func NewFromMap(m map[string]any) (*Checker, error) {
- root, err := extractDoIfNode(m)
+ root, err := ExtractNode(m)
if err != nil {
return nil, fmt.Errorf("extract nodes: %w", err)
}
@@ -44,22 +52,25 @@ func NewFromMap(m map[string]any) (*Checker, error) {
}, nil
}
-func extractDoIfNode(node map[string]any) (Node, error) {
- opName, err := get[string](node, fieldNameOp)
+func ExtractNode(node ctor.Node) (Node, error) {
+ opName, err := ctor.Get[string](node, fieldNameOp)
if err != nil {
return nil, err
}
switch opName {
- case "and", "or", "not":
+ case
+ logic.AndTag,
+ logic.OrTag,
+ logic.NotTag:
return extractLogicalOpNode(opName, node)
case
- "equal",
- "contains",
- "prefix",
- "suffix",
- "regex":
- return extractFieldOpNode(opName, node)
+ str_checker.OpEqualTag,
+ str_checker.OpContainsTag,
+ str_checker.OpPrefixTag,
+ str_checker.OpSuffixTag,
+ str_checker.OpRegexTag:
+ return extractStringOpNode(opName, node)
case
"byte_len_cmp",
"array_len_cmp":
@@ -73,38 +84,49 @@ func extractDoIfNode(node map[string]any) (Node, error) {
}
}
-func extractFieldOpNode(opName string, node map[string]any) (Node, error) {
+func extractStringOpNode(opName string, node map[string]any) (Node, error) {
var result Node
var err error
- fieldPath, err := get[string](node, fieldNameField)
- if err != nil {
+ fieldPath, err := ctor.Get[string](node, fieldNameField)
+ fieldPathFound := err == nil
+ if errors.Is(err, ctor.ErrTypeMismatch) {
return nil, err
}
- caseSensitive := true
- caseSensitiveNode, err := get[bool](node, fieldNameCaseSensitive)
- if err == nil {
- caseSensitive = caseSensitiveNode
- } else if errors.Is(err, errTypeMismatch) {
+ dataTypeTag, err := ctor.Get[string](node, fieldNameData)
+ dataTypeTagFound := err == nil
+ if errors.Is(err, ctor.ErrTypeMismatch) {
+ return nil, err
+ }
+
+ switch {
+ case fieldPathFound && dataTypeTagFound:
+ return nil, errors.New("field selector and data type tag provided")
+ case !fieldPathFound && !dataTypeTagFound:
+ return nil, errors.New("field selector and data type tag are not provided")
+ }
+
+ caseSensitive, err := ctor.Get[bool](node, fieldNameCaseSensitive, true)
+ if err != nil {
return nil, err
}
vals, err := extractOpValues(node)
if err != nil {
- return nil, fmt.Errorf("extract field op values: %w", err)
+ return nil, fmt.Errorf("extract string op values: %w", err)
}
- result, err = NewFieldOpNode(opName, fieldPath, caseSensitive, vals)
+ result, err = newStringOpNode(opName, caseSensitive, vals, fieldPath, dataTypeTag)
if err != nil {
- return nil, fmt.Errorf("init field op: %w", err)
+ return nil, fmt.Errorf("init string op: %w", err)
}
return result, nil
}
func extractOpValues(node map[string]any) ([][]byte, error) {
- valuesRaw, err := getAny(node, fieldNameValues)
+ valuesRaw, err := ctor.GetAny(node, fieldNameValues)
if err != nil {
return nil, err
}
@@ -139,41 +161,41 @@ func extractOpValuesFromArr(values []any) ([][]byte, error) {
}
func extractLengthCmpOpNode(opName string, node map[string]any) (Node, error) {
- fieldPath, err := get[string](node, fieldNameField)
+ fieldPath, err := ctor.Get[string](node, fieldNameField)
if err != nil {
return nil, err
}
- cmpOp, err := get[string](node, fieldNameCmpOp)
+ cmpOp, err := ctor.Get[string](node, fieldNameCmpOp)
if err != nil {
return nil, err
}
- cmpValueRaw, err := getAny(node, fieldNameCmpValue)
+ cmpValueRaw, err := ctor.GetAny(node, fieldNameCmpValue)
if err != nil {
return nil, err
}
- cmpValue, err := anyToInt(cmpValueRaw)
+ cmpValue, err := cfg.AnyToInt(cmpValueRaw)
if err != nil {
return nil, err
}
- return NewLenCmpOpNode(opName, fieldPath, cmpOp, cmpValue)
+ return newLenCmpOpNode(opName, fieldPath, cmpOp, cmpValue)
}
func extractTsCmpOpNode(_ string, node map[string]any) (Node, error) {
- fieldPath, err := get[string](node, fieldNameField)
+ fieldPath, err := ctor.Get[string](node, fieldNameField)
if err != nil {
return nil, err
}
- cmpOp, err := get[string](node, fieldNameCmpOp)
+ cmpOp, err := ctor.Get[string](node, fieldNameCmpOp)
if err != nil {
return nil, err
}
- rawCmpValue, err := get[string](node, fieldNameCmpValue)
+ rawCmpValue, err := ctor.Get[string](node, fieldNameCmpValue)
if err != nil {
return nil, err
}
@@ -195,41 +217,44 @@ func extractTsCmpOpNode(_ string, node map[string]any) (Node, error) {
}
}
- format := defaultTsFormat
- str, err := get[string](node, fieldNameFormat)
- if err == nil {
- format = str
- } else if errors.Is(err, errTypeMismatch) {
+ format, err := ctor.Get[string](node, fieldNameFormat, defaultTsFormat)
+ if err != nil {
return nil, err
}
- cmpValueShift := time.Duration(0)
- str, err = get[string](node, fieldNameCmpValueShift)
- if err == nil {
- cmpValueShift, err = time.ParseDuration(str)
- if err != nil {
- return nil, fmt.Errorf("parse cmp value shift: %w", err)
- }
- } else if errors.Is(err, errTypeMismatch) {
+ cmpValueShiftStr, err := ctor.Get[string](
+ node,
+ fieldNameCmpValueShift,
+ defaultTsCmpValueShift.String(),
+ )
+ if err != nil {
return nil, err
}
- updateInterval := defaultTsCmpValUpdateInterval
- str, err = get[string](node, fieldNameUpdateInterval)
- if err == nil {
- updateInterval, err = time.ParseDuration(str)
- if err != nil {
- return nil, fmt.Errorf("parse update interval: %w", err)
- }
- } else if errors.Is(err, errTypeMismatch) {
+ cmpValueShift, err := time.ParseDuration(cmpValueShiftStr)
+ if err != nil {
+ return nil, fmt.Errorf("parse cmp value shift: %w", err)
+ }
+
+ updateIntervalStr, err := ctor.Get[string](
+ node,
+ fieldNameUpdateInterval,
+ defaultTsCmpValUpdateInterval.String(),
+ )
+ if err != nil {
return nil, err
}
- return NewTsCmpOpNode(fieldPath, format, cmpOp, cmpMode, cmpValue, cmpValueShift, updateInterval)
+ updateInterval, err := time.ParseDuration(updateIntervalStr)
+ if err != nil {
+ return nil, fmt.Errorf("parse update interval: %w", err)
+ }
+
+ return newTsCmpOpNode(fieldPath, format, cmpOp, cmpMode, cmpValue, cmpValueShift, updateInterval)
}
func extractCheckTypeOpNode(_ string, node map[string]any) (Node, error) {
- fieldPath, err := get[string](node, fieldNameField)
+ fieldPath, err := ctor.Get[string](node, fieldNameField)
if err != nil {
return nil, err
}
@@ -239,7 +264,7 @@ func extractCheckTypeOpNode(_ string, node map[string]any) (Node, error) {
return nil, fmt.Errorf("extract check type op values: %w", err)
}
- result, err := NewCheckTypeOpNode(fieldPath, vals)
+ result, err := newCheckTypeOpNode(fieldPath, vals)
if err != nil {
return nil, fmt.Errorf("init check_type op: %w", err)
}
@@ -248,7 +273,7 @@ func extractCheckTypeOpNode(_ string, node map[string]any) (Node, error) {
}
func extractLogicalOpNode(opName string, node map[string]any) (Node, error) {
- rawOperands, err := get[[]any](node, fieldNameOperands)
+ rawOperands, err := ctor.Get[[]any](node, fieldNameOperands)
if err != nil {
return nil, err
}
@@ -256,21 +281,22 @@ func extractLogicalOpNode(opName string, node map[string]any) (Node, error) {
operands := make([]Node, 0)
for _, rawOperand := range rawOperands {
- operandMap, ok := rawOperand.(map[string]any)
- if !ok {
- return nil, fmt.Errorf(
- "logical node operand type mismatch: expected=map[string]any got=%T",
- rawOperand)
+ var operandNode map[string]any
+ operandNode, err = ctor.Must[map[string]any](rawOperand)
+ if err != nil {
+ return nil, fmt.Errorf("logical node operand type mismatch: %w", err)
}
- operand, err := extractDoIfNode(operandMap)
+ var operand Node
+ operand, err = ExtractNode(operandNode)
if err != nil {
return nil, fmt.Errorf("extract operand for logical op %q: %w", opName, err)
}
+
operands = append(operands, operand)
}
- result, err := NewLogicalNode(opName, operands)
+ result, err := newLogicalNode(opName, operands)
if err != nil {
return nil, fmt.Errorf("init logical node: %w", err)
}
diff --git a/pipeline/doif/ctor_test.go b/pipeline/do_if/ctor_test.go
similarity index 85%
rename from pipeline/doif/ctor_test.go
rename to pipeline/do_if/ctor_test.go
index 7a52453ab..6a2896d4d 100644
--- a/pipeline/doif/ctor_test.go
+++ b/pipeline/do_if/ctor_test.go
@@ -1,9 +1,7 @@
-package doif
+package do_if
import (
"bytes"
- "errors"
- "fmt"
"testing"
"time"
@@ -12,74 +10,6 @@ import (
"github.com/stretchr/testify/require"
)
-type doIfTreeNode struct {
- fieldOp string
- fieldName string
- caseSensitive bool
- values [][]byte
-
- logicalOp string
- operands []*doIfTreeNode
-
- lenCmpOp string
- cmpOp string
- cmpValue int
-
- tsCmpOp bool
- tsFormat string
- tsCmpValChangeMode string
- tsCmpValue time.Time
- tsCmpValueShift time.Duration
- tsUpdateInterval time.Duration
-
- checkTypeOp bool
-}
-
-// nolint:gocritic
-func buildDoIfTree(node *doIfTreeNode) (Node, error) {
- switch {
- case node.fieldOp != "":
- return NewFieldOpNode(
- node.fieldOp,
- node.fieldName,
- node.caseSensitive,
- node.values,
- )
- case node.logicalOp != "":
- operands := make([]Node, 0)
- for _, operandNode := range node.operands {
- operand, err := buildDoIfTree(operandNode)
- if err != nil {
- return nil, fmt.Errorf("failed to build tree: %w", err)
- }
- operands = append(operands, operand)
- }
- return NewLogicalNode(
- node.logicalOp,
- operands,
- )
- case node.lenCmpOp != "":
- return NewLenCmpOpNode(node.lenCmpOp, node.fieldName, node.cmpOp, node.cmpValue)
- case node.tsCmpOp:
- return NewTsCmpOpNode(
- node.fieldName,
- node.tsFormat,
- node.cmpOp,
- node.tsCmpValChangeMode,
- node.tsCmpValue,
- node.tsCmpValueShift,
- node.tsUpdateInterval,
- )
- case node.checkTypeOp:
- return NewCheckTypeOpNode(
- node.fieldName,
- node.values,
- )
- default:
- return nil, errors.New("unknown type of node")
- }
-}
-
func Test_extractDoIfChecker(t *testing.T) {
type args struct {
cfgStr string
@@ -88,7 +18,7 @@ func Test_extractDoIfChecker(t *testing.T) {
tests := []struct {
name string
args args
- want *doIfTreeNode
+ want *treeNode
wantErr bool
}{
{
@@ -160,20 +90,20 @@ func Test_extractDoIfChecker(t *testing.T) {
}
`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
logicalOp: "not",
- operands: []*doIfTreeNode{
+ operands: []treeNode{
{
logicalOp: "and",
- operands: []*doIfTreeNode{
+ operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
values: [][]byte{nil, []byte("")},
caseSensitive: false,
},
{
- fieldOp: "prefix",
+ stringOp: "prefix",
fieldName: "log.msg",
values: [][]byte{[]byte("test-1"), []byte("test-2")},
caseSensitive: false,
@@ -201,21 +131,21 @@ func Test_extractDoIfChecker(t *testing.T) {
},
{
logicalOp: "or",
- operands: []*doIfTreeNode{
+ operands: []treeNode{
{
- fieldOp: "suffix",
+ stringOp: "suffix",
fieldName: "service",
values: [][]byte{[]byte("test-svc-1"), []byte("test-svc-2")},
caseSensitive: true,
},
{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "pod",
values: [][]byte{[]byte("test")},
caseSensitive: true,
},
{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "message",
values: [][]byte{[]byte(`test-\d+`), []byte(`test-msg-\d+`)},
caseSensitive: true,
@@ -239,7 +169,7 @@ func Test_extractDoIfChecker(t *testing.T) {
args: args{
cfgStr: `{"op":"byte_len_cmp","field":"data","cmp_op":"lt","value":10}`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
lenCmpOp: "byte_len_cmp",
cmpOp: "lt",
fieldName: "data",
@@ -251,7 +181,7 @@ func Test_extractDoIfChecker(t *testing.T) {
args: args{
cfgStr: `{"op":"array_len_cmp","field":"items","cmp_op":"lt","value":10}`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
lenCmpOp: "array_len_cmp",
cmpOp: "lt",
fieldName: "items",
@@ -270,7 +200,7 @@ func Test_extractDoIfChecker(t *testing.T) {
"format": "2006-01-02T15:04:05Z07:00",
"update_interval": "15s"}`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
@@ -290,7 +220,7 @@ func Test_extractDoIfChecker(t *testing.T) {
"cmp_op": "lt",
"value": "now"}`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
@@ -310,7 +240,7 @@ func Test_extractDoIfChecker(t *testing.T) {
"format": "rfc3339",
"value": "now"}`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
tsCmpOp: true,
cmpOp: "lt",
fieldName: "timestamp",
@@ -330,7 +260,7 @@ func Test_extractDoIfChecker(t *testing.T) {
"values": ["obj","arr"]
}`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
checkTypeOp: true,
fieldName: "log",
values: [][]byte{
@@ -351,23 +281,23 @@ func Test_extractDoIfChecker(t *testing.T) {
]
}`,
},
- want: &doIfTreeNode{
+ want: &treeNode{
logicalOp: "or",
- operands: []*doIfTreeNode{
+ operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
values: [][]byte{nil},
caseSensitive: true,
},
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
values: [][]byte{[]byte("")},
caseSensitive: true,
},
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
values: [][]byte{[]byte("test")},
caseSensitive: true,
@@ -640,7 +570,7 @@ func Test_extractDoIfChecker(t *testing.T) {
assert.Nil(t, got)
return
}
- wantTree, err := buildDoIfTree(tt.want)
+ wantTree, err := buildTree(tt.want)
require.NoError(t, err)
wantDoIfChecker := newChecker(wantTree)
assert.NoError(t, wantDoIfChecker.IsEqualTo(got))
diff --git a/pipeline/doif/do_if.go b/pipeline/do_if/do_if.go
similarity index 64%
rename from pipeline/doif/do_if.go
rename to pipeline/do_if/do_if.go
index 599fe1508..76db4ee14 100644
--- a/pipeline/doif/do_if.go
+++ b/pipeline/do_if/do_if.go
@@ -1,4 +1,4 @@
-package doif
+package do_if
import (
insaneJSON "github.com/ozontech/insane-json"
@@ -7,13 +7,13 @@ import (
// ! do-if-node
// ^ do-if-node
-type NodeType int
+type nodeType int
const (
- NodeUnknownType NodeType = iota
+ NodeUnknownType nodeType = iota
- // > Type of node where matching rules for fields are stored.
- NodeFieldOp // *
+ // > Type of node where string checks for fields are stored.
+ NodeStringOp // *
// > Type of node where matching rules for byte length and array length are stored.
NodeLengthCmpOp // *
@@ -29,8 +29,9 @@ const (
)
type Node interface {
- Type() NodeType
- Check(*insaneJSON.Root) bool
+ Type() nodeType
+ checkEvent(*insaneJSON.Root) bool
+ CheckRaw(event []byte, sourceName []byte, metadata map[string]string) bool
isEqualTo(Node, int) error
}
@@ -52,5 +53,9 @@ func (c *Checker) Check(eventRoot *insaneJSON.Root) bool {
if eventRoot == nil {
return false
}
- return c.root.Check(eventRoot)
+ return c.root.checkEvent(eventRoot)
+}
+
+func (c *Checker) CheckRaw(event []byte, sourceName []byte, metadata map[string]string) bool {
+ return c.root.CheckRaw(event, sourceName, metadata)
}
diff --git a/pipeline/doif/do_if_test.go b/pipeline/do_if/do_if_test.go
similarity index 87%
rename from pipeline/doif/do_if_test.go
rename to pipeline/do_if/do_if_test.go
index 045c0b06b..f36a0391e 100644
--- a/pipeline/doif/do_if_test.go
+++ b/pipeline/do_if/do_if_test.go
@@ -1,4 +1,4 @@
-package doif
+package do_if
import (
"errors"
@@ -8,14 +8,17 @@ import (
"testing"
"time"
+ "github.com/ozontech/file.d/pipeline/do_if/logic"
+ "github.com/ozontech/file.d/pipeline/do_if/str_checker"
insaneJSON "github.com/ozontech/insane-json"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
type treeNode struct {
- fieldOp string
- fieldName string
+ fieldName string
+
+ stringOp string
caseSensitive bool
values [][]byte
@@ -32,35 +35,38 @@ type treeNode struct {
tsCmpValue time.Time
tsCmpValueShift time.Duration
tsUpdateInterval time.Duration
+
+ checkTypeOp bool
}
// nolint:gocritic
-func buildTree(node treeNode) (Node, error) {
+func buildTree(node *treeNode) (Node, error) {
switch {
- case node.fieldOp != "":
- return NewFieldOpNode(
- node.fieldOp,
- node.fieldName,
+ case node.stringOp != "":
+ return newStringOpNode(
+ node.stringOp,
node.caseSensitive,
node.values,
+ node.fieldName,
+ "",
)
case node.logicalOp != "":
operands := make([]Node, 0)
- for _, operandNode := range node.operands {
- operand, err := buildTree(operandNode)
+ for i := range node.operands {
+ operand, err := buildTree(&node.operands[i])
if err != nil {
return nil, fmt.Errorf("failed to build tree: %w", err)
}
operands = append(operands, operand)
}
- return NewLogicalNode(
+ return newLogicalNode(
node.logicalOp,
operands,
)
case node.lenCmpOp != "":
- return NewLenCmpOpNode(node.lenCmpOp, node.fieldName, node.cmpOp, node.cmpValue)
+ return newLenCmpOpNode(node.lenCmpOp, node.fieldName, node.cmpOp, node.cmpValue)
case node.tsCmpOp:
- return NewTsCmpOpNode(
+ return newTsCmpOpNode(
node.fieldName,
node.tsFormat,
node.cmpOp,
@@ -69,6 +75,11 @@ func buildTree(node treeNode) (Node, error) {
node.tsCmpValueShift,
node.tsUpdateInterval,
)
+ case node.checkTypeOp:
+ return newCheckTypeOpNode(
+ node.fieldName,
+ node.values,
+ )
default:
return nil, errors.New("unknown type of node")
}
@@ -77,40 +88,12 @@ func buildTree(node treeNode) (Node, error) {
func checkNode(t *testing.T, want, got Node) {
require.Equal(t, want.Type(), got.Type())
switch want.Type() {
- case NodeFieldOp:
- wantNode := want.(*fieldOpNode)
- gotNode := got.(*fieldOpNode)
- assert.Equal(t, wantNode.op, gotNode.op)
+ case NodeStringOp:
+ wantNode := want.(*stringOpNode)
+ gotNode := got.(*stringOpNode)
assert.Equal(t, 0, slices.Compare[[]string](wantNode.fieldPath, gotNode.fieldPath))
assert.Equal(t, wantNode.fieldPathStr, gotNode.fieldPathStr)
- assert.Equal(t, wantNode.caseSensitive, gotNode.caseSensitive)
- if wantNode.values == nil {
- assert.Equal(t, wantNode.values, gotNode.values)
- } else {
- require.Equal(t, len(wantNode.values), len(gotNode.values))
- for i := 0; i < len(wantNode.values); i++ {
- wantValues := wantNode.values[i]
- gotValues := gotNode.values[i]
- assert.Equal(t, 0, slices.Compare[[]byte](wantValues, gotValues))
- }
- }
- if wantNode.valuesBySize == nil {
- assert.Equal(t, wantNode.valuesBySize, gotNode.valuesBySize)
- } else {
- require.Equal(t, len(wantNode.valuesBySize), len(gotNode.valuesBySize))
- for k, wantVals := range wantNode.valuesBySize {
- gotVals, ok := gotNode.valuesBySize[k]
- assert.True(t, ok, "values by key %d not present in got node", k)
- if ok {
- require.Equal(t, len(wantVals), len(gotVals))
- for i := 0; i < len(wantVals); i++ {
- assert.Equal(t, 0, slices.Compare[[]byte](wantVals[i], gotVals[i]))
- }
- }
- }
- }
- assert.Equal(t, wantNode.minValLen, gotNode.minValLen)
- assert.Equal(t, wantNode.maxValLen, gotNode.maxValLen)
+ assert.NoError(t, str_checker.Equal(&wantNode.checker, &gotNode.checker))
case NodeLogicalOp:
wantNode := want.(*logicalNode)
gotNode := got.(*logicalNode)
@@ -151,63 +134,39 @@ func TestBuildNodes(t *testing.T) {
wantErr bool
}{
{
- name: "ok_field_op_node",
+ name: "ok_string_op_node",
tree: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "log.pod",
caseSensitive: true,
values: [][]byte{[]byte(`test-111`), []byte(`test-2`), []byte(`test-3`), []byte(`test-12345`)},
},
- want: &fieldOpNode{
- op: fieldEqualOp,
- fieldPath: []string{"log", "pod"},
- fieldPathStr: "log.pod",
- caseSensitive: true,
- values: nil,
- valuesBySize: map[int][][]byte{
- 6: [][]byte{
- []byte(`test-2`),
- []byte(`test-3`),
- },
- 8: [][]byte{
- []byte(`test-111`),
- },
- 10: [][]byte{
- []byte(`test-12345`),
- },
- },
- minValLen: 6,
- maxValLen: 10,
+ want: &stringOpNode{
+ fieldPath: []string{"log", "pod"},
+ fieldPathStr: "log.pod",
+ checker: str_checker.MustNew(
+ "equal",
+ true,
+ [][]byte{[]byte(`test-111`), []byte(`test-2`), []byte(`test-3`), []byte(`test-12345`)},
+ ),
},
},
{
- name: "ok_field_op_node_case_insensitive",
+ name: "ok_string_op_node_case_insensitive",
tree: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "log.pod",
caseSensitive: false,
values: [][]byte{[]byte(`TEST-111`), []byte(`Test-2`), []byte(`tesT-3`), []byte(`TeSt-12345`)},
},
- want: &fieldOpNode{
- op: fieldEqualOp,
- fieldPath: []string{"log", "pod"},
- fieldPathStr: "log.pod",
- caseSensitive: false,
- values: nil,
- valuesBySize: map[int][][]byte{
- 6: [][]byte{
- []byte(`test-2`),
- []byte(`test-3`),
- },
- 8: [][]byte{
- []byte(`test-111`),
- },
- 10: [][]byte{
- []byte(`test-12345`),
- },
- },
- minValLen: 6,
- maxValLen: 10,
+ want: &stringOpNode{
+ fieldPath: []string{"log", "pod"},
+ fieldPathStr: "log.pod",
+ checker: str_checker.MustNew(
+ "equal",
+ false,
+ [][]byte{[]byte(`TEST-111`), []byte(`Test-2`), []byte(`tesT-3`), []byte(`TeSt-12345`)},
+ ),
},
},
{
@@ -216,13 +175,13 @@ func TestBuildNodes(t *testing.T) {
logicalOp: "or",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "log.pod",
caseSensitive: true,
values: [][]byte{[]byte(`test-111`), []byte(`test-2`), []byte(`test-3`), []byte(`test-12345`)},
},
{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service.msg",
caseSensitive: true,
values: [][]byte{[]byte(`test-0987`), []byte(`test-11`)},
@@ -230,40 +189,25 @@ func TestBuildNodes(t *testing.T) {
},
},
want: &logicalNode{
- op: logicalOr,
+ op: logic.Or,
operands: []Node{
- &fieldOpNode{
- op: fieldEqualOp,
- fieldPath: []string{"log", "pod"},
- fieldPathStr: "log.pod",
- caseSensitive: true,
- values: nil,
- valuesBySize: map[int][][]byte{
- 6: [][]byte{
- []byte(`test-2`),
- []byte(`test-3`),
- },
- 8: [][]byte{
- []byte(`test-111`),
- },
- 10: [][]byte{
- []byte(`test-12345`),
- },
- },
- minValLen: 6,
- maxValLen: 10,
+ &stringOpNode{
+ fieldPath: []string{"log", "pod"},
+ fieldPathStr: "log.pod",
+ checker: str_checker.MustNew(
+ "equal",
+ true,
+ [][]byte{[]byte(`test-111`), []byte(`test-2`), []byte(`test-3`), []byte(`test-12345`)},
+ ),
},
- &fieldOpNode{
- op: fieldContainsOp,
- fieldPath: []string{"service", "msg"},
- fieldPathStr: "service.msg",
- caseSensitive: true,
- values: [][]byte{
- []byte(`test-0987`),
- []byte(`test-11`),
- },
- minValLen: 7,
- maxValLen: 9,
+ &stringOpNode{
+ fieldPath: []string{"service", "msg"},
+ fieldPathStr: "service.msg",
+ checker: str_checker.MustNew(
+ "contains",
+ true,
+ [][]byte{[]byte(`test-0987`), []byte(`test-11`)},
+ ),
},
},
},
@@ -369,33 +313,33 @@ func TestBuildNodes(t *testing.T) {
},
},
{
- name: "err_field_op_node_empty_field",
+ name: "err_string_op_node_empty_field",
tree: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
},
wantErr: true,
},
{
- name: "err_field_op_node_empty_values",
+ name: "err_string_op_node_empty_values",
tree: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "pod",
},
wantErr: true,
},
{
- name: "err_field_op_node_invalid_regex",
+ name: "err_string_op_node_invalid_regex",
tree: treeNode{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "pod",
values: [][]byte{[]byte(`\`)},
},
wantErr: true,
},
{
- name: "err_field_op_node_invalid_op_type",
+ name: "err_string_op_node_invalid_op_type",
tree: treeNode{
- fieldOp: "noop",
+ stringOp: "noop",
fieldName: "pod",
values: [][]byte{[]byte(`test`)},
},
@@ -478,7 +422,7 @@ func TestBuildNodes(t *testing.T) {
logicalOp: "noop",
operands: []treeNode{
{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service.msg",
caseSensitive: true,
values: [][]byte{[]byte(`test-0987`), []byte(`test-11`)},
@@ -493,13 +437,13 @@ func TestBuildNodes(t *testing.T) {
logicalOp: "not",
operands: []treeNode{
{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service.msg",
caseSensitive: true,
values: [][]byte{[]byte(`test-0987`), []byte(`test-11`)},
},
{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service.msg",
caseSensitive: true,
values: [][]byte{[]byte(`test-0987`), []byte(`test-11`)},
@@ -514,7 +458,7 @@ func TestBuildNodes(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
- got, err := buildTree(tt.tree)
+ got, err := buildTree(&tt.tree)
if tt.wantErr {
require.Error(t, err)
return
@@ -546,7 +490,7 @@ func TestCheck(t *testing.T) {
{
name: "equal",
tree: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2"), []byte("test-pod-123"), []byte("po-32")},
@@ -569,7 +513,7 @@ func TestCheck(t *testing.T) {
{
name: "contains",
tree: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
@@ -584,7 +528,7 @@ func TestCheck(t *testing.T) {
{
name: "prefix",
tree: treeNode{
- fieldOp: "prefix",
+ stringOp: "prefix",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
@@ -599,7 +543,7 @@ func TestCheck(t *testing.T) {
{
name: "suffix",
tree: treeNode{
- fieldOp: "suffix",
+ stringOp: "suffix",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
@@ -614,7 +558,7 @@ func TestCheck(t *testing.T) {
{
name: "regex",
tree: treeNode{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "pod",
values: [][]byte{[]byte(`test-\d`)},
},
@@ -633,13 +577,13 @@ func TestCheck(t *testing.T) {
logicalOp: "or",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
},
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test-3"), []byte("test-4")},
@@ -662,13 +606,13 @@ func TestCheck(t *testing.T) {
logicalOp: "and",
operands: []treeNode{
{
- fieldOp: "prefix",
+ stringOp: "prefix",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test")},
},
{
- fieldOp: "suffix",
+ stringOp: "suffix",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("pod")},
@@ -691,7 +635,7 @@ func TestCheck(t *testing.T) {
logicalOp: "not",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "pod",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
@@ -709,7 +653,7 @@ func TestCheck(t *testing.T) {
{
name: "equal_case_insensitive",
tree: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "pod",
caseSensitive: false,
values: [][]byte{[]byte("Test-1"), []byte("tesT-2")},
@@ -724,7 +668,7 @@ func TestCheck(t *testing.T) {
{
name: "contains_case_insensitive",
tree: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "pod",
caseSensitive: false,
values: [][]byte{[]byte("Test-1"), []byte("tesT-2")},
@@ -739,7 +683,7 @@ func TestCheck(t *testing.T) {
{
name: "prefix_case_insensitive",
tree: treeNode{
- fieldOp: "prefix",
+ stringOp: "prefix",
fieldName: "pod",
caseSensitive: false,
values: [][]byte{[]byte("Test-1"), []byte("tesT-2")},
@@ -754,7 +698,7 @@ func TestCheck(t *testing.T) {
{
name: "suffix_case_insensitive",
tree: treeNode{
- fieldOp: "suffix",
+ stringOp: "suffix",
fieldName: "pod",
caseSensitive: false,
values: [][]byte{[]byte("Test-1"), []byte("tesT-2")},
@@ -769,7 +713,7 @@ func TestCheck(t *testing.T) {
{
name: "equal_nil_or_empty_string",
tree: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "test-field",
caseSensitive: false,
values: [][]byte{nil, []byte("")},
@@ -1078,7 +1022,7 @@ func TestCheck(t *testing.T) {
var eventRoot *insaneJSON.Root
var err error
t.Parallel()
- root, err = buildTree(tt.tree)
+ root, err = buildTree(&tt.tree)
require.NoError(t, err)
checker := newChecker(root)
for _, d := range tt.data {
@@ -1147,7 +1091,7 @@ func TestCheckLenCmpLtObject(t *testing.T) {
require.NoError(t, err)
for index, test := range tests {
- root, err := buildTree(treeNode{
+ root, err := buildTree(&treeNode{
fieldName: "user_info",
lenCmpOp: byteLenCmpOpTag,
cmpOp: "lt",
@@ -1164,7 +1108,7 @@ func TestCheckLenCmpLtObject(t *testing.T) {
require.NoError(t, err)
for index, test := range tests {
- root, err := buildTree(treeNode{
+ root, err := buildTree(&treeNode{
fieldName: "",
lenCmpOp: byteLenCmpOpTag,
cmpOp: "lt",
@@ -1188,7 +1132,7 @@ func TestCheckTsCmpValChangeModeNow(t *testing.T) {
ts1 := begin.Add(2 * dt)
ts2 := begin.Add(4 * dt)
- root, err := buildTree(treeNode{
+ root, err := buildTree(&treeNode{
tsCmpOp: true,
fieldName: "ts",
cmpOp: "lt",
@@ -1219,7 +1163,7 @@ func TestNodeIsEqual(t *testing.T) {
ts := time.Now()
fieldNode := treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
@@ -1248,7 +1192,7 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "not",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
@@ -1262,13 +1206,13 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "or",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{nil, []byte(""), []byte("null")},
},
{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "pod",
caseSensitive: false,
values: [][]byte{[]byte("pod-1"), []byte("pod-2")},
@@ -1283,19 +1227,19 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "and",
operands: []treeNode{
{
- fieldOp: "prefix",
+ stringOp: "prefix",
fieldName: "message",
caseSensitive: true,
values: [][]byte{[]byte("test-msg-1"), []byte("test-msg-2")},
},
{
- fieldOp: "suffix",
+ stringOp: "suffix",
fieldName: "message",
caseSensitive: true,
values: [][]byte{[]byte("test-msg-3"), []byte("test-msg-4")},
},
{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "msg",
caseSensitive: true,
values: [][]byte{[]byte("test-\\d+"), []byte("test-000-\\d+")},
@@ -1373,15 +1317,15 @@ func TestNodeIsEqual(t *testing.T) {
wantErr: true,
},
{
- name: "not_equal_field_op_mismatch",
+ name: "not_equal_string_op_mismatch",
t1: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{[]byte("test-1")},
},
t2: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service",
caseSensitive: false,
values: [][]byte{[]byte("test-1")},
@@ -1389,15 +1333,15 @@ func TestNodeIsEqual(t *testing.T) {
wantErr: true,
},
{
- name: "not_equal_field_op_mismatch_2",
+ name: "not_equal_string_op_mismatch_2",
t1: treeNode{
- fieldOp: "prefix",
+ stringOp: "prefix",
fieldName: "service",
caseSensitive: false,
values: [][]byte{[]byte("test-1")},
},
t2: treeNode{
- fieldOp: "suffix",
+ stringOp: "suffix",
fieldName: "service",
caseSensitive: false,
values: [][]byte{[]byte("test-1")},
@@ -1405,15 +1349,15 @@ func TestNodeIsEqual(t *testing.T) {
wantErr: true,
},
{
- name: "not_equal_field_op_mismatch_3",
+ name: "not_equal_string_op_mismatch_3",
t1: treeNode{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "service",
caseSensitive: false,
values: [][]byte{[]byte("test-1")},
},
t2: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service",
caseSensitive: false,
values: [][]byte{[]byte("test-1")},
@@ -1423,13 +1367,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_case_sensitive_mismatch",
t1: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{[]byte("test-1")},
},
t2: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1439,13 +1383,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_field_path_mismatch",
t1: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "log.msg",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
},
t2: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "log.svc",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1455,13 +1399,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_values_slice_len_mismatch",
t1: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
},
t2: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1471,13 +1415,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_values_slice_vals_mismatch",
t1: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-2")},
},
t2: treeNode{
- fieldOp: "contains",
+ stringOp: "contains",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1487,13 +1431,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_values_by_size_len_mismatch",
t1: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-22")},
},
t2: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1503,13 +1447,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_values_by_size_vals_key_mismatch",
t1: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-11")},
},
t2: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1519,13 +1463,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_values_by_size_vals_len_mismatch",
t1: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
},
t2: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1535,13 +1479,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_values_by_size_vals_mismatch",
t1: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-2")},
},
t2: treeNode{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1551,13 +1495,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_reValues_len_mismatch",
t1: treeNode{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1"), []byte("test-2")},
},
t2: treeNode{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1567,13 +1511,13 @@ func TestNodeIsEqual(t *testing.T) {
{
name: "not_equal_field_reValues_vals_mismatch",
t1: treeNode{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-2")},
},
t2: treeNode{
- fieldOp: "regex",
+ stringOp: "regex",
fieldName: "service",
caseSensitive: true,
values: [][]byte{[]byte("test-1")},
@@ -1796,7 +1740,7 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "not",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{nil},
@@ -1807,7 +1751,7 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "and",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{nil},
@@ -1822,7 +1766,7 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "or",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{nil},
@@ -1833,13 +1777,13 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "or",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{nil},
},
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{nil},
@@ -1854,7 +1798,7 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "or",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "service",
caseSensitive: false,
values: [][]byte{nil},
@@ -1865,7 +1809,7 @@ func TestNodeIsEqual(t *testing.T) {
logicalOp: "or",
operands: []treeNode{
{
- fieldOp: "equal",
+ stringOp: "equal",
fieldName: "pod",
caseSensitive: false,
values: [][]byte{nil},
@@ -1879,9 +1823,9 @@ func TestNodeIsEqual(t *testing.T) {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
- root1, err := buildTree(tt.t1)
+ root1, err := buildTree(&tt.t1)
require.NoError(t, err)
- root2, err := buildTree(tt.t2)
+ root2, err := buildTree(&tt.t2)
require.NoError(t, err)
c1 := newChecker(root1)
c2 := newChecker(root2)
diff --git a/pipeline/doif/len_cmp_op.go b/pipeline/do_if/len_cmp_op.go
similarity index 93%
rename from pipeline/doif/len_cmp_op.go
rename to pipeline/do_if/len_cmp_op.go
index c8904dca9..a268192cc 100644
--- a/pipeline/doif/len_cmp_op.go
+++ b/pipeline/do_if/len_cmp_op.go
@@ -1,4 +1,4 @@
-package doif
+package do_if
import (
"errors"
@@ -10,7 +10,7 @@ import (
)
/*{ do-if-len-cmp-op-node
-DoIf length comparison op node is considered to always be a leaf in the DoIf tree like DoIf field op node.
+DoIf length comparison op node is considered to always be a leaf in the DoIf tree like DoIf string op node.
It contains operation that compares field length in bytes or array length (for array fields) with certain value.
Params:
@@ -96,7 +96,7 @@ type lenCmpOpNode struct {
cmpValue int
}
-func NewLenCmpOpNode(op string, field string, cmpOp string, cmpValue int) (Node, error) {
+func newLenCmpOpNode(op string, field string, cmpOp string, cmpValue int) (Node, error) {
var lenCmpOp lenCmpOpType
switch op {
case byteLenCmpOpTag:
@@ -125,7 +125,7 @@ func NewLenCmpOpNode(op string, field string, cmpOp string, cmpValue int) (Node,
}, nil
}
-func (n *lenCmpOpNode) Type() NodeType {
+func (n *lenCmpOpNode) Type() nodeType {
return NodeLengthCmpOp
}
@@ -174,7 +174,7 @@ func getNodeBytesSize(node *insaneJSON.Node) int {
return size
}
-func (n *lenCmpOpNode) Check(eventRoot *insaneJSON.Root) bool {
+func (n *lenCmpOpNode) checkEvent(eventRoot *insaneJSON.Root) bool {
value := 0
switch n.lenCmpOp {
@@ -203,6 +203,10 @@ func (n *lenCmpOpNode) Check(eventRoot *insaneJSON.Root) bool {
return n.cmpOp.compare(value, n.cmpValue)
}
+func (n *lenCmpOpNode) CheckRaw(event []byte, sourceName []byte, metadata map[string]string) bool {
+ panic("not impl")
+}
+
func (n *lenCmpOpNode) isEqualTo(n2 Node, _ int) error {
n2Explicit, ok := n2.(*lenCmpOpNode)
if !ok {
diff --git a/pipeline/do_if/logic/logic.go b/pipeline/do_if/logic/logic.go
new file mode 100644
index 000000000..fe5e9af5a
--- /dev/null
+++ b/pipeline/do_if/logic/logic.go
@@ -0,0 +1,43 @@
+package logic
+
+import "fmt"
+
+type Op int
+
+const (
+ And Op = iota
+ Or
+ Not
+)
+
+const (
+ AndTag = "and"
+ OrTag = "or"
+ NotTag = "not"
+)
+
+func (op Op) String() string {
+ switch op {
+ case And:
+ return AndTag
+ case Or:
+ return OrTag
+ case Not:
+ return NotTag
+ default:
+ return "unknown"
+ }
+}
+
+func StringToOp(s string) (Op, error) {
+ switch s {
+ case AndTag:
+ return And, nil
+ case OrTag:
+ return Or, nil
+ case NotTag:
+ return Not, nil
+ default:
+ return -1, fmt.Errorf("unknown logic op tag: %s", s)
+ }
+}
diff --git a/pipeline/do_if/logical_op.go b/pipeline/do_if/logical_op.go
new file mode 100644
index 000000000..344722ee3
--- /dev/null
+++ b/pipeline/do_if/logical_op.go
@@ -0,0 +1,225 @@
+package do_if
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/ozontech/file.d/pipeline/do_if/logic"
+ insaneJSON "github.com/ozontech/insane-json"
+)
+
+/*{ do-if-logical-op-node
+DoIf logical op node is a node considered to be the root or an edge between nodes.
+It always has at least one operand which are other nodes and calls their checks
+to apply logical operation on their results.
+
+Params:
+ - `op` - value from logical operations list. Required.
+ - `operands` - list of another do-if nodes. Required non-empty.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: and
+ operands:
+ - op: equal
+ field: pod
+ values: [test-pod-1, test-pod-2]
+ case_sensitive: true
+ - op: equal
+ field: service
+ values: [test-service]
+ case_sensitive: true
+```
+
+}*/
+
+/*{ do-if-logical-op
+Operation `or` accepts at least one operand and returns true on the first returned true from its operands.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: or
+ operands:
+ - op: equal
+ field: pod
+ values: [test-pod-1, test-pod-2]
+ - op: equal
+ field: service
+ values: [test-service]
+```
+
+Result:
+```
+{"pod":"test-pod-1","service":"test-service"} # discarded
+{"pod":"test-pod-2","service":"test-service-2"} # discarded
+{"pod":"test-pod","service":"test-service"} # discarded
+{"pod":"test-pod","service":"test-service-1"} # not discarded
+```
+
+
+
+Operation `and` accepts at least one operand and returns true if all operands return true
+(in other words returns false on the first returned false from its operands).
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: and
+ operands:
+ - op: equal
+ field: pod
+ values: [test-pod-1, test-pod-2]
+ - op: equal
+ field: service
+ values: [test-service]
+```
+
+Result:
+```
+{"pod":"test-pod-1","service":"test-service"} # discarded
+{"pod":"test-pod-2","service":"test-service-2"} # not discarded
+{"pod":"test-pod","service":"test-service"} # not discarded
+{"pod":"test-pod","service":"test-service-1"} # not discarded
+```
+
+
+
+Operation `not` accepts exactly one operand and returns inverted result of its operand.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: not
+ operands:
+ - op: equal
+ field: service
+ values: [test-service]
+```
+
+Result:
+```
+{"pod":"test-pod-1","service":"test-service"} # not discarded
+{"pod":"test-pod-2","service":"test-service-2"} # discarded
+{"pod":"test-pod","service":"test-service"} # not discarded
+{"pod":"test-pod","service":"test-service-1"} # discarded
+```
+
+
+
+}*/
+
+type logicalNode struct {
+ op logic.Op
+ operands []Node
+}
+
+func newLogicalNode(op string, operands []Node) (Node, error) {
+ if len(operands) == 0 {
+ return nil, errors.New("logical op must have at least one operand")
+ }
+
+ logicOp, err := logic.StringToOp(op)
+ if err != nil {
+ return nil, err
+ }
+
+ if logicOp == logic.Not && len(operands) != 1 {
+ return nil, fmt.Errorf("logical not must have exactly one operand, got %d", len(operands))
+ }
+
+ return &logicalNode{
+ op: logicOp,
+ operands: operands,
+ }, nil
+}
+
+func (n *logicalNode) Type() nodeType {
+ return NodeLogicalOp
+}
+
+func (n *logicalNode) checkEvent(eventRoot *insaneJSON.Root) bool {
+ switch n.op {
+ case logic.Or:
+ for _, op := range n.operands {
+ if op.checkEvent(eventRoot) {
+ return true
+ }
+ }
+ return false
+ case logic.And:
+ for _, op := range n.operands {
+ if !op.checkEvent(eventRoot) {
+ return false
+ }
+ }
+ return true
+ case logic.Not:
+ return !n.operands[0].checkEvent(eventRoot)
+ default:
+ panic("unknown logical op")
+ }
+}
+
+func (n *logicalNode) CheckRaw(event []byte, sourceName []byte, metadata map[string]string) bool {
+ switch n.op {
+ case logic.Or:
+ for _, op := range n.operands {
+ if op.CheckRaw(event, sourceName, metadata) {
+ return true
+ }
+ }
+ return false
+ case logic.And:
+ for _, op := range n.operands {
+ if !op.CheckRaw(event, sourceName, metadata) {
+ return false
+ }
+ }
+ return true
+ case logic.Not:
+ return !n.operands[0].CheckRaw(event, sourceName, metadata)
+ default:
+ panic("unknown logical op")
+ }
+}
+
+func (n *logicalNode) isEqualTo(n2 Node, level int) error {
+ n2l, ok := n2.(*logicalNode)
+ if !ok {
+ return errors.New("nodes have different types expected: logicalNode")
+ }
+ if n.op != n2l.op {
+ return fmt.Errorf("nodes have different op expected: %q", n.op)
+ }
+ if len(n.operands) != len(n2l.operands) {
+ return fmt.Errorf("nodes have different operands len expected: %d", len(n.operands))
+ }
+ for i := 0; i < len(n.operands); i++ {
+ if err := n.operands[i].isEqualTo(n2l.operands[i], level+1); err != nil {
+ tabs := make([]byte, 0, level)
+ for j := 0; j < level; j++ {
+ tabs = append(tabs, '\t')
+ }
+ return fmt.Errorf("nodes with op %q have different operand nodes on position %d:\n%s%w", n.op, i, tabs, err)
+ }
+ }
+ return nil
+}
diff --git a/pipeline/do_if/str_checker/checker.go b/pipeline/do_if/str_checker/checker.go
new file mode 100644
index 000000000..7d59924a9
--- /dev/null
+++ b/pipeline/do_if/str_checker/checker.go
@@ -0,0 +1,275 @@
+package str_checker
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "regexp"
+)
+
+type op int
+
+const (
+ OpEqual op = iota
+ OpContains
+ OpPrefix
+ OpSuffix
+ OpRegex
+)
+
+func (op op) String() string {
+ switch op {
+ case OpEqual:
+ return OpEqualTag
+ case OpContains:
+ return OpContainsTag
+ case OpPrefix:
+ return OpPrefixTag
+ case OpSuffix:
+ return OpSuffixTag
+ case OpRegex:
+ return OpRegexTag
+ default:
+ return "unknown"
+ }
+}
+
+const (
+ OpEqualTag = "equal"
+ OpContainsTag = "contains"
+ OpPrefixTag = "prefix"
+ OpSuffixTag = "suffix"
+ OpRegexTag = "regex"
+)
+
+func stringToOp(s string) (op, error) {
+ switch s {
+ case OpEqualTag:
+ return OpEqual, nil
+ case OpContainsTag:
+ return OpContains, nil
+ case OpPrefixTag:
+ return OpPrefix, nil
+ case OpSuffixTag:
+ return OpSuffix, nil
+ case OpRegexTag:
+ return OpRegex, nil
+ default:
+ return -1, fmt.Errorf("unknown string op %q", s)
+ }
+}
+
+type DataChecker struct {
+ Op op
+ CaseSensitive bool
+ Values [][]byte
+ ValuesBySize map[int][][]byte
+ ReValues []*regexp.Regexp
+
+ MinValLen int
+ MaxValLen int
+}
+
+func New(opTag string, caseSensitive bool, values [][]byte) (DataChecker, error) {
+ var def DataChecker
+
+ if len(values) == 0 {
+ return def, errors.New("values are not provided")
+ }
+
+ var vals [][]byte
+ var valsBySize map[int][][]byte
+ var reValues []*regexp.Regexp
+ var minValLen, maxValLen int
+
+ curOp, err := stringToOp(opTag)
+ if err != nil {
+ return def, err
+ }
+
+ if curOp == OpRegex {
+ reValues = make([]*regexp.Regexp, 0, len(values))
+ for _, v := range values {
+ re, err := regexp.Compile(string(v))
+ if err != nil {
+ return def, fmt.Errorf("failed to compile regex %q: %w", v, err)
+ }
+ reValues = append(reValues, re)
+ }
+ } else {
+ minValLen = len(values[0])
+ maxValLen = len(values[0])
+ if curOp == OpEqual {
+ valsBySize = make(map[int][][]byte)
+ } else {
+ vals = make([][]byte, len(values))
+ }
+ for i := range values {
+ var curVal []byte
+ if values[i] != nil {
+ curVal = make([]byte, len(values[i]))
+ copy(curVal, values[i])
+ }
+ if !caseSensitive && curVal != nil {
+ curVal = bytes.ToLower(curVal)
+ }
+ if len(values[i]) < minValLen {
+ minValLen = len(values[i])
+ }
+ if len(values[i]) > maxValLen {
+ maxValLen = len(values[i])
+ }
+ if curOp == OpEqual {
+ valsBySize[len(curVal)] = append(valsBySize[len(curVal)], curVal)
+ } else {
+ vals[i] = curVal
+ }
+ }
+ }
+
+ return DataChecker{
+ Op: curOp,
+ CaseSensitive: caseSensitive,
+ Values: vals,
+ ValuesBySize: valsBySize,
+ ReValues: reValues,
+ MinValLen: minValLen,
+ MaxValLen: maxValLen,
+ }, nil
+}
+
+func MustNew(opTag string, caseSensitive bool, values [][]byte) DataChecker {
+ res, err := New(opTag, caseSensitive, values)
+ if err != nil {
+ panic(err)
+ }
+
+ return res
+}
+
+func (n *DataChecker) Check(data []byte) bool {
+ // fast check for data
+ if n.Op != OpRegex && len(data) < n.MinValLen {
+ return false
+ }
+
+ switch n.Op {
+ case OpEqual:
+ vals, ok := n.ValuesBySize[len(data)]
+ if !ok {
+ return false
+ }
+ if !n.CaseSensitive && data != nil {
+ data = bytes.ToLower(data)
+ }
+ for _, val := range vals {
+ // null and empty strings are considered as different values
+ // null can also come if field value is absent
+ if (data == nil && val != nil) || (data != nil && val == nil) {
+ continue
+ }
+ if bytes.Equal(data, val) {
+ return true
+ }
+ }
+ case OpContains:
+ if !n.CaseSensitive {
+ data = bytes.ToLower(data)
+ }
+ for _, val := range n.Values {
+ if bytes.Contains(data, val) {
+ return true
+ }
+ }
+ case OpPrefix:
+ // check only necessary amount of bytes
+ if len(data) > n.MaxValLen {
+ data = data[:n.MaxValLen]
+ }
+ if !n.CaseSensitive {
+ data = bytes.ToLower(data)
+ }
+ for _, val := range n.Values {
+ if bytes.HasPrefix(data, val) {
+ return true
+ }
+ }
+ case OpSuffix:
+ // check only necessary amount of bytes
+ if len(data) > n.MaxValLen {
+ data = data[len(data)-n.MaxValLen:]
+ }
+ if !n.CaseSensitive {
+ data = bytes.ToLower(data)
+ }
+ for _, val := range n.Values {
+ if bytes.HasSuffix(data, val) {
+ return true
+ }
+ }
+ case OpRegex:
+ for _, re := range n.ReValues {
+ if re.Match(data) {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+func assert(b bool, msg string) {
+ if !b {
+ panic(msg)
+ }
+}
+
+func assertEqual[T comparable](a, b T, msg string) {
+ assert(a == b, fmt.Sprintf("%s: %v != %v", msg, a, b))
+}
+
+func assertEqualValues(a, b [][]byte, msg string) {
+ assertEqual(len(a), len(b), fmt.Sprintf("%s: different values count", msg))
+ for i := range a {
+ assert(
+ bytes.Equal(a[i], b[i]),
+ fmt.Sprintf("%s: different values at pos %d: %s != %s",
+ msg, i, a[i], b[i],
+ ),
+ )
+ }
+}
+
+func Equal(a, b *DataChecker) (err error) {
+ defer func() {
+ if r := recover(); r != nil {
+ err = errors.New(r.(string))
+ }
+ }()
+
+ assertEqual(a.Op, b.Op, "different op")
+ assertEqual(a.CaseSensitive, b.CaseSensitive, "different case_sensitive")
+ assertEqualValues(a.Values, b.Values, "different values")
+
+ assertEqual(len(a.ValuesBySize), len(b.ValuesBySize), "different ValuesBySize len")
+ for size := range a.ValuesBySize {
+ _, found := b.ValuesBySize[size]
+ assert(found, fmt.Sprintf("not found values by size %d", size))
+ assertEqualValues(
+ a.ValuesBySize[size], b.ValuesBySize[size],
+ fmt.Sprintf("different values by size %d", size),
+ )
+ }
+
+ assertEqual(len(a.ReValues), len(b.ReValues), "different regex values count")
+ for i := range a.ReValues {
+ assertEqual(
+ a.ReValues[i].String(), b.ReValues[i].String(),
+ fmt.Sprintf("different regex values at pos %d", i),
+ )
+ }
+
+ assertEqual(a.MinValLen, b.MinValLen, "different min value len")
+ assertEqual(a.MaxValLen, b.MaxValLen, "different max value len")
+
+ return nil
+}
diff --git a/pipeline/do_if/str_checker/checker_test.go b/pipeline/do_if/str_checker/checker_test.go
new file mode 100644
index 000000000..1a8c7fc0a
--- /dev/null
+++ b/pipeline/do_if/str_checker/checker_test.go
@@ -0,0 +1,71 @@
+package str_checker
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+)
+
+func TestCheckerCtor(t *testing.T) {
+ type testCase struct {
+ opTag string
+ caseSensitive bool
+ values [][]byte
+
+ expected DataChecker
+ }
+
+ for _, tt := range []testCase{
+ {
+ opTag: OpEqualTag,
+ caseSensitive: true,
+ values: [][]byte{[]byte(`test-111`), []byte(`test-2`), []byte(`test-3`), []byte(`test-12345`)},
+
+ expected: DataChecker{
+ Op: OpEqual,
+ CaseSensitive: true,
+ Values: nil,
+ ValuesBySize: map[int][][]byte{
+ 6: {
+ []byte(`test-2`),
+ []byte(`test-3`),
+ },
+ 8: {
+ []byte(`test-111`),
+ },
+ 10: {
+ []byte(`test-12345`),
+ },
+ },
+ ReValues: nil,
+ MinValLen: 6,
+ MaxValLen: 10,
+ },
+ },
+ {
+ opTag: OpContainsTag,
+ caseSensitive: false,
+ values: [][]byte{
+ []byte(`test-0987`),
+ []byte(`test-11`),
+ },
+
+ expected: DataChecker{
+ Op: OpContains,
+ CaseSensitive: false,
+ Values: [][]byte{
+ []byte(`test-0987`),
+ []byte(`test-11`),
+ },
+ ValuesBySize: nil,
+ ReValues: nil,
+ MinValLen: 7,
+ MaxValLen: 9,
+ },
+ },
+ } {
+ got, err := New(tt.opTag, tt.caseSensitive, tt.values)
+ require.NoErrorf(t, err, "failed to init checker")
+ require.NoError(t, Equal(&got, &tt.expected), "checkers are not equal")
+ }
+}
diff --git a/pipeline/do_if/string_op.go b/pipeline/do_if/string_op.go
new file mode 100644
index 000000000..3f9077210
--- /dev/null
+++ b/pipeline/do_if/string_op.go
@@ -0,0 +1,303 @@
+package do_if
+
+import (
+ "errors"
+ "fmt"
+ "slices"
+ "strings"
+
+ "github.com/ozontech/file.d/cfg"
+ "github.com/ozontech/file.d/pipeline/do_if/str_checker"
+ insaneJSON "github.com/ozontech/insane-json"
+)
+
+/*{ do-if-string-op-node
+DoIf string op node is considered to always be a leaf in the DoIf tree. It checks byte representation of the value by the given field path.
+Array and object values are considered as not matched since encoding them to bytes leads towards large CPU and memory consumption.
+
+Params:
+ - `op` - value from string operations list. Required.
+ - `field` - path to field in JSON tree. If empty, root value is checked. Path to nested fields is delimited by dots `"."`, e.g. `"field.subfield"` for `{"field": {"subfield": "val"}}`.
+ If the field name contains dots in it they should be shielded with `"\"`, e.g. `"exception\.type"` for `{"exception.type": "example"}`. Default empty.
+ - `values` - list of values to check field. Required non-empty.
+ - `case_sensitive` - flag indicating whether checks are performed in case-sensitive way. Default `true`.
+ Note: case insensitive checks can cause CPU and memory overhead since every field value will be converted to lower letters.
+
+Example:
+```yaml
+pipelines:
+ tests:
+ actions:
+ - type: discard
+ do_if:
+ op: suffix
+ field: pod
+ values: [pod-1, pod-2]
+ case_sensitive: true
+```
+
+}*/
+
+/*{ do-if-string-op
+Operation `equal` checks whether the field value is equal to one of the elements in the values list.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: equal
+ field: pod
+ values: [test-pod-1, test-pod-2]
+```
+
+Result:
+```
+{"pod":"test-pod-1","service":"test-service"} # discarded
+{"pod":"test-pod-2","service":"test-service-2"} # discarded
+{"pod":"test-pod","service":"test-service"} # not discarded
+{"pod":"test-pod","service":"test-service-1"} # not discarded
+```
+
+
+
+Operation `contains` checks whether the field value contains one of the elements in the values list.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: contains
+ field: pod
+ values: [my-pod, my-test]
+```
+
+Result:
+```
+{"pod":"test-my-pod-1","service":"test-service"} # discarded
+{"pod":"test-not-my-pod","service":"test-service-2"} # discarded
+{"pod":"my-test-pod","service":"test-service"} # discarded
+{"pod":"test-pod","service":"test-service-1"} # not discarded
+```
+
+
+
+Operation `prefix` checks whether the field value has prefix equal to one of the elements in the values list.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: prefix
+ field: pod
+ values: [test-1, test-2]
+```
+
+Result:
+```
+{"pod":"test-1-pod-1","service":"test-service"} # discarded
+{"pod":"test-2-pod-2","service":"test-service-2"} # discarded
+{"pod":"test-pod","service":"test-service"} # not discarded
+{"pod":"test-pod","service":"test-service-1"} # not discarded
+```
+
+
+
+Operation `suffix` checks whether the field value has suffix equal to one of the elements in the values list.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: suffix
+ field: pod
+ values: [pod-1, pod-2]
+```
+
+Result:
+```
+{"pod":"test-1-pod-1","service":"test-service"} # discarded
+{"pod":"test-2-pod-2","service":"test-service-2"} # discarded
+{"pod":"test-pod","service":"test-service"} # not discarded
+{"pod":"test-pod","service":"test-service-1"} # not discarded
+```
+
+
+
+Operation `regex` checks whether the field matches any regex from the values list.
+
+Example:
+```yaml
+pipelines:
+ test:
+ actions:
+ - type: discard
+ do_if:
+ op: regex
+ field: pod
+ values: [pod-\d, my-test.*]
+```
+
+Result:
+```
+{"pod":"test-1-pod-1","service":"test-service"} # discarded
+{"pod":"test-2-pod-2","service":"test-service-2"} # discarded
+{"pod":"test-pod","service":"test-service"} # not discarded
+{"pod":"my-test-pod","service":"test-service-1"} # discarded
+{"pod":"my-test-instance","service":"test-service-1"} # discarded
+{"pod":"service123","service":"test-service-1"} # not discarded
+```
+
+
+
+}*/
+
+type dataType int
+
+const (
+ dataTypeEvent dataType = iota
+ dataTypeSourceName
+ dataTypeMeta
+)
+
+func (c dataType) String() string {
+ switch c {
+ case dataTypeEvent:
+ return DataTypeEventTag
+ case dataTypeSourceName:
+ return DataTypeSourceNameTag
+ case dataTypeMeta:
+ return DataTypeMetaTag
+ default:
+ panic(fmt.Sprintf("unknown checked data type: %d", c))
+ }
+}
+
+const (
+ DataTypeEventTag = "event"
+ DataTypeSourceNameTag = "source_name"
+ DataTypeMetaTag = "meta"
+
+ dataTypeMetaTagPrefix = "meta."
+)
+
+func stringToDataType(s string) (dataType, string, error) {
+ switch {
+ case s == DataTypeEventTag:
+ return dataTypeEvent, "", nil
+ case s == DataTypeSourceNameTag:
+ return dataTypeSourceName, "", nil
+ case strings.HasPrefix(s, dataTypeMetaTagPrefix):
+ return dataTypeMeta, strings.TrimPrefix(s, dataTypeMetaTagPrefix), nil
+ default:
+ return -1, "", fmt.Errorf("unparsable check data tag: %s", s)
+ }
+}
+
+type stringOpNode struct {
+ fieldPath []string
+ fieldPathStr string
+ dataType dataType
+ metaKey string
+
+ checker str_checker.DataChecker
+}
+
+func newStringOpNode(
+ op string,
+ caseSensitive bool,
+ values [][]byte,
+ field string,
+ dataTypeTag string,
+) (Node, error) {
+ if len(values) == 0 {
+ return nil, errors.New("values are not provided")
+ }
+
+ c, err := str_checker.New(op, caseSensitive, values)
+ if err != nil {
+ return nil, err
+ }
+
+ var curDataType dataType
+ var curMetaKey string
+ if dataTypeTag != "" {
+ curDataType, curMetaKey, err = stringToDataType(dataTypeTag)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return &stringOpNode{
+ fieldPath: cfg.ParseFieldSelector(field),
+ fieldPathStr: field,
+ dataType: curDataType,
+ metaKey: curMetaKey,
+ checker: c,
+ }, nil
+}
+
+func (n *stringOpNode) Type() nodeType {
+ return NodeStringOp
+}
+
+func (n *stringOpNode) checkEvent(eventRoot *insaneJSON.Root) bool {
+ node := eventRoot.Dig(n.fieldPath...)
+ if node.IsArray() || node.IsObject() {
+ return false
+ }
+
+ if node.IsNull() {
+ return n.checker.Check(nil)
+ }
+
+ return n.checker.Check(node.AsBytes())
+}
+
+func (n *stringOpNode) CheckRaw(event []byte, sourceName []byte, metadata map[string]string) bool {
+ switch n.dataType {
+ case dataTypeEvent:
+ return n.checker.Check(event)
+ case dataTypeSourceName:
+ return n.checker.Check(sourceName)
+ case dataTypeMeta:
+ data, ok := metadata[n.metaKey]
+ return ok && n.checker.Check([]byte(data))
+ default:
+		panic(fmt.Sprintf("unknown type of checked data: %d", n.dataType))
+ }
+}
+
+func (n *stringOpNode) isEqualTo(n2 Node, _ int) error {
+ n2f, ok := n2.(*stringOpNode)
+ if !ok {
+ return errors.New("nodes have different types expected: stringOpNode")
+ }
+
+ if n.fieldPathStr != n2f.fieldPathStr || slices.Compare[[]string](n.fieldPath, n2f.fieldPath) != 0 {
+ return fmt.Errorf("nodes have different fieldPathStr expected: fieldPathStr=%q fieldPath=%v",
+ n.fieldPathStr, n.fieldPath,
+ )
+ }
+
+ if n.dataType != n2f.dataType {
+ return fmt.Errorf("nodes have different data types expected: dataType=%q", n.dataType)
+ }
+
+ if n.metaKey != n2f.metaKey {
+		return fmt.Errorf("nodes have different meta keys expected: metaKey=%q", n.metaKey)
+ }
+
+ return str_checker.Equal(&n.checker, &n2f.checker)
+}
diff --git a/pipeline/doif/ts_cmp_op.go b/pipeline/do_if/ts_cmp_op.go
similarity index 94%
rename from pipeline/doif/ts_cmp_op.go
rename to pipeline/do_if/ts_cmp_op.go
index f24753c9d..dd2238e47 100644
--- a/pipeline/doif/ts_cmp_op.go
+++ b/pipeline/do_if/ts_cmp_op.go
@@ -1,4 +1,4 @@
-package doif
+package do_if
import (
"errors"
@@ -13,7 +13,7 @@ import (
)
/*{ do-if-ts-cmp-op-node
-DoIf timestamp comparison op node is considered to always be a leaf in the DoIf tree like DoIf field op node.
+DoIf timestamp comparison op node is considered to always be a leaf in the DoIf tree like DoIf string op node.
It contains operation that compares timestamps with certain value.
Params:
@@ -84,7 +84,7 @@ type tsCmpOpNode struct {
updateInterval time.Duration
}
-func NewTsCmpOpNode(field string, format string, cmpOp string, cmpValChangeMode string, cmpValue time.Time, cmpValueShift time.Duration, updateInterval time.Duration) (Node, error) {
+func newTsCmpOpNode(field string, format string, cmpOp string, cmpValChangeMode string, cmpValue time.Time, cmpValueShift time.Duration, updateInterval time.Duration) (Node, error) {
typedCmpOp, err := newCmpOp(cmpOp)
if err != nil {
return nil, err
@@ -133,11 +133,11 @@ func (n *tsCmpOpNode) startUpdater() {
}
}
-func (n *tsCmpOpNode) Type() NodeType {
+func (n *tsCmpOpNode) Type() nodeType {
return NodeTimestampCmpOp
}
-func (n *tsCmpOpNode) Check(eventRoot *insaneJSON.Root) bool {
+func (n *tsCmpOpNode) checkEvent(eventRoot *insaneJSON.Root) bool {
node := eventRoot.Dig(n.fieldPath...)
if node == nil {
return false
@@ -169,6 +169,10 @@ func (n *tsCmpOpNode) Check(eventRoot *insaneJSON.Root) bool {
return n.cmpOp.compare(lhs, int(rhs))
}
+func (n *tsCmpOpNode) CheckRaw([]byte, []byte, map[string]string) bool {
+ panic("not impl")
+}
+
func (n *tsCmpOpNode) isEqualTo(n2 Node, _ int) error {
n2Explicit, ok := n2.(*tsCmpOpNode)
if !ok {
diff --git a/pipeline/doif/ctor_utils.go b/pipeline/doif/ctor_utils.go
deleted file mode 100644
index ace56edc2..000000000
--- a/pipeline/doif/ctor_utils.go
+++ /dev/null
@@ -1,57 +0,0 @@
-package doif
-
-import (
- "encoding/json"
- "errors"
- "fmt"
-)
-
-var (
- errFieldNotFound = errors.New("field not found")
- errTypeMismatch = errors.New("type mismatch")
-)
-
-func getAny(node map[string]any, field string) (any, error) {
- res, has := node[field]
- if !has {
- return nil, fmt.Errorf("field=%q: %w", field, errFieldNotFound)
- }
-
- return res, nil
-}
-
-func get[T any](node map[string]any, field string) (T, error) {
- var def T
-
- fieldNode, err := getAny(node, field)
- if err != nil {
- return def, err
- }
-
- result, ok := fieldNode.(T)
- if !ok {
- return def, fmt.Errorf(
- "field=%q expected=%T got=%T: %w",
- field, def, fieldNode, errTypeMismatch,
- )
- }
-
- return result, nil
-}
-
-func anyToInt(v any) (int, error) {
- switch vNum := v.(type) {
- case int:
- return vNum, nil
- case float64:
- return int(vNum), nil
- case json.Number:
- vInt64, err := vNum.Int64()
- if err != nil {
- return 0, err
- }
- return int(vInt64), nil
- default:
- return 0, fmt.Errorf("type=%T not convertable to int", v)
- }
-}
diff --git a/pipeline/doif/field_op.go b/pipeline/doif/field_op.go
deleted file mode 100644
index 589f674ff..000000000
--- a/pipeline/doif/field_op.go
+++ /dev/null
@@ -1,419 +0,0 @@
-package doif
-
-import (
- "bytes"
- "errors"
- "fmt"
- "regexp"
- "slices"
-
- "github.com/ozontech/file.d/cfg"
- insaneJSON "github.com/ozontech/insane-json"
-)
-
-// ! do-if-field-op
-// ^ do-if-field-op
-
-type fieldOpType int
-
-const (
- fieldUnknownOp fieldOpType = iota
- fieldEqualOp
- fieldContainsOp
- fieldPrefixOp
- fieldSuffixOp
- fieldRegexOp
-)
-
-func (t fieldOpType) String() string {
- switch t {
- case fieldEqualOp:
- return "equal"
- case fieldContainsOp:
- return "contains"
- case fieldPrefixOp:
- return "prefix"
- case fieldSuffixOp:
- return "suffix"
- case fieldRegexOp:
- return "regex"
- default:
- return "unknown"
- }
-}
-
-const (
- // > checks whether the field value is equal to one of the elements in the values list.
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: equal
- // > field: pod
- // > values: [test-pod-1, test-pod-2]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-pod-1","service":"test-service"} # discarded
- // > {"pod":"test-pod-2","service":"test-service-2"} # discarded
- // > {"pod":"test-pod","service":"test-service"} # not discarded
- // > {"pod":"test-pod","service":"test-service-1"} # not discarded
- // > ```
- fieldEqualOpTag = "equal" // *
-
- // > checks whether the field value contains one of the elements the in values list.
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: contains
- // > field: pod
- // > values: [my-pod, my-test]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-my-pod-1","service":"test-service"} # discarded
- // > {"pod":"test-not-my-pod","service":"test-service-2"} # discarded
- // > {"pod":"my-test-pod","service":"test-service"} # discarded
- // > {"pod":"test-pod","service":"test-service-1"} # not discarded
- // > ```
- fieldContainsOpTag = "contains" // *
-
- // > checks whether the field value has prefix equal to one of the elements in the values list.
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: prefix
- // > field: pod
- // > values: [test-1, test-2]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-1-pod-1","service":"test-service"} # discarded
- // > {"pod":"test-2-pod-2","service":"test-service-2"} # discarded
- // > {"pod":"test-pod","service":"test-service"} # not discarded
- // > {"pod":"test-pod","service":"test-service-1"} # not discarded
- // > ```
- fieldPrefixOpTag = "prefix" // *
-
- // > checks whether the field value has suffix equal to one of the elements in the values list.
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: suffix
- // > field: pod
- // > values: [pod-1, pod-2]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-1-pod-1","service":"test-service"} # discarded
- // > {"pod":"test-2-pod-2","service":"test-service-2"} # discarded
- // > {"pod":"test-pod","service":"test-service"} # not discarded
- // > {"pod":"test-pod","service":"test-service-1"} # not discarded
- // > ```
- fieldSuffixOpTag = "suffix" // *
-
- // > checks whether the field matches any regex from the values list.
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: regex
- // > field: pod
- // > values: [pod-\d, my-test.*]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-1-pod-1","service":"test-service"} # discarded
- // > {"pod":"test-2-pod-2","service":"test-service-2"} # discarded
- // > {"pod":"test-pod","service":"test-service"} # not discarded
- // > {"pod":"my-test-pod","service":"test-service-1"} # discarded
- // > {"pod":"my-test-instance","service":"test-service-1"} # discarded
- // > {"pod":"service123","service":"test-service-1"} # not discarded
- // > ```
- fieldRegexOpTag = "regex" // *
-)
-
-/*{ do-if-field-op-node
-DoIf field op node is considered to always be a leaf in the DoIf tree. It checks byte representation of the value by the given field path.
-Array and object values are considered as not matched since encoding them to bytes leads towards large CPU and memory consumption.
-
-Params:
- - `op` - value from field operations list. Required.
- - `field` - path to field in JSON tree. If empty, root value is checked. Path to nested fields is delimited by dots `"."`, e.g. `"field.subfield"` for `{"field": {"subfield": "val"}}`.
- If the field name contains dots in it they should be shielded with `"\"`, e.g. `"exception\.type"` for `{"exception.type": "example"}`. Default empty.
- - `values` - list of values to check field. Required non-empty.
- - `case_sensitive` - flag indicating whether checks are performed in case sensitive way. Default `true`.
- Note: case insensitive checks can cause CPU and memory overhead since every field value will be converted to lower letters.
-
-Example:
-```yaml
-pipelines:
- tests:
- actions:
- - type: discard
- do_if:
- op: suffix
- field: pod
- values: [pod-1, pod-2]
- case_sensitive: true
-```
-
-}*/
-
-type fieldOpNode struct {
- op fieldOpType
- fieldPath []string
- fieldPathStr string
- caseSensitive bool
- values [][]byte
- valuesBySize map[int][][]byte
- reValues []*regexp.Regexp
-
- minValLen int
- maxValLen int
-}
-
-func NewFieldOpNode(op string, field string, caseSensitive bool, values [][]byte) (Node, error) {
- if len(values) == 0 {
- return nil, errors.New("values are not provided")
- }
- var vals [][]byte
- var valsBySize map[int][][]byte
- var reValues []*regexp.Regexp
- var minValLen, maxValLen int
- var fop fieldOpType
-
- fieldPath := cfg.ParseFieldSelector(field)
-
- switch op {
- case fieldEqualOpTag:
- fop = fieldEqualOp
- case fieldContainsOpTag:
- fop = fieldContainsOp
- case fieldPrefixOpTag:
- fop = fieldPrefixOp
- case fieldSuffixOpTag:
- fop = fieldSuffixOp
- case fieldRegexOpTag:
- fop = fieldRegexOp
- reValues = make([]*regexp.Regexp, 0, len(values))
- for _, v := range values {
- re, err := regexp.Compile(string(v))
- if err != nil {
- return nil, fmt.Errorf("failed to compile regex %q: %w", v, err)
- }
- reValues = append(reValues, re)
- }
- default:
- return nil, fmt.Errorf("unknown field op %q", op)
- }
-
- if fop != fieldRegexOp {
- minValLen = len(values[0])
- maxValLen = len(values[0])
- if fop == fieldEqualOp {
- valsBySize = make(map[int][][]byte)
- } else {
- vals = make([][]byte, len(values))
- }
- for i := range values {
- var curVal []byte
- if values[i] != nil {
- curVal = make([]byte, len(values[i]))
- copy(curVal, values[i])
- }
- if !caseSensitive && curVal != nil {
- curVal = bytes.ToLower(curVal)
- }
- if len(values[i]) < minValLen {
- minValLen = len(values[i])
- }
- if len(values[i]) > maxValLen {
- maxValLen = len(values[i])
- }
- if fop == fieldEqualOp {
- valsBySize[len(curVal)] = append(valsBySize[len(curVal)], curVal)
- } else {
- vals[i] = curVal
- }
- }
- }
-
- return &fieldOpNode{
- op: fop,
- fieldPath: fieldPath,
- fieldPathStr: field,
- caseSensitive: caseSensitive,
- values: vals,
- valuesBySize: valsBySize,
- reValues: reValues,
- minValLen: minValLen,
- maxValLen: maxValLen,
- }, nil
-}
-
-func (n *fieldOpNode) Type() NodeType {
- return NodeFieldOp
-}
-
-func (n *fieldOpNode) Check(eventRoot *insaneJSON.Root) bool {
- var data []byte
- node := eventRoot.Dig(n.fieldPath...)
- if node.IsArray() || node.IsObject() {
- return false
- }
- if !node.IsNull() {
- data = node.AsBytes()
- }
- // fast check for data
- if n.op != fieldRegexOp && len(data) < n.minValLen {
- return false
- }
- switch n.op {
- case fieldEqualOp:
- vals, ok := n.valuesBySize[len(data)]
- if !ok {
- return false
- }
- if !n.caseSensitive && data != nil {
- data = bytes.ToLower(data)
- }
- for _, val := range vals {
- // null and empty strings are considered as different values
- // null can also come if field value is absent
- if (data == nil && val != nil) || (data != nil && val == nil) {
- continue
- }
- if bytes.Equal(data, val) {
- return true
- }
- }
- case fieldContainsOp:
- if !n.caseSensitive {
- data = bytes.ToLower(data)
- }
- for _, val := range n.values {
- if bytes.Contains(data, val) {
- return true
- }
- }
- case fieldPrefixOp:
- // check only necessary amount of bytes
- if len(data) > n.maxValLen {
- data = data[:n.maxValLen]
- }
- if !n.caseSensitive {
- data = bytes.ToLower(data)
- }
- for _, val := range n.values {
- if bytes.HasPrefix(data, val) {
- return true
- }
- }
- case fieldSuffixOp:
- // check only necessary amount of bytes
- if len(data) > n.maxValLen {
- data = data[len(data)-n.maxValLen:]
- }
- if !n.caseSensitive {
- data = bytes.ToLower(data)
- }
- for _, val := range n.values {
- if bytes.HasSuffix(data, val) {
- return true
- }
- }
- case fieldRegexOp:
- for _, re := range n.reValues {
- if re.Match(data) {
- return true
- }
- }
- }
- return false
-}
-
-func (n *fieldOpNode) isEqualTo(n2 Node, _ int) error {
- n2f, ok := n2.(*fieldOpNode)
- if !ok {
- return errors.New("nodes have different types expected: fieldOpNode")
- }
- if n.op != n2f.op {
- return fmt.Errorf("nodes have different op expected: %q", n.op)
- }
- if n.caseSensitive != n2f.caseSensitive {
- return fmt.Errorf("nodes have different caseSensitive expected: %v", n.caseSensitive)
- }
- if n.fieldPathStr != n2f.fieldPathStr || slices.Compare[[]string](n.fieldPath, n2f.fieldPath) != 0 {
- return fmt.Errorf("nodes have different fieldPathStr expected: fieldPathStr=%q fieldPath=%v",
- n.fieldPathStr, n.fieldPath,
- )
- }
- if len(n.values) != len(n2f.values) {
- return fmt.Errorf("nodes have different values slices len expected: %d", len(n.values))
- }
- for i := 0; i < len(n.values); i++ {
- if !bytes.Equal(n.values[i], n2f.values[i]) {
- return fmt.Errorf("nodes have different data in values expected: %v on position", n.values)
- }
- }
- if len(n.valuesBySize) != len(n2f.valuesBySize) {
- return fmt.Errorf("nodes have different valuesBySize len expected: %d", len(n.valuesBySize))
- }
- for k, v := range n.valuesBySize {
- if v2, has := n2f.valuesBySize[k]; !has {
- return fmt.Errorf("nodes have different valuesBySize keys expected key: %d", k)
- } else if len(v) != len(v2) {
- return fmt.Errorf("nodes have different valuesBySize values len under key %d expected: %d", k, len(v))
- } else {
- for i := 0; i < len(v); i++ {
- if !bytes.Equal(v[i], v2[i]) {
- return fmt.Errorf("nodes have different valuesBySize data under key %d: %v", k, v)
- }
- }
- }
- }
- if len(n.reValues) != len(n2f.reValues) {
- return fmt.Errorf("nodes have different reValues len expected: %d", len(n.reValues))
- }
- for i := 0; i < len(n.reValues); i++ {
- if n.reValues[i].String() != n2f.reValues[i].String() {
- return fmt.Errorf("nodes have different reValues data expected: %v", n.reValues)
- }
- }
- if n.minValLen != n2f.minValLen {
- return fmt.Errorf("nodes have different minValLem expected: %d", n.minValLen)
- }
- if n.maxValLen != n2f.maxValLen {
- return fmt.Errorf("nodes have different maxValLem expected: %d", n.maxValLen)
- }
- return nil
-}
diff --git a/pipeline/doif/logical_op.go b/pipeline/doif/logical_op.go
deleted file mode 100644
index 70930b077..000000000
--- a/pipeline/doif/logical_op.go
+++ /dev/null
@@ -1,225 +0,0 @@
-package doif
-
-import (
- "errors"
- "fmt"
-
- insaneJSON "github.com/ozontech/insane-json"
-)
-
-// ! do-if-logical-op
-// ^ do-if-logical-op
-
-type logicalOpType int
-
-const (
- logicalOpUnknown logicalOpType = iota
- logicalOr
- logicalAnd
- logicalNot
-)
-
-func (t logicalOpType) String() string {
- switch t {
- case logicalOr:
- return "or"
- case logicalAnd:
- return "and"
- case logicalNot:
- return "not"
- default:
- return "unknown"
- }
-}
-
-const (
- // > accepts at least one operand and returns true on the first returned true from its operands.
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: or
- // > operands:
- // > - op: equal
- // > field: pod
- // > values: [test-pod-1, test-pod-2]
- // > - op: equal
- // > field: service
- // > values: [test-service]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-pod-1","service":"test-service"} # discarded
- // > {"pod":"test-pod-2","service":"test-service-2"} # discarded
- // > {"pod":"test-pod","service":"test-service"} # discarded
- // > {"pod":"test-pod","service":"test-service-1"} # not discarded
- // > ```
- logicalOrTag = "or" // *
-
- // > accepts at least one operand and returns true if all operands return true
- // > (in other words returns false on the first returned false from its operands).
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: and
- // > operands:
- // > - op: equal
- // > field: pod
- // > values: [test-pod-1, test-pod-2]
- // > - op: equal
- // > field: service
- // > values: [test-service]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-pod-1","service":"test-service"} # discarded
- // > {"pod":"test-pod-2","service":"test-service-2"} # not discarded
- // > {"pod":"test-pod","service":"test-service"} # not discarded
- // > {"pod":"test-pod","service":"test-service-1"} # not discarded
- // > ```
- logicalAndTag = "and" // *
-
- // > accepts exactly one operand and returns inverted result of its operand.
- // >
- // > Example:
- // > ```yaml
- // > pipelines:
- // > test:
- // > actions:
- // > - type: discard
- // > do_if:
- // > op: not
- // > operands:
- // > - op: equal
- // > field: service
- // > values: [test-service]
- // > ```
- // >
- // > result:
- // > ```
- // > {"pod":"test-pod-1","service":"test-service"} # not discarded
- // > {"pod":"test-pod-2","service":"test-service-2"} # discarded
- // > {"pod":"test-pod","service":"test-service"} # not discarded
- // > {"pod":"test-pod","service":"test-service-1"} # discarded
- // > ```
- logicalNotTag = "not" // *
-)
-
-/*{ do-if-logical-op-node
-DoIf logical op node is a node considered to be the root or an edge between nodes.
-It always has at least one operand which are other nodes and calls their checks
-to apply logical operation on their results.
-
-Params:
- - `op` - value from logical operations list. Required.
- - `operands` - list of another do-if nodes. Required non-empty.
-
-Example:
-```yaml
-pipelines:
- test:
- actions:
- - type: discard
- do_if:
- op: and
- operands:
- - op: equal
- field: pod
- values: [test-pod-1, test-pod-2]
- case_sensitive: true
- - op: equal
- field: service
- values: [test-service]
- case_sensitive: true
-```
-
-}*/
-
-type logicalNode struct {
- op logicalOpType
- operands []Node
-}
-
-func NewLogicalNode(op string, operands []Node) (Node, error) {
- if len(operands) == 0 {
- return nil, errors.New("logical op must have at least one operand")
- }
- var lop logicalOpType
- switch op {
- case logicalOrTag:
- lop = logicalOr
- case logicalAndTag:
- lop = logicalAnd
- case logicalNotTag:
- lop = logicalNot
- if len(operands) > 1 {
- return nil, fmt.Errorf("logical not must have exactly one operand, got %d", len(operands))
- }
- default:
- return nil, fmt.Errorf("unknown logical op %q", op)
- }
- return &logicalNode{
- op: lop,
- operands: operands,
- }, nil
-}
-
-func (n *logicalNode) Type() NodeType {
- return NodeLogicalOp
-}
-
-func (n *logicalNode) Check(eventRoot *insaneJSON.Root) bool {
- switch n.op {
- case logicalOr:
- for _, op := range n.operands {
- if op.Check(eventRoot) {
- return true
- }
- }
- return false
- case logicalAnd:
- for _, op := range n.operands {
- if !op.Check(eventRoot) {
- return false
- }
- }
- return true
- case logicalNot:
- return !n.operands[0].Check(eventRoot)
- }
- return false
-}
-
-func (n *logicalNode) isEqualTo(n2 Node, level int) error {
- n2l, ok := n2.(*logicalNode)
- if !ok {
- return errors.New("nodes have different types expected: logicalNode")
- }
- if n.op != n2l.op {
- return fmt.Errorf("nodes have different op expected: %q", n.op)
- }
- if len(n.operands) != len(n2l.operands) {
- return fmt.Errorf("nodes have different operands len expected: %d", len(n.operands))
- }
- for i := 0; i < len(n.operands); i++ {
- if err := n.operands[i].isEqualTo(n2l.operands[i], level+1); err != nil {
- tabs := make([]byte, 0, level)
- for j := 0; j < level; j++ {
- tabs = append(tabs, '\t')
- }
- return fmt.Errorf("nodes with op %q have different operand nodes on position %d:\n%s%w", n.op, i, tabs, err)
- }
- }
- return nil
-}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index f1bb39104..9c31b01bf 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -154,6 +154,7 @@ type Settings struct {
EventTimeout time.Duration
AntispamThreshold int
AntispamExceptions antispam.Exceptions
+ Antispam map[string]any
SourceNameMetaField string
AvgEventSize int
MaxEventSize int
@@ -214,6 +215,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap
MetricsController: metricCtl,
MetricHolder: metricHolder,
Exceptions: settings.AntispamExceptions,
+ Config: settings.Antispam,
}),
eventLog: make([]string, 0, 128),
@@ -491,7 +493,7 @@ func (p *Pipeline) In(sourceID SourceID, sourceName string, offsets Offsets, byt
p.Error(fmt.Sprintf("cannot parse raw time %s: %v", row.Time, err))
}
}
- isSpam := p.antispamer.IsSpam(checkSourceID, checkSourceName, isNewSource, bytes, eventTime)
+ isSpam := p.antispamer.IsSpam(checkSourceID, checkSourceName, isNewSource, bytes, eventTime, meta)
if isSpam {
return EventSeqIDError
}
diff --git a/pipeline/plugin.go b/pipeline/plugin.go
index 3e0a7c749..13eb087ac 100644
--- a/pipeline/plugin.go
+++ b/pipeline/plugin.go
@@ -6,7 +6,7 @@ import (
"strings"
"github.com/ozontech/file.d/metric"
- "github.com/ozontech/file.d/pipeline/doif"
+ "github.com/ozontech/file.d/pipeline/do_if"
"go.uber.org/zap"
)
@@ -99,7 +99,7 @@ type ActionPluginStaticInfo struct {
MatchMode MatchMode
MatchInvert bool
- DoIfChecker *doif.Checker
+ DoIfChecker *do_if.Checker
}
type ActionPluginInfo struct {
diff --git a/plugin/action/mask/mask.go b/plugin/action/mask/mask.go
index af134ebdd..2d628b322 100644
--- a/plugin/action/mask/mask.go
+++ b/plugin/action/mask/mask.go
@@ -11,7 +11,7 @@ import (
"github.com/ozontech/file.d/fd"
"github.com/ozontech/file.d/metric"
"github.com/ozontech/file.d/pipeline"
- "github.com/ozontech/file.d/pipeline/doif"
+ "github.com/ozontech/file.d/pipeline/do_if"
insaneJSON "github.com/ozontech/insane-json"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
@@ -160,7 +160,7 @@ type Mask struct {
mode mode
DoIfCheckerMap map[string]any `json:"do_if"`
- DoIfChecker *doif.Checker
+ DoIfChecker *do_if.Checker
use bool
@@ -249,7 +249,7 @@ func compileMask(m *Mask, logger *zap.Logger) {
if m.DoIfCheckerMap != nil {
var err error
- m.DoIfChecker, err = doif.NewFromMap(m.DoIfCheckerMap)
+ m.DoIfChecker, err = do_if.NewFromMap(m.DoIfCheckerMap)
if err != nil {
logger.Fatal("can't init do_if for mask", zap.Error(err))
}