Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 229 additions & 0 deletions codec/prometheus/codecprometheus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package codecprometheus

import (
"context"
"fmt"
"regexp"
"strconv"
"strings"
"time"

"github.com/tsaikd/gogstash/config"
"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
)

// ModuleName is the name used in config file
const ModuleName = "prometheus"

// ErrorTag tag added to event when process module failed
const ErrorTag = "gogstash_codec_prometheus_error"

// Codec default struct for codec
type Codec struct {
config.CodecConfig
}

var (
fullStringMatch = `(\S+)\{([^\}]+)\}\s+(.+)`
fullStringMatchRegex *regexp.Regexp
)

// InitHandler initialize the codec plugin
func InitHandler(context.Context, config.ConfigRaw) (config.TypeCodecConfig, error) {
var err error
fullStringMatchRegex, err = regexp.Compile(fullStringMatch)
return &Codec{
CodecConfig: config.CodecConfig{
CommonConfig: config.CommonConfig{
Type: ModuleName,
},
},
}, err
}

// Decode returns an event from 'data' as JSON format, adding provided 'eventExtra'
func (c *Codec) Decode(ctx context.Context, data any, eventExtra map[string]any, tags []string, msgChan chan<- logevent.LogEvent) (ok bool, err error) {
event := logevent.LogEvent{
Timestamp: time.Now(),
Extra: eventExtra,
}
if eventExtra == nil {
event.Extra = map[string]any{}
}
event.AddTag(tags...)

datastr := ""
switch data := data.(type) {
case string:
datastr = data
case []byte:
datastr = string(data)
default:
err = config.ErrDecodeData
event.AddTag(ErrorTag)
goglog.Logger.Error(err)
return false, err
}

// Split into individual metrics

// Convert to string, make sure we have a \n suffix
if !strings.HasSuffix(datastr, "\n") {
datastr += "\n"
}

lines := strings.Split(datastr, "\n")
savetype := map[string]string{}
for _, line := range lines {
if len(strings.TrimSpace(line)) == 0 {
continue
}

// fmt.Printf("Decode line = '%s'\n", line)
// Preserve formatted type definitions
if strings.HasPrefix(line, "# TYPE") {
p := strings.Split(line, " ")
if len(p) < 4 {
goglog.Logger.Error(fmt.Errorf("prometheus: TYPE line: %s", line))
continue
}
savetype[p[2]] = p[3]
}
// Skip line if comment, we don't process them further
if strings.HasPrefix(line, "#") {
continue
}

// Decode lines, every metric is an possible individual event
{
e := event
err = c.decodePrometheusEvent(line, savetype, &e)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

	e1 := logevent.LogEvent{
		Extra: map[string]any{
			"foo": "bar",
		},
	}
	e2 := e1
	e2.Extra["foo"] = "omg" // e1.Extra["foo"] is also assigned to "omg"

Is it what you want?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's supposed to be a copy, not a reference. Rather than iteratively instantiating logevent.LogEvent{} objects, I was just trying to copy them and then modify then with the actual events.

if err == nil {
select {
case <-ctx.Done():
// error handling ...
Copy link
Owner

@tsaikd tsaikd Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you want to continue processing when ctx has been canceled?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably not, since it would indicate the connection being closed.

case msgChan <- e:
ok = true
}
}
}
}
return ok, err
}

func (c *Codec) DecodeEvent(msg []byte, event *logevent.LogEvent) (err error) {
return fmt.Errorf("unimplemented")
}

// decodePrometheusEvent decodes 'data' as prometheus format to event
func (c *Codec) decodePrometheusEvent(line string, typeref map[string]string, event *logevent.LogEvent) (err error) {
// fmt.Printf("decodePrometheusEvent line = '%s'\n", line)
// If the pointer is empty, raise error
if event == nil {
goglog.Logger.Errorf("Provided DecodeEvent target event pointer is nil")
return config.ErrDecodeNilTarget
}

if event.Timestamp.IsZero() {
event.Timestamp = time.Now()
}

event.Message = line

p := strings.Split(line, " ")
if len(p) < 2 {
return err
}
name := p[0]
value := strings.TrimSpace(strings.Join(p[1:], " "))

// Detect bracket format
if !fullStringMatchRegex.MatchString(line) {
// fmt.Printf("Does not match regex!!!\n")
// Deal with straight name/value pair
event.Extra["name"] = name
event.Extra["value"], err = strconv.ParseFloat(value, 64)
if err != nil {
event.Extra["value"] = value
}
t, ok := typeref[name]
if ok {
event.Extra["type"] = t
}
c.populateEventExtras(event)
return err
}

m := fullStringMatchRegex.FindStringSubmatch(line)
if len(m) < 4 {
goglog.Logger.Errorf("incorrect bracket format")
return config.ErrorInitCodecFailed1
}

name = strings.Split(line, "{")[0]
event.Extra["name"] = name
event.Extra["value"], err = strconv.ParseFloat(strings.Split(line, "}")[1], 64)
if err != nil {
event.Extra["value"] = strings.Split(line, "}")[1]
}
t, ok := typeref[name]
if ok {
event.Extra["type"] = t
}

dimensions := map[string]string{}
for _, d := range strings.Split(m[2], ",") {
kv := strings.Split(d, "=")
if len(kv) < 2 {
continue
}
dimensions[strings.ToLower(kv[0])] = strings.ReplaceAll(kv[1], `"`, "")
}
event.Extra["dimensions"] = dimensions

return err
}

// Encode encodes the event to a JSON encoded message
func (c *Codec) Encode(ctx context.Context, event logevent.LogEvent, dataChan chan<- []byte) (ok bool, err error) {
output, err := event.MarshalJSON()
if err != nil {
return false, err
}
select {
case <-ctx.Done():
return false, nil
case dataChan <- output:
}
return true, nil
}

func (c *Codec) populateEventExtras(event *logevent.LogEvent) {
if event.Extra != nil {
// try to fill basic log event by json message
if value, ok := event.Extra["message"]; ok {
switch v := value.(type) {
case string:
event.Message = v
delete(event.Extra, "message")
}
}
if value, ok := event.Extra["@timestamp"]; ok {
switch v := value.(type) {
case string:
if timestamp, err2 := time.Parse(time.RFC3339Nano, v); err2 == nil {
event.Timestamp = timestamp
delete(event.Extra, "@timestamp")
}
}
}
if value, ok := event.Extra[logevent.TagsField]; ok {
if event.ParseTags(value) {
delete(event.Extra, logevent.TagsField)
} else {
goglog.Logger.Warnf("malformed tags: %v", value)
}
}
}
}
62 changes: 62 additions & 0 deletions codec/prometheus/codecprometheus_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package codecprometheus

import (
"context"
"testing"

"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/tsaikd/gogstash/config/goglog"
"github.com/tsaikd/gogstash/config/logevent"
)

func init() {
goglog.Logger.SetLevel(logrus.DebugLevel)
}

func TestDecode(t *testing.T) {
assert := assert.New(t)
assert.NotNil(assert)
require := require.New(t)
require.NotNil(require)

ctx := context.Background()
codec, err := InitHandler(ctx, nil)
require.NoError(err)

msgChan := make(chan logevent.LogEvent, 30)
emptyTags := []string{}

ok, err := codec.Decode(ctx, []byte(`#
# HELP http_requests_total The total number of HTTP requests.
# TYPE http_requests_total counter
http_requests_total{method="post",code="200"} 1027 1395066363000
http_requests_total{method="post",code="400"} 3 1395066363000

# Escaping in label values:
msdos_file_access_time_seconds{path="C:\\DIR\\FILE.TXT",error="Cannot find file:\n\"FILE.TXT\""} 1.458255915e9

# Minimalistic line:
metric_without_timestamp_and_labels 12.47

# A weird metric from before the epoch:
something_weird{problem="division by zero"} +Inf -3982045

# A histogram, which has a pretty complex representation in the text format:
# HELP http_request_duration_seconds A histogram of the request duration.
# TYPE http_request_duration_seconds histogram
http_request_duration_seconds_bucket{le="0.05"} 24054
http_request_duration_seconds_bucket{le="0.1"} 33444
http_request_duration_seconds_bucket{le="0.2"} 100392
http_request_duration_seconds_bucket{le="0.5"} 129389
http_request_duration_seconds_bucket{le="1"} 133988
http_request_duration_seconds_bucket{le="+Inf"} 144320
http_request_duration_seconds_sum 53423
http_request_duration_seconds_count 144320
`), nil, emptyTags, msgChan)
require.NoError(err)
assert.True(ok)
require.Len(msgChan, 13)
}
2 changes: 2 additions & 0 deletions modloader/modloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package modloader
import (
codecazureeventhubjson "github.com/tsaikd/gogstash/codec/azureeventhubjson"
codecjson "github.com/tsaikd/gogstash/codec/json"
codecprometheus "github.com/tsaikd/gogstash/codec/prometheus"
"github.com/tsaikd/gogstash/config"
filteraddfield "github.com/tsaikd/gogstash/filter/addfield"
filtercond "github.com/tsaikd/gogstash/filter/cond"
Expand Down Expand Up @@ -115,4 +116,5 @@ func init() {
config.RegistCodecHandler(config.DefaultCodecName, config.DefaultCodecInitHandler)
config.RegistCodecHandler(codecjson.ModuleName, codecjson.InitHandler)
config.RegistCodecHandler(codecazureeventhubjson.ModuleName, codecazureeventhubjson.InitHandler)
config.RegistCodecHandler(codecprometheus.ModuleName, codecprometheus.InitHandler)
}
Loading