Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .chloggen/tcp.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. crosslink)
component: tcp

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add support for the tcp stanza

# One or more tracking issues related to the change
issues: [64]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ require (
github.com/hashicorp/go-version v1.8.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/jonboulle/clockwork v0.5.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.18.4 // indirect
github.com/knadh/koanf/maps v0.1.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/jonboulle/clockwork v0.5.0 h1:Hyh9A8u51kptdkR+cqRpT1EebBwTn1oK9YfGYbdFz6I=
github.com/jonboulle/clockwork v0.5.0/go.mod h1:3mZlmanh0g2NDKO5TWZVJAfofYk64M7XN3SzBPjZF60=
github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.18.4 h1:RPhnKRAQ4Fh8zU2FY/6ZFDwTVTxgJ/EMydqSTzE9a2c=
Expand Down
19 changes: 19 additions & 0 deletions internal/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"os"
"path/filepath"

"github.com/splunk/tarunner/internal/receiver/tcpreceiver"

"github.com/splunk/tarunner/internal/receiver/wineventlogreceiver"

"go.opentelemetry.io/collector/component"
Expand Down Expand Up @@ -205,6 +207,23 @@ func createReceiver(baseDir string, next consumer.Logs, input conf.Input, transf
},
next)
return l, err
case "tcp":
f := tcpreceiver.NewFactory()
l, err := f.CreateLogs(context.Background(), receiver.Settings{
ID: component.MustNewIDWithName(f.Type().String(), parsed.Path),
TelemetrySettings: component.TelemetrySettings{
Logger: logger,
MeterProvider: meterProvider,
TracerProvider: tracerProvider,
},
}, tcpreceiver.Config{
Input: input,
BaseDir: baseDir,
Transforms: transforms,
Props: props,
},
next)
return l, err
default:
return nil, fmt.Errorf("unsupported scheme %q", parsed.Scheme)
}
Expand Down
31 changes: 31 additions & 0 deletions internal/collector/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package collector

import (
"context"
"net"
"path/filepath"
"testing"
"time"
Expand Down Expand Up @@ -174,3 +175,33 @@ func TestReadTransforms(t *testing.T) {
})
}
}

func TestUseTCP(t *testing.T) {
rootDir := filepath.Join("testdata", "tcp")
logsSink := &consumertest.LogsSink{}
cfg := otlpreceiver.NewFactory().CreateDefaultConfig().(*otlpreceiver.Config)
cfg.HTTP.GetOrInsertDefault().ServerConfig.NetAddr.Endpoint = "localhost:1342"
rcvr, err := otlpreceiver.NewFactory().CreateLogs(context.Background(), receivertest.NewNopSettings(otlpreceiver.NewFactory().Type()), cfg, logsSink)
require.NoError(t, err)
err = rcvr.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
_ = rcvr.Shutdown(context.Background())
}()
cancel, err := Run(rootDir, "http://localhost:1342")
require.NoError(t, err)
defer cancel()

conn, err := net.Dial("tcp", "localhost:4000")
require.NoError(t, err)
_, err = conn.Write([]byte("test"))
require.NoError(t, err)
err = conn.Close()
require.NoError(t, err)

require.EventuallyWithT(t, func(tt *assert.CollectT) {
require.GreaterOrEqual(tt, logsSink.LogRecordCount(), 1)
lr := logsSink.AllLogs()[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
assert.Equal(tt, "test", lr.Body().Str())
}, 2*time.Second, 10*time.Millisecond)
}
1 change: 1 addition & 0 deletions internal/collector/testdata/tcp/default/inputs.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
[tcp://localhost:4000]
23 changes: 23 additions & 0 deletions internal/receiver/tcpreceiver/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright Splunk, Inc.
// SPDX-License-Identifier: Apache-2.0

package tcpreceiver

import (
"net/url"

"github.com/splunk/tarunner/internal/conf"
)

type Config struct {
Transforms []conf.Transform `mapstructure:"-"`
Props []conf.Prop `mapstructure:"-"`

BaseDir string `mapstructure:"-"`
Input conf.Input `mapstructure:"-"`
}

func (cfg *Config) Validate() error {
_, err := url.Parse(cfg.Input.Configuration.Stanza.Name)
return err
}
4 changes: 4 additions & 0 deletions internal/receiver/tcpreceiver/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Copyright Splunk, Inc.
// SPDX-License-Identifier: Apache-2.0

package tcpreceiver
14 changes: 14 additions & 0 deletions internal/receiver/tcpreceiver/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright Splunk, Inc.
// SPDX-License-Identifier: Apache-2.0

package tcpreceiver

import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/receiver"
)

