From 7244cb7ca8dbbe5da9e58fdf15af60c1e5d44ea0 Mon Sep 17 00:00:00 2001 From: Jeffrey Buchbinder Date: Thu, 27 Mar 2025 09:34:58 -0400 Subject: [PATCH 1/5] Initial implementation, prometheus wire codec --- codec/prometheus/codecprometheus.go | 230 +++++++++++++++++++++++ codec/prometheus/codecprometheus_test.go | 62 ++++++ modloader/modloader.go | 2 + 3 files changed, 294 insertions(+) create mode 100644 codec/prometheus/codecprometheus.go create mode 100644 codec/prometheus/codecprometheus_test.go diff --git a/codec/prometheus/codecprometheus.go b/codec/prometheus/codecprometheus.go new file mode 100644 index 0000000..5781f41 --- /dev/null +++ b/codec/prometheus/codecprometheus.go @@ -0,0 +1,230 @@ +package codecprometheus + +import ( + "context" + "fmt" + "regexp" + "strings" + "time" + + "github.com/tsaikd/gogstash/config" + "github.com/tsaikd/gogstash/config/goglog" + "github.com/tsaikd/gogstash/config/logevent" +) + +// ModuleName is the name used in config file +const ModuleName = "prometheus" + +// ErrorTag tag added to event when process module failed +const ErrorTag = "gogstash_codec_prometheus_error" + +// Codec default struct for codec +type Codec struct { + config.CodecConfig +} + +var ( + nameMatch = `(\S+)\{([^\}]+)\}` + nameMatchRegex *regexp.Regexp + fullStringMatch = `(\S+)\{([^\}]+)\}\s+(.+)` + fullStringMatchRegex *regexp.Regexp +) + +// InitHandler initialize the codec plugin +func InitHandler(context.Context, config.ConfigRaw) (config.TypeCodecConfig, error) { + var err error + nameMatchRegex, err = regexp.Compile(nameMatch) + if err != nil { + goto INITPCODEC + } + fullStringMatchRegex, err = regexp.Compile(fullStringMatch) +INITPCODEC: + return &Codec{ + CodecConfig: config.CodecConfig{ + CommonConfig: config.CommonConfig{ + Type: ModuleName, + }, + }, + }, err +} + +// Decode returns an event from 'data' as JSON format, adding provided 'eventExtra' +func (c *Codec) Decode(ctx context.Context, data any, eventExtra map[string]any, tags []string, msgChan chan<- logevent.LogEvent) (ok bool, err error) { + event := logevent.LogEvent{ + Timestamp: time.Now(), + Extra: eventExtra, + } + if eventExtra == nil { + event.Extra = map[string]any{} + } + event.AddTag(tags...) + + datastr := "" + switch data.(type) { + case string: + datastr = data.(string) + case []byte: + datastr = string(data.([]byte)) + default: + err = config.ErrDecodeData + event.AddTag(ErrorTag) + goglog.Logger.Error(err) + return false, err + } + + // Split into individual metrics + + // Convert to string, make sure we have a \n suffix + if !strings.HasSuffix(datastr, "\n") { + datastr += "\n" + } + + lines := strings.Split(datastr, "\n") + savetype := map[string]string{} + for _, line := range lines { + if len(strings.TrimSpace(line)) == 0 { + continue + } + + //fmt.Printf("Decode line = '%s'\n", line) + // Preserve formatted type definitions + if strings.HasPrefix(line, "# TYPE") { + p := strings.Split(line, " ") + if len(p) < 4 { + goglog.Logger.Error(fmt.Errorf("prometheus: TYPE line: %s", line)) + continue + } + savetype[p[2]] = p[3] + } + // Skip line if comment, we don't process them further + if strings.HasPrefix(line, "#") { + continue + } + + // Decode lines, every metric is an possible individual event + { + e := event + err = c.decodePrometheusEvent(line, savetype, &e) + if err == nil { + //fmt.Printf("event = %#v\n", e) + msgChan <- e + ok = true + } + } + + } + + return ok, err +} + +func (c *Codec) DecodeEvent(msg []byte, event *logevent.LogEvent) (err error) { + return fmt.Errorf("unimplemented") +} + +// decodePrometheusEvent decodes 'data' as prometheus format to event +func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, event *logevent.LogEvent) (err error) { + //fmt.Printf("decodePrometheusEvent line = '%s'\n", line) + // If the pointer is empty, raise error + if event == nil { + goglog.Logger.Errorf("Provided DecodeEvent target event pointer is nil") + return config.ErrDecodeNilTarget + } + + if event.Timestamp.IsZero() { + event.Timestamp = time.Now() + } + + event.Message = line + + p := strings.Split(line, " ") + if len(p) < 2 { + return + } + name := p[0] + value := strings.TrimSpace(strings.Join(p[1:], " ")) + + // Detect bracket format + if !fullStringMatchRegex.Match([]byte(line)) { + //fmt.Printf("Does not match regex!!!\n") + // Deal with straight name/value pair + event.Extra["name"] = name + event.Extra["value"] = value + t, ok := typeref[name] + if ok { + event.Extra["type"] = t + } + c.populateEventExtras(event) + return + } + + m := fullStringMatchRegex.FindStringSubmatch(line) + if len(m) < 4 { + //fmt.Printf("m[] = %#v\n", m) + goglog.Logger.Errorf("incorrect bracket format") + return config.ErrorInitCodecFailed1 + } + //fmt.Printf("m = %#v\n", m) + + name = strings.Split(line, "{")[0] + event.Extra["name"] = name + event.Extra["value"] = strings.Split(line, "}")[1] + t, ok := typeref[name] + if ok { + event.Extra["type"] = t + } + + dimensions := map[string]string{} + for _, d := range strings.Split(m[2], ",") { + kv := strings.Split(d, "=") + if len(kv) < 2 { + continue + } + dimensions[strings.ToLower(kv[0])] = strings.ReplaceAll(kv[1], `"`, "") + } + event.Extra["dimensions"] = dimensions + + return +} + +// Encode encodes the event to a JSON encoded message +func (c *Codec) Encode(ctx context.Context, event logevent.LogEvent, dataChan chan<- []byte) (ok bool, err error) { + output, err := event.MarshalJSON() + if err != nil { + return false, err + } + select { + case <-ctx.Done(): + return false, nil + case dataChan <- output: + } + return true, nil +} + +func (c *Codec) populateEventExtras(event *logevent.LogEvent) { + if event.Extra != nil { + // try to fill basic log event by json message + if value, ok := event.Extra["message"]; ok { + switch v := value.(type) { + case string: + event.Message = v + delete(event.Extra, "message") + } + } + if value, ok := event.Extra["@timestamp"]; ok { + switch v := value.(type) { + case string: + if timestamp, err2 := time.Parse(time.RFC3339Nano, v); err2 == nil { + event.Timestamp = timestamp + delete(event.Extra, "@timestamp") + } + } + } + if value, ok := event.Extra[logevent.TagsField]; ok { + if event.ParseTags(value) { + delete(event.Extra, logevent.TagsField) + } else { + goglog.Logger.Warnf("malformed tags: %v", value) + } + } + } +} diff --git a/codec/prometheus/codecprometheus_test.go b/codec/prometheus/codecprometheus_test.go new file mode 100644 index 0000000..2fdfc20 --- /dev/null +++ b/codec/prometheus/codecprometheus_test.go @@ -0,0 +1,62 @@ +package codecprometheus + +import ( + "context" + "testing" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/tsaikd/gogstash/config/goglog" + "github.com/tsaikd/gogstash/config/logevent" +) + +func init() { + goglog.Logger.SetLevel(logrus.DebugLevel) +} + +func TestDecode(t *testing.T) { + assert := assert.New(t) + assert.NotNil(assert) + require := require.New(t) + require.NotNil(require) + + ctx := context.Background() + codec, err := InitHandler(ctx, nil) + require.NoError(err) + + msgChan := make(chan logevent.LogEvent, 30) + emptyTags := []string{} + + ok, err := codec.Decode(ctx, []byte(`# +# HELP http_requests_total The total number of HTTP requests. +# TYPE http_requests_total counter +http_requests_total{method="post",code="200"} 1027 1395066363000 +http_requests_total{method="post",code="400"} 3 1395066363000 + +# Escaping in label values: +msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9 + +# Minimalistic line: +metric_without_timestamp_and_labels 12.47 + +# A weird metric from before the epoch: +something_weird{problem="division by zero"} +Inf -3982045 + +# A histogram, which has a pretty complex representation in the text format: +# HELP http_request_duration_seconds A histogram of the request duration. +# TYPE http_request_duration_seconds histogram +http_request_duration_seconds_bucket{le="0.05"} 24054 +http_request_duration_seconds_bucket{le="0.1"} 33444 +http_request_duration_seconds_bucket{le="0.2"} 100392 +http_request_duration_seconds_bucket{le="0.5"} 129389 +http_request_duration_seconds_bucket{le="1"} 133988 +http_request_duration_seconds_bucket{le="+Inf"} 144320 +http_request_duration_seconds_sum 53423 +http_request_duration_seconds_count 144320 +`), nil, emptyTags, msgChan) + require.NoError(err) + assert.True(ok) + require.Len(msgChan, 13) +} diff --git a/modloader/modloader.go b/modloader/modloader.go index 68e3816..66de60a 100644 --- a/modloader/modloader.go +++ b/modloader/modloader.go @@ -3,6 +3,7 @@ package modloader import ( codecazureeventhubjson "github.com/tsaikd/gogstash/codec/azureeventhubjson" codecjson "github.com/tsaikd/gogstash/codec/json" + codecprometheus "github.com/tsaikd/gogstash/codec/prometheus" "github.com/tsaikd/gogstash/config" filteraddfield "github.com/tsaikd/gogstash/filter/addfield" filtercond "github.com/tsaikd/gogstash/filter/cond" @@ -115,4 +116,5 @@ func init() { config.RegistCodecHandler(config.DefaultCodecName, config.DefaultCodecInitHandler) config.RegistCodecHandler(codecjson.ModuleName, codecjson.InitHandler) config.RegistCodecHandler(codecazureeventhubjson.ModuleName, codecazureeventhubjson.InitHandler) + config.RegistCodecHandler(codecprometheus.ModuleName, codecprometheus.InitHandler) } From d8fb45b358de82caea92b409f31fe87c0cdf820c Mon Sep 17 00:00:00 2001 From: Jeffrey Buchbinder Date: Fri, 28 Mar 2025 10:39:36 -0400 Subject: [PATCH 2/5] Linter --- codec/prometheus/codecprometheus.go | 42 ++++++++++++++--------------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/codec/prometheus/codecprometheus.go b/codec/prometheus/codecprometheus.go index 5781f41..26d5fbf 100644 --- a/codec/prometheus/codecprometheus.go +++ b/codec/prometheus/codecprometheus.go @@ -24,8 +24,8 @@ type Codec struct { } var ( - nameMatch = `(\S+)\{([^\}]+)\}` - nameMatchRegex *regexp.Regexp + // nameMatch = `(\S+)\{([^\}]+)\}` + // nameMatchRegex *regexp.Regexp fullStringMatch = `(\S+)\{([^\}]+)\}\s+(.+)` fullStringMatchRegex *regexp.Regexp ) @@ -33,12 +33,12 @@ var ( // InitHandler initialize the codec plugin func InitHandler(context.Context, config.ConfigRaw) (config.TypeCodecConfig, error) { var err error - nameMatchRegex, err = regexp.Compile(nameMatch) - if err != nil { - goto INITPCODEC - } + // nameMatchRegex, err = regexp.Compile(nameMatch) + // if err != nil { + // goto INITPCODEC + // } fullStringMatchRegex, err = regexp.Compile(fullStringMatch) -INITPCODEC: + // INITPCODEC: return &Codec{ CodecConfig: config.CodecConfig{ CommonConfig: config.CommonConfig{ @@ -60,11 +60,11 @@ func (c *Codec) Decode(ctx context.Context, data any, eventExtra map[string]any, event.AddTag(tags...) datastr := "" - switch data.(type) { + switch data := data.(type) { case string: - datastr = data.(string) + datastr = data case []byte: - datastr = string(data.([]byte)) + datastr = string(data) default: err = config.ErrDecodeData event.AddTag(ErrorTag) @@ -86,7 +86,7 @@ func (c *Codec) Decode(ctx context.Context, data any, eventExtra map[string]any, continue } - //fmt.Printf("Decode line = '%s'\n", line) + // fmt.Printf("Decode line = '%s'\n", line) // Preserve formatted type definitions if strings.HasPrefix(line, "# TYPE") { p := strings.Split(line, " ") @@ -106,14 +106,12 @@ func (c *Codec) Decode(ctx context.Context, data any, eventExtra map[string]any, e := event err = c.decodePrometheusEvent(line, savetype, &e) if err == nil { - //fmt.Printf("event = %#v\n", e) + // fmt.Printf("event = %#v\n", e) msgChan <- e ok = true } } - } - return ok, err } @@ -123,7 +121,7 @@ func (c *Codec) DecodeEvent(msg []byte, event *logevent.LogEvent) (err error) { // decodePrometheusEvent decodes 'data' as prometheus format to event func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, event *logevent.LogEvent) (err error) { - //fmt.Printf("decodePrometheusEvent line = '%s'\n", line) + // fmt.Printf("decodePrometheusEvent line = '%s'\n", line) // If the pointer is empty, raise error if event == nil { goglog.Logger.Errorf("Provided DecodeEvent target event pointer is nil") @@ -138,14 +136,14 @@ func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, ev p := strings.Split(line, " ") if len(p) < 2 { - return + return err } name := p[0] value := strings.TrimSpace(strings.Join(p[1:], " ")) // Detect bracket format - if !fullStringMatchRegex.Match([]byte(line)) { - //fmt.Printf("Does not match regex!!!\n") + if !fullStringMatchRegex.MatchString(line) { + // fmt.Printf("Does not match regex!!!\n") // Deal with straight name/value pair event.Extra["name"] = name event.Extra["value"] = value @@ -154,16 +152,16 @@ func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, ev event.Extra["type"] = t } c.populateEventExtras(event) - return + return err } m := fullStringMatchRegex.FindStringSubmatch(line) if len(m) < 4 { - //fmt.Printf("m[] = %#v\n", m) + // fmt.Printf("m[] = %#v\n", m) goglog.Logger.Errorf("incorrect bracket format") return config.ErrorInitCodecFailed1 } - //fmt.Printf("m = %#v\n", m) + // fmt.Printf("m = %#v\n", m) name = strings.Split(line, "{")[0] event.Extra["name"] = name @@ -183,7 +181,7 @@ func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, ev } event.Extra["dimensions"] = dimensions - return + return err } // Encode encodes the event to a JSON encoded message From 33fd3690191fd05e0dec430e3760d78f4f14c8b5 Mon Sep 17 00:00:00 2001 From: Jeffrey Buchbinder Date: Fri, 28 Mar 2025 10:42:27 -0400 Subject: [PATCH 3/5] codec[prometheus]: Linter, remove unused commented code --- codec/prometheus/codecprometheus.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/codec/prometheus/codecprometheus.go b/codec/prometheus/codecprometheus.go index 26d5fbf..4587d38 100644 --- a/codec/prometheus/codecprometheus.go +++ b/codec/prometheus/codecprometheus.go @@ -24,8 +24,6 @@ type Codec struct { } var ( - // nameMatch = `(\S+)\{([^\}]+)\}` - // nameMatchRegex *regexp.Regexp fullStringMatch = `(\S+)\{([^\}]+)\}\s+(.+)` fullStringMatchRegex *regexp.Regexp ) @@ -33,12 +31,7 @@ var ( // InitHandler initialize the codec plugin func InitHandler(context.Context, config.ConfigRaw) (config.TypeCodecConfig, error) { var err error - // nameMatchRegex, err = regexp.Compile(nameMatch) - // if err != nil { - // goto INITPCODEC - // } fullStringMatchRegex, err = regexp.Compile(fullStringMatch) - // INITPCODEC: return &Codec{ CodecConfig: config.CodecConfig{ CommonConfig: config.CommonConfig{ @@ -106,7 +99,6 @@ func (c *Codec) Decode(ctx context.Context, data any, eventExtra map[string]any, e := event err = c.decodePrometheusEvent(line, savetype, &e) if err == nil { - // fmt.Printf("event = %#v\n", e) msgChan <- e ok = true } @@ -157,11 +149,9 @@ func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, ev m := fullStringMatchRegex.FindStringSubmatch(line) if len(m) < 4 { - // fmt.Printf("m[] = %#v\n", m) goglog.Logger.Errorf("incorrect bracket format") return config.ErrorInitCodecFailed1 } - // fmt.Printf("m = %#v\n", m) name = strings.Split(line, "{")[0] event.Extra["name"] = name From cecc05905f0324f1e97dfe2b04cf04182f2356a9 Mon Sep 17 00:00:00 2001 From: Jeffrey Buchbinder Date: Mon, 31 Mar 2025 12:58:15 -0400 Subject: [PATCH 4/5] Convert values to floats (as per protocol) --- codec/prometheus/codecprometheus.go | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/codec/prometheus/codecprometheus.go b/codec/prometheus/codecprometheus.go index 4587d38..835de9a 100644 --- a/codec/prometheus/codecprometheus.go +++ b/codec/prometheus/codecprometheus.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "regexp" + "strconv" "strings" "time" @@ -138,7 +139,10 @@ func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, ev // fmt.Printf("Does not match regex!!!\n") // Deal with straight name/value pair event.Extra["name"] = name - event.Extra["value"] = value + event.Extra["value"], err = strconv.ParseFloat(value, 64) + if err != nil { + event.Extra["value"] = value + } t, ok := typeref[name] if ok { event.Extra["type"] = t @@ -155,7 +159,10 @@ func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, ev name = strings.Split(line, "{")[0] event.Extra["name"] = name - event.Extra["value"] = strings.Split(line, "}")[1] + event.Extra["value"], err = strconv.ParseFloat(strings.Split(line, "}")[1], 64) + if err != nil { + event.Extra["value"] = strings.Split(line, "}")[1] + } t, ok := typeref[name] if ok { event.Extra["type"] = t From 1122cf0dadd07866915b06a76d8c8720f40e74e6 Mon Sep 17 00:00:00 2001 From: Jeff Buchbinder Date: Tue, 1 Apr 2025 12:20:10 -0400 Subject: [PATCH 5/5] Update codec/prometheus/codecprometheus.go Co-authored-by: Tsai KD --- codec/prometheus/codecprometheus.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/codec/prometheus/codecprometheus.go b/codec/prometheus/codecprometheus.go index 835de9a..fb18237 100644 --- a/codec/prometheus/codecprometheus.go +++ b/codec/prometheus/codecprometheus.go @@ -100,8 +100,12 @@ func (c *Codec) Decode(ctx context.Context, data any, eventExtra map[string]any, e := event err = c.decodePrometheusEvent(line, savetype, &e) if err == nil { - msgChan <- e - ok = true + select { + case <-ctx.Done(): + // error handling ... + case msgChan <- e: + ok = true + } } } }