func NewFactory() receiver.Factory {
return adapter.NewFactory(monitor{}, component.StabilityLevelAlpha)
}
14 changes: 14 additions & 0 deletions internal/receiver/tcpreceiver/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright Splunk, Inc.
// SPDX-License-Identifier: Apache-2.0

package tcpreceiver

import (
"testing"

"go.uber.org/goleak"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m)
}
120 changes: 120 additions & 0 deletions internal/receiver/tcpreceiver/receiver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
// Copyright Splunk, Inc.
// SPDX-License-Identifier: Apache-2.0

package tcpreceiver

import (
"net/url"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/move"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/entry"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/transformer/noop"

"github.com/splunk/tarunner/internal/operator/prop"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/adapter"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper"
"go.opentelemetry.io/collector/component"
)

type monitor struct{}

// Type is the receiver type
func (monitor) Type() component.Type {
return component.MustNewType("tcp")
}

// CreateDefaultConfig creates a config with type and version
func (monitor) CreateDefaultConfig() component.Config {
return createDefaultConfig()
}

func createDefaultConfig() *Config {
return &Config{}
}

// BaseConfig gets the base config from config
func (monitor) BaseConfig(cfg component.Config) adapter.BaseConfig {
rcfg := cfg.(Config)
var operators []operator.Config

for _, p := range rcfg.Props {
ops := prop.CreateOperatorConfigs(p, rcfg.Transforms)
operators = append(operators, ops...)
}

endNoop := noop.NewConfigWithID("end")

metadata := renameMetadata()
endNoop.OutputIDs = []string{metadata[0].ID()}
operators = append(operators, operator.NewConfig(endNoop))
operators = append(operators, metadata...)

return adapter.BaseConfig{
Operators: operators,
}
}

func (t monitor) InputConfig(config component.Config) operator.Config {
rcfg := config.(Config)
oc := tcp.NewConfig()
listenAddress, _ := url.Parse(rcfg.Input.Configuration.Stanza.Name)
oc.ListenAddress = listenAddress.Host
oc.Attributes = map[string]helper.ExprStringConfig{}
if hostParam := rcfg.Input.Configuration.Stanza.Params.Get("host"); hostParam != nil {
// TODO: find a way to run host detection when requested.
oc.Attributes["host"] = helper.ExprStringConfig(hostParam.Value)
}

if indexParam := rcfg.Input.Configuration.Stanza.Params.Get("index"); indexParam != nil {
oc.Attributes["index"] = helper.ExprStringConfig(indexParam.Value)
}

if sourceTypeParam := rcfg.Input.Configuration.Stanza.Params.Get("sourcetype"); sourceTypeParam != nil {
oc.Attributes["sourcetype"] = helper.ExprStringConfig(sourceTypeParam.Value)
}

if sourceParam := rcfg.Input.Configuration.Stanza.Params.Get("source"); sourceParam != nil {
oc.Attributes["source"] = helper.ExprStringConfig(sourceParam.Value)
}

oc.Encoding = "nop"

return operator.NewConfig(oc)
}

func renameMetadata() []operator.Config {
source := move.NewConfigWithID("end-source")
source.From = entry.NewAttributeField("source")
source.To = entry.NewAttributeField("com.splunk.source")
source.OnError = "send_quiet"
source.OutputIDs = []string{"end-sourcetype"}

sourceType := move.NewConfigWithID("end-sourcetype")
sourceType.From = entry.NewAttributeField("sourcetype")
sourceType.To = entry.NewAttributeField("com.splunk.sourcetype")
sourceType.OnError = "send_quiet"
sourceType.OutputIDs = []string{"end-host"}

host := move.NewConfigWithID("end-host")
host.From = entry.NewAttributeField("host")
host.To = entry.NewAttributeField("com.splunk.host")
host.OnError = "send_quiet"
sourceType.OutputIDs = []string{"end-index"}

index := move.NewConfigWithID("end-index")
index.From = entry.NewAttributeField("index")
index.To = entry.NewAttributeField("com.splunk.index")
index.OnError = "send_quiet"

return []operator.Config{
operator.NewConfig(source),
operator.NewConfig(sourceType),
operator.NewConfig(host),
operator.NewConfig(index),
}
}