From 01e22f08081b9b02a265f8ece80e1410894336c6 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 21 Jul 2025 18:22:03 +0700 Subject: [PATCH 01/19] add router && set dead queue --- fd/file.d.go | 21 +++++++++++++++++++++ pipeline/pipeline.go | 29 ++++++++++++++++------------- pipeline/plugin.go | 1 + pipeline/processor.go | 10 +++++----- pipeline/router.go | 32 ++++++++++++++++++++++++++++++++ 5 files changed, 75 insertions(+), 18 deletions(-) create mode 100644 pipeline/router.go diff --git a/fd/file.d.go b/fd/file.d.go index 055083883..abe6e1879 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -222,6 +222,11 @@ func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCo PluginRuntimeInfo: f.instantiatePlugin(info), }) + p.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: info.DeadQueueInfo, + PluginRuntimeInfo: f.instantiatePlugin(info.DeadQueueInfo), + }) + return nil } @@ -249,6 +254,19 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip if err != nil { return nil, err } + deadqueue := configJSON.Get("deadqueue") + var deadqueueInfo *pipeline.PluginStaticInfo + if deadqueue.MustMap() != nil { + deadqueueType := deadqueue.Get("type").MustString() + if deadqueueType == "" { + return nil, fmt.Errorf("%s doesn't have type", pluginKind) + } + deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType) + if err != nil { + return nil, err + } + configJSON.Del("deadqueue") + } configJson, err := configJSON.Encode() if err != nil { logger.Panicf("can't create config json for %s", t) @@ -260,6 +278,9 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip infoCopy := *info infoCopy.Config = config + if deadqueueInfo != nil { + infoCopy.DeadQueueInfo = deadqueueInfo + } return &infoCopy, nil } diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 50d896f4f..34d8e75c4 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -113,8 +113,7 @@ type Pipeline struct { procCount *atomic.Int32 activeProcs *atomic.Int32 - output OutputPlugin - outputInfo *OutputPluginInfo + router *router metricHolder *metric.Holder @@ -199,6 +198,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap PipelineSettings: settings, MetricCtl: metricCtl, }, + router: newRouter(), actionMetrics: actionMetrics{ m: make(map[string]*actionMetric), mu: new(sync.RWMutex), @@ -301,7 +301,7 @@ func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux) { if p.input == nil { p.logger.Panic("input isn't set") } - if p.output == nil { + if p.router.output == nil { p.logger.Panic("output isn't set") } @@ -321,7 +321,7 @@ func (p *Pipeline) SetupHTTPHandlers(mux *http.ServeMux) { } } - for hName, handler := range p.outputInfo.PluginStaticInfo.Endpoints { + for hName, handler := range p.router.outputInfo.PluginStaticInfo.Endpoints { mux.HandleFunc(fmt.Sprintf("%s/%d/%s", prefix, len(p.actionInfos)+1, hName), handler) } } @@ -330,7 +330,7 @@ func (p *Pipeline) Start() { if p.input == nil { p.logger.Panic("input isn't set") } - if p.output == nil { + if p.router.output == nil { p.logger.Panic("output isn't set") } @@ -339,11 +339,11 @@ func (p *Pipeline) Start() { outputParams := &OutputPluginParams{ PluginDefaultParams: p.actionParams, Controller: p, - Logger: p.logger.Sugar().Named("output").Named(p.outputInfo.Type), + Logger: p.logger.Sugar().Named("output").Named(p.router.outputInfo.Type), } - p.logger.Info("starting output plugin", zap.String("name", p.outputInfo.Type)) + 
p.logger.Info("starting output plugin", zap.String("name", p.router.outputInfo.Type)) - p.output.Start(p.outputInfo.Config, outputParams) + p.router.Start(p.router.outputInfo.Config, outputParams) p.logger.Info("stating processors", zap.Int("count", len(p.Procs))) for _, processor := range p.Procs { @@ -382,7 +382,7 @@ func (p *Pipeline) Stop() { p.input.Stop() p.logger.Info("stopping output") - p.output.Stop() + p.router.Stop() p.shouldStop.Store(true) @@ -399,12 +399,15 @@ func (p *Pipeline) GetInput() InputPlugin { } func (p *Pipeline) SetOutput(info *OutputPluginInfo) { - p.outputInfo = info - p.output = info.Plugin.(OutputPlugin) + p.router.SetOutput(info) +} + +func (p *Pipeline) SetDeadQueueOutput(info *OutputPluginInfo) { + p.router.SetDeadQueueOutput(info) } func (p *Pipeline) GetOutput() OutputPlugin { - return p.output + return p.router.output } // In decodes message and passes it to event stream. @@ -758,7 +761,7 @@ func (p *Pipeline) newProc(id int) *processor { id, &p.actionMetrics, p.activeProcs, - p.output, + p.router, p.streamer, p.finalize, p.IncMaxEventSizeExceeded, diff --git a/pipeline/plugin.go b/pipeline/plugin.go index 3e0a7c749..98390a08f 100644 --- a/pipeline/plugin.go +++ b/pipeline/plugin.go @@ -77,6 +77,7 @@ type PluginStaticInfo struct { // Every plugin can provide their own API through Endpoints. Endpoints map[string]func(http.ResponseWriter, *http.Request) AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config + DeadQueueInfo *PluginStaticInfo } type PluginRuntimeInfo struct { diff --git a/pipeline/processor.go b/pipeline/processor.go index cf02c51c8..f575574ab 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -55,7 +55,7 @@ func allEventStatuses() []eventStatus { type processor struct { id int streamer *streamer - output OutputPlugin + router router finalize finalizeFn activeCounter *atomic.Int32 @@ -78,7 +78,7 @@ func newProcessor( id int, actionMetrics *actionMetrics, activeCounter *atomic.Int32, - output OutputPlugin, + router *router, streamer *streamer, finalizeFn finalizeFn, incMaxEventSizeExceededFn func(lvs ...string), @@ -88,7 +88,7 @@ func newProcessor( id: id, streamer: streamer, actionMetrics: actionMetrics, - output: output, + router: *router, finalize: finalizeFn, activeCounter: activeCounter, @@ -153,7 +153,7 @@ func (p *processor) processSequence(event *Event) bool { } event.stage = eventStageOutput - p.output.Out(event) + p.router.Out(event) } return true @@ -447,7 +447,7 @@ func (p *processor) Spawn(parent *Event, nodes []*insaneJSON.Node) { ok, _ := p.doActions(child) if ok { child.stage = eventStageOutput - p.output.Out(child) + p.router.Out(child) } } diff --git a/pipeline/router.go b/pipeline/router.go new file mode 100644 index 000000000..308f3203a --- /dev/null +++ b/pipeline/router.go @@ -0,0 +1,32 @@ +package pipeline + +type router struct { + output OutputPlugin + deadQueue OutputPlugin + outputInfo *OutputPluginInfo +} + +func newRouter() *router { + return &router{} +} + +func (r *router) SetOutput(info *OutputPluginInfo) { + r.outputInfo = info + r.output = info.Plugin.(OutputPlugin) +} + +func (r *router) SetDeadQueueOutput(info *OutputPluginInfo) { + r.deadQueue = info.Plugin.(OutputPlugin) +} + +func (p *router) Out(event *Event) { + p.output.Out(event) +} + +func (p *router) Stop() { + p.output.Stop() +} + +func (p *router) Start(config AnyConfig, params *OutputPluginParams) { + p.output.Start(config, params) +} From 
e35f0d291d679f726f91ebedafb2fe885705f575 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 21 Jul 2025 18:57:59 +0700 Subject: [PATCH 02/19] ack event in router --- pipeline/pipeline.go | 6 +++--- pipeline/plugin.go | 1 + pipeline/processor.go | 4 ++-- pipeline/router.go | 37 +++++++++++++++++++++----------- plugin/output/devnull/devnull.go | 2 ++ plugin/output/stdout/stdout.go | 3 +++ 6 files changed, 36 insertions(+), 17 deletions(-) diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index 34d8e75c4..b721d4669 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -113,7 +113,7 @@ type Pipeline struct { procCount *atomic.Int32 activeProcs *atomic.Int32 - router *router + router *Router metricHolder *metric.Holder @@ -198,7 +198,7 @@ func New(name string, settings *Settings, registry *prometheus.Registry, lg *zap PipelineSettings: settings, MetricCtl: metricCtl, }, - router: newRouter(), + router: NewRouter(), actionMetrics: actionMetrics{ m: make(map[string]*actionMetric), mu: new(sync.RWMutex), @@ -343,7 +343,7 @@ func (p *Pipeline) Start() { } p.logger.Info("starting output plugin", zap.String("name", p.router.outputInfo.Type)) - p.router.Start(p.router.outputInfo.Config, outputParams) + p.router.Start(outputParams) p.logger.Info("stating processors", zap.Int("count", len(p.Procs))) for _, processor := range p.Procs { diff --git a/pipeline/plugin.go b/pipeline/plugin.go index 98390a08f..51c585a54 100644 --- a/pipeline/plugin.go +++ b/pipeline/plugin.go @@ -59,6 +59,7 @@ type ActionPluginParams struct { type OutputPluginParams struct { PluginDefaultParams Controller OutputPluginController + Router Router Logger *zap.SugaredLogger } diff --git a/pipeline/processor.go b/pipeline/processor.go index f575574ab..d5a0d745a 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -55,7 +55,7 @@ func allEventStatuses() []eventStatus { type processor struct { id int streamer *streamer - router router + router Router finalize finalizeFn activeCounter *atomic.Int32 @@ -78,7 +78,7 @@ func newProcessor( id int, actionMetrics *actionMetrics, activeCounter *atomic.Int32, - router *router, + router *Router, streamer *streamer, finalizeFn finalizeFn, incMaxEventSizeExceededFn func(lvs ...string), diff --git a/pipeline/router.go b/pipeline/router.go index 308f3203a..e1988fb71 100644 --- a/pipeline/router.go +++ b/pipeline/router.go @@ -1,32 +1,45 @@ package pipeline -type router struct { +type Router struct { output OutputPlugin - deadQueue OutputPlugin outputInfo *OutputPluginInfo + + deadQueue OutputPlugin + deadQueueInfo *OutputPluginInfo } -func newRouter() *router { - return &router{} +func NewRouter() *Router { + return &Router{} } -func (r *router) SetOutput(info *OutputPluginInfo) { +func (r *Router) SetOutput(info *OutputPluginInfo) { r.outputInfo = info r.output = info.Plugin.(OutputPlugin) } -func (r *router) SetDeadQueueOutput(info *OutputPluginInfo) { +func (r *Router) SetDeadQueueOutput(info *OutputPluginInfo) { + r.deadQueueInfo = info r.deadQueue = info.Plugin.(OutputPlugin) } -func (p *router) Out(event *Event) { - p.output.Out(event) +func (r *Router) Ack(event *Event) { + +} + +func (r *Router) Fail(event *Event) { + r.deadQueue.Out(event) +} + +func (r *Router) Out(event *Event) { + r.output.Out(event) } -func (p *router) Stop() { - p.output.Stop() +func (r *Router) Stop() { + r.output.Stop() } -func (p *router) Start(config AnyConfig, params *OutputPluginParams) { - p.output.Start(config, params) +func (r *Router) Start(params *OutputPluginParams) { + 
params.Router = *r + r.output.Start(r.outputInfo.Config, params) + r.deadQueue.Start(r.deadQueueInfo.Config, params) } diff --git a/plugin/output/devnull/devnull.go b/plugin/output/devnull/devnull.go index 09522be46..50d3d28d6 100644 --- a/plugin/output/devnull/devnull.go +++ b/plugin/output/devnull/devnull.go @@ -12,6 +12,7 @@ It provides an API to test pipelines and other plugins. type Plugin struct { controller pipeline.OutputPluginController + router pipeline.Router outFn func(event *pipeline.Event) total *atomic.Int64 } @@ -31,6 +32,7 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { func (p *Plugin) Start(_ pipeline.AnyConfig, params *pipeline.OutputPluginParams) { p.controller = params.Controller + p.router = params.Router p.total = &atomic.Int64{} } diff --git a/plugin/output/stdout/stdout.go b/plugin/output/stdout/stdout.go index 98fa2b60b..2e48fb125 100644 --- a/plugin/output/stdout/stdout.go +++ b/plugin/output/stdout/stdout.go @@ -13,6 +13,7 @@ It writes events to stdout(also known as console). type Plugin struct { controller pipeline.OutputPluginController + router pipeline.Router } type Config struct{} @@ -30,6 +31,7 @@ func Factory() (pipeline.AnyPlugin, pipeline.AnyConfig) { func (p *Plugin) Start(_ pipeline.AnyConfig, params *pipeline.OutputPluginParams) { p.controller = params.Controller + p.router = params.Router } func (_ *Plugin) Stop() {} @@ -38,4 +40,5 @@ func (p *Plugin) Out(event *pipeline.Event) { // nolint: forbidigo fmt.Println(event.Root.EncodeToString()) p.controller.Commit(event) + p.router.Ack(event) } From 06bf840dc1fe56b55baee98b873cd0f0c4ad62c4 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 22 Jul 2025 14:57:48 +0700 Subject: [PATCH 03/19] pass events on backoff error --- pipeline/backoff.go | 6 +++--- pipeline/router.go | 4 ++++ plugin/output/clickhouse/clickhouse.go | 2 +- plugin/output/elasticsearch/elasticsearch.go | 2 +- plugin/output/gelf/gelf.go | 2 +- plugin/output/kafka/kafka.go | 10 ++++++++-- plugin/output/loki/loki.go | 2 +- plugin/output/postgres/postgres.go | 2 +- plugin/output/splunk/splunk.go | 2 +- 9 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pipeline/backoff.go b/pipeline/backoff.go index 06ca8db89..347599b81 100644 --- a/pipeline/backoff.go +++ b/pipeline/backoff.go @@ -11,7 +11,7 @@ type RetriableBatcher struct { outFn RetriableBatcherOutFn batcher *Batcher backoffOpts BackoffOpts - onRetryError func(err error) + onRetryError func(err error, events []*Event) } type RetriableBatcherOutFn func(*WorkerData, *Batch) error @@ -22,7 +22,7 @@ type BackoffOpts struct { AttemptNum int } -func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error)) *RetriableBatcher { +func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error, events []*Event)) *RetriableBatcher { batcherBackoff := &RetriableBatcher{ outFn: batcherOutFn, backoffOpts: opts, @@ -58,7 +58,7 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) { } next := exponentionalBackoff.NextBackOff() if next == backoff.Stop || (b.backoffOpts.AttemptNum >= 0 && numTries > b.backoffOpts.AttemptNum) { - b.onRetryError(err) + b.onRetryError(err, batch.events) return } numTries++ diff --git a/pipeline/router.go b/pipeline/router.go index e1988fb71..c58c4d1f7 100644 --- a/pipeline/router.go +++ b/pipeline/router.go @@ -38,6 +38,10 @@ func (r *Router) Stop() { r.output.Stop() } +func (r *Router) 
DeadQueueIsAvailable() bool { + return r.deadQueue != nil +} + func (r *Router) Start(params *OutputPluginParams) { params.Router = *r r.output.Start(r.outputInfo.Config, params) diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 04e69ef32..49753cd60 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -443,7 +443,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP AttemptNum: p.config.Retry, } - onError := func(err error) { + onError := func(err error, _ []*pipeline.Event) { var level zapcore.Level if p.config.FatalOnFailedInsert { level = zapcore.FatalLevel diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index c96fea372..15b39e45b 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -262,7 +262,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP AttemptNum: p.config.Retry, } - onError := func(err error) { + onError := func(err error, _ []*pipeline.Event) { var level zapcore.Level if p.config.FatalOnFailedInsert { level = zapcore.FatalLevel diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index 1b01226c7..b87607258 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -236,7 +236,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP AttemptNum: p.config.Retry, } - onError := func(err error) { + onError := func(err error, _ []*pipeline.Event) { var level zapcore.Level if p.config.FatalOnFailedInsert { level = zapcore.FatalLevel diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index e82b0a76e..7f9d8022e 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -40,6 +40,7 @@ type Plugin struct { // plugin metrics sendErrorMetric prometheus.Counter + router pipeline.Router } // ! 
config-params @@ -263,9 +264,9 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP AttemptNum: p.config.Retry, } - onError := func(err error) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -274,6 +275,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't write batch", zap.Int("retries", p.config.Retry), ) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( @@ -284,6 +289,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP ) p.batcher.Start(context.TODO()) + p.router = params.Router } func (p *Plugin) Out(event *pipeline.Event) { diff --git a/plugin/output/loki/loki.go b/plugin/output/loki/loki.go index 8353ee7eb..aaeff4e4c 100644 --- a/plugin/output/loki/loki.go +++ b/plugin/output/loki/loki.go @@ -276,7 +276,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP AttemptNum: p.config.Retry, } - onError := func(err error) { + onError := func(err error, _ []*pipeline.Event) { var level zapcore.Level if p.config.FatalOnFailedInsert { level = zapcore.FatalLevel diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 0126fc43f..5a822421a 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -292,7 +292,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP AttemptNum: p.config.Retry, } - onError := func(err error) { + onError := func(err error, _ []*pipeline.Event) { var level zapcore.Level if p.config.FatalOnFailedInsert { level = zapcore.FatalLevel diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 84d1f005b..5657b55f6 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -269,7 +269,7 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP AttemptNum: p.config.Retry, } - onError := func(err error) { + onError := func(err error, _ []*pipeline.Event) { var level zapcore.Level if p.config.FatalOnFailedInsert { level = zapcore.FatalLevel From d767920bc2bb4e787904d6f6e5ad9bd27c8d0ddb Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 23 Jul 2025 17:54:47 +0700 Subject: [PATCH 04/19] reset batch --- fd/file.d.go | 15 ++++++--- pipeline/backoff.go | 33 ++++++++++++------ pipeline/backoff_test.go | 4 +-- pipeline/batch.go | 35 +++++++++++--------- pipeline/pipeline.go | 1 + pipeline/plugin.go | 3 +- pipeline/router.go | 11 ++++-- plugin/output/clickhouse/clickhouse.go | 18 +++++++--- plugin/output/elasticsearch/elasticsearch.go | 18 +++++++--- plugin/output/gelf/gelf.go | 11 ++++-- plugin/output/kafka/kafka.go | 9 ++--- plugin/output/loki/loki.go | 18 +++++++--- plugin/output/postgres/postgres.go | 18 +++++++--- plugin/output/splunk/splunk.go | 18 +++++++--- plugin/output/stdout/stdout.go | 1 - 15 files changed, 146 insertions(+), 67 deletions(-) diff --git a/fd/file.d.go b/fd/file.d.go index abe6e1879..d8c9f961c 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -222,10 +222,12 @@ func (f *FileD) setupOutput(p *pipeline.Pipeline, pipelineConfig *cfg.PipelineCo PluginRuntimeInfo: f.instantiatePlugin(info), }) - p.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ - PluginStaticInfo: 
info.DeadQueueInfo, - PluginRuntimeInfo: f.instantiatePlugin(info.DeadQueueInfo), - }) + if info.DeadQueueInfo != nil { + p.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: info.DeadQueueInfo, + PluginRuntimeInfo: f.instantiatePlugin(info.DeadQueueInfo), + }) + } return nil } @@ -254,6 +256,7 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip if err != nil { return nil, err } + deadqueue := configJSON.Get("deadqueue") var deadqueueInfo *pipeline.PluginStaticInfo if deadqueue.MustMap() != nil { @@ -265,8 +268,12 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip if err != nil { return nil, err } + + // TODO: recursive deadqueue config + // deadqueueForDeadqueue := deadqueue.Get("deadqueue").MustMap() configJSON.Del("deadqueue") } + configJson, err := configJSON.Encode() if err != nil { logger.Panicf("can't create config json for %s", t) diff --git a/pipeline/backoff.go b/pipeline/backoff.go index 347599b81..6a843863b 100644 --- a/pipeline/backoff.go +++ b/pipeline/backoff.go @@ -8,25 +8,28 @@ import ( ) type RetriableBatcher struct { - outFn RetriableBatcherOutFn - batcher *Batcher - backoffOpts BackoffOpts - onRetryError func(err error, events []*Event) + outFn RetriableBatcherOutFn + batcher *Batcher + backoffOpts BackoffOpts + DeadQueueIsAvailable bool + onRetryError func(err error, events []*Event) } type RetriableBatcherOutFn func(*WorkerData, *Batch) error type BackoffOpts struct { - MinRetention time.Duration - Multiplier float64 - AttemptNum int + MinRetention time.Duration + Multiplier float64 + AttemptNum int + DeadQueueIsAvailable bool } func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error, events []*Event)) *RetriableBatcher { batcherBackoff := &RetriableBatcher{ - outFn: batcherOutFn, - backoffOpts: opts, - onRetryError: onError, + outFn: batcherOutFn, + backoffOpts: opts, + onRetryError: onError, + DeadQueueIsAvailable: opts.DeadQueueIsAvailable, } batcherBackoff.setBatcher(batcherOpts) return batcherBackoff @@ -58,7 +61,15 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) { } next := exponentionalBackoff.NextBackOff() if next == backoff.Stop || (b.backoffOpts.AttemptNum >= 0 && numTries > b.backoffOpts.AttemptNum) { - b.onRetryError(err, batch.events) + var events []*Event + if batch != nil { + events = batch.events + } + b.onRetryError(err, events) + if batch != nil && b.DeadQueueIsAvailable { + batch.reset() + batch.status = BatchStatusInDeadQueue + } return } numTries++ diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go index 1d73a2bf1..16a25a76b 100644 --- a/pipeline/backoff_test.go +++ b/pipeline/backoff_test.go @@ -17,7 +17,7 @@ func TestBackoff(t *testing.T) { eventCount := &atomic.Int32{} eventCountBefore := eventCount.Load() - errorFn := func(err error) { + errorFn := func(err error, events []*Event) { errorCount.Inc() } @@ -42,7 +42,7 @@ func TestBackoff(t *testing.T) { func TestBackoffWithError(t *testing.T) { errorCount := &atomic.Int32{} prevValue := errorCount.Load() - errorFn := func(err error) { + errorFn := func(err error, events []*Event) { errorCount.Inc() } diff --git a/pipeline/batch.go b/pipeline/batch.go index 22ee3bcfa..03af1fe01 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -16,6 +16,7 @@ const ( BatchStatusNotReady BatchStatus = iota BatchStatusMaxSizeExceeded BatchStatusTimeoutExceeded + BatchStatusInDeadQueue ) type Batch struct { @@ -125,11 
+126,12 @@ type Batcher struct {
 	outSeq    int64
 	commitSeq int64
 
-	batchOutFnSeconds    prometheus.Observer
-	commitWaitingSeconds prometheus.Observer
-	workersInProgress    prometheus.Gauge
-	batchesDoneByMaxSize prometheus.Counter
-	batchesDoneByTimeout prometheus.Counter
+	batchOutFnSeconds        prometheus.Observer
+	commitWaitingSeconds     prometheus.Observer
+	workersInProgress        prometheus.Gauge
+	batchesDoneByMaxSize     prometheus.Counter
+	batchesDoneByTimeout     prometheus.Counter
+	batchesRoutedToDeadQueue prometheus.Counter
 }
 
 type (
@@ -163,16 +165,17 @@ func NewBatcher(opts BatcherOptions) *Batcher { // nolint: gocritic // hugeParam
 	seqMu := &sync.Mutex{}
 
 	return &Batcher{
-		seqMu:                seqMu,
-		cond:                 sync.NewCond(seqMu),
-		freeBatches:          freeBatches,
-		fullBatches:          fullBatches,
-		opts:                 opts,
-		batchOutFnSeconds:    ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong),
-		commitWaitingSeconds: ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed),
-		workersInProgress:    ctl.RegisterGauge("batcher_workers_in_progress", ""),
-		batchesDoneByMaxSize: jobsDone.WithLabelValues("max_size_exceeded"),
-		batchesDoneByTimeout: jobsDone.WithLabelValues("timeout_exceeded"),
+		seqMu:                    seqMu,
+		cond:                     sync.NewCond(seqMu),
+		freeBatches:              freeBatches,
+		fullBatches:              fullBatches,
+		opts:                     opts,
+		batchOutFnSeconds:        ctl.RegisterHistogram("batcher_out_fn_seconds", "", metric.SecondsBucketsLong),
+		commitWaitingSeconds:     ctl.RegisterHistogram("batcher_commit_waiting_seconds", "", metric.SecondsBucketsDetailed),
+		workersInProgress:        ctl.RegisterGauge("batcher_workers_in_progress", ""),
+		batchesDoneByMaxSize:     jobsDone.WithLabelValues("max_size_exceeded"),
+		batchesDoneByTimeout:     jobsDone.WithLabelValues("timeout_exceeded"),
+		batchesRoutedToDeadQueue: jobsDone.WithLabelValues("routed_in_deadqueue"),
 	}
 }
 
@@ -220,6 +223,8 @@ func (b *Batcher) work() {
 			b.batchesDoneByMaxSize.Inc()
 		case BatchStatusTimeoutExceeded:
 			b.batchesDoneByTimeout.Inc()
+		case BatchStatusInDeadQueue:
+			b.batchesRoutedToDeadQueue.Inc()
 		default:
 			logger.Panic("unreachable")
 		}
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index b721d4669..261e5bc57 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -627,6 +627,7 @@ func (p *Pipeline) streamEvent(event *Event) uint64 {
 }
 
 func (p *Pipeline) Commit(event *Event) {
+	p.router.Ack(event)
 	p.finalize(event, true, true)
 }
 
diff --git a/pipeline/plugin.go b/pipeline/plugin.go
index 51c585a54..93a4ebb6d 100644
--- a/pipeline/plugin.go
+++ b/pipeline/plugin.go
@@ -78,7 +78,8 @@ type PluginStaticInfo struct {
 	// Every plugin can provide their own API through Endpoints.
 	Endpoints map[string]func(http.ResponseWriter, *http.Request)
 	AdditionalActions []string // used only for input plugins, defines actions that should be run right after input plugin with input config
-	DeadQueueInfo *PluginStaticInfo
+	// TODO: maybe move this to OutputPluginStaticInfo, since it's used by output and action plugins?
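[review note] `DeadQueueInfo` is itself a `*PluginStaticInfo`, so a dead queue is just a nested output description and the wiring in `fd/file.d.go` can reuse the ordinary plugin machinery (the "recursive deadqueue config" TODO earlier in this patch follows from that). A minimal sketch of the resulting shape — the field names come from this patch, while the concrete plugin types are placeholders, not part of the PR:

    package main

    import (
    	"fmt"

    	"github.com/ozontech/file.d/pipeline"
    )

    func main() {
    	// The dead queue is a full static info; nesting another one inside it
    	// is what the recursive-config TODO would eventually enable.
    	info := &pipeline.PluginStaticInfo{
    		Type: "kafka", // placeholder main output
    		DeadQueueInfo: &pipeline.PluginStaticInfo{
    			Type: "stdout", // placeholder fallback output
    		},
    	}
    	fmt.Println(info.DeadQueueInfo != nil) // true: setupOutput will wire a dead-queue output
    }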
+ DeadQueueInfo *PluginStaticInfo } type PluginRuntimeInfo struct { diff --git a/pipeline/router.go b/pipeline/router.go index c58c4d1f7..bbfbb9907 100644 --- a/pipeline/router.go +++ b/pipeline/router.go @@ -27,7 +27,9 @@ func (r *Router) Ack(event *Event) { } func (r *Router) Fail(event *Event) { - r.deadQueue.Out(event) + if r.DeadQueueIsAvailable() { + r.deadQueue.Out(event) + } } func (r *Router) Out(event *Event) { @@ -36,6 +38,9 @@ func (r *Router) Out(event *Event) { func (r *Router) Stop() { r.output.Stop() + if r.DeadQueueIsAvailable() { + r.deadQueue.Stop() + } } func (r *Router) DeadQueueIsAvailable() bool { @@ -45,5 +50,7 @@ func (r *Router) DeadQueueIsAvailable() bool { func (r *Router) Start(params *OutputPluginParams) { params.Router = *r r.output.Start(r.outputInfo.Config, params) - r.deadQueue.Start(r.deadQueueInfo.Config, params) + if r.DeadQueueIsAvailable() { + r.deadQueue.Start(r.deadQueueInfo.Config, params) + } } diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index 49753cd60..f65c3d342 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -58,6 +58,8 @@ type Plugin struct { // plugin metrics insertErrorsMetric prometheus.Counter queriesCountMetric prometheus.Counter + + router pipeline.Router } type Setting struct { @@ -437,15 +439,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), } - onError := func(err error, _ []*pipeline.Event) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -454,6 +458,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Log(level, "can't insert to the table", zap.Error(err), zap.Int("retries", p.config.Retry), zap.String("table", p.config.Table)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index 15b39e45b..cf91ef8af 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -50,6 +50,8 @@ type Plugin struct { // plugin metrics sendErrorMetric *prometheus.CounterVec indexingErrorsMetric prometheus.Counter + + router pipeline.Router } // ! 
config-params @@ -256,15 +258,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), } - onError := func(err error, _ []*pipeline.Event) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -273,6 +277,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Log(level, "can't send to the elastic", zap.Error(err), zap.Int("retries", p.config.Retry), ) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index b87607258..149a0d4ad 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -48,6 +48,8 @@ type Plugin struct { // plugin metrics sendErrorMetric prometheus.Counter + + router pipeline.Router } // ! config-params @@ -230,15 +232,16 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ MinRetention: p.config.Retention_, Multiplier: float64(p.config.RetentionExponentMultiplier), AttemptNum: p.config.Retry, } - onError := func(err error, _ []*pipeline.Event) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -247,6 +250,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't send to gelf", zap.Error(err), zap.Int("retries", p.config.Retry), ) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 7f9d8022e..d1ebe7e6f 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -258,10 +258,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), } onError := func(err error, events []*pipeline.Event) { @@ -289,7 +291,6 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP ) p.batcher.Start(context.TODO()) - p.router = params.Router } func (p *Plugin) Out(event *pipeline.Event) { diff --git a/plugin/output/loki/loki.go b/plugin/output/loki/loki.go index aaeff4e4c..d2216d362 100644 --- a/plugin/output/loki/loki.go +++ b/plugin/output/loki/loki.go @@ -235,6 +235,8 @@ type Plugin struct { 
sendErrorMetric *prometheus.CounterVec labels map[string]string + + router pipeline.Router } func init() { @@ -270,15 +272,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), } - onError := func(err error, _ []*pipeline.Event) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -286,6 +290,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Log(level, "can't send data to loki", zap.Error(err), zap.Int("retries", p.config.Retry)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 5a822421a..28c583e5c 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -111,6 +111,8 @@ type Plugin struct { duplicatedEventMetric prometheus.Counter writtenEventMetric prometheus.Counter insertErrorsMetric prometheus.Counter + + router pipeline.Router } type ConfigColumn struct { @@ -286,15 +288,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), } - onError := func(err error, _ []*pipeline.Event) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -303,6 +307,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't insert to the table", zap.Error(err), zap.Int("retries", p.config.Retry), zap.String("table", p.config.Table)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 5657b55f6..35506096d 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -96,6 +96,8 @@ type Plugin struct { // plugin metrics sendErrorMetric *prometheus.CounterVec + + router pipeline.Router } type CopyField struct { @@ -263,15 +265,17 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MetricCtl: params.MetricCtl, } + p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: 
float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), } - onError := func(err error, _ []*pipeline.Event) { + onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert { + if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel @@ -279,6 +283,10 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.logger.Desugar().Log(level, "can't send data to splunk", zap.Error(err), zap.Int("retries", p.config.Retry)) + + for i := range events { + p.router.Fail(events[i]) + } } p.batcher = pipeline.NewRetriableBatcher( diff --git a/plugin/output/stdout/stdout.go b/plugin/output/stdout/stdout.go index 2e48fb125..d4238f683 100644 --- a/plugin/output/stdout/stdout.go +++ b/plugin/output/stdout/stdout.go @@ -40,5 +40,4 @@ func (p *Plugin) Out(event *pipeline.Event) { // nolint: forbidigo fmt.Println(event.Root.EncodeToString()) p.controller.Commit(event) - p.router.Ack(event) } From ec706ea8d3231fab77e73242d8a23de690b99564 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 29 Jul 2025 15:24:31 +0700 Subject: [PATCH 05/19] set deadqueue config --- fd/file.d.go | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/fd/file.d.go b/fd/file.d.go index d8c9f961c..c7f7830eb 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -269,6 +269,19 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip return nil, err } + deadqueue.Del("type") + + deadqueueConfigJson, err := deadqueue.Encode() + if err != nil { + logger.Panicf("can't create config json for %s", t) + } + + config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values) + if err != nil { + logger.Fatalf("error on creating %s with type %q: %s", t, pluginKind, err.Error()) + } + deadqueueInfo.Config = config + // TODO: recursive deadqueue config // deadqueueForDeadqueue := deadqueue.Get("deadqueue").MustMap() configJSON.Del("deadqueue") From e8395a591a4850063b3d971ad56646e6f5d35299 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 29 Jul 2025 15:48:05 +0700 Subject: [PATCH 06/19] deadqueue is empty --- fd/file.d.go | 50 ++++++++++++++++++++++++++------------------------ 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/fd/file.d.go b/fd/file.d.go index c7f7830eb..a52267f75 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -259,31 +259,33 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip deadqueue := configJSON.Get("deadqueue") var deadqueueInfo *pipeline.PluginStaticInfo - if deadqueue.MustMap() != nil { - deadqueueType := deadqueue.Get("type").MustString() - if deadqueueType == "" { - return nil, fmt.Errorf("%s doesn't have type", pluginKind) + if deadqueueMap := deadqueue.MustMap(); deadqueueMap != nil { + if len(deadqueueMap) > 0 { + deadqueueType := deadqueue.Get("type").MustString() + if deadqueueType == "" { + return nil, fmt.Errorf("%s doesn't have type", pluginKind) + } + deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType) + if err != nil { + return nil, err + } + + deadqueue.Del("type") + + deadqueueConfigJson, err := deadqueue.Encode() + if err != nil { + logger.Panicf("can't create config json for %s", t) + } + + config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values) + if err != nil { + logger.Fatalf("error on creating %s with type %q: %s", t, pluginKind, err.Error()) + } 
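[review note] The carve-out order matters here: read `type` from the `deadqueue` block, delete it, encode what remains as the dead-queue plugin's own config, then drop the whole block from the parent output config. A self-contained sketch of that step, assuming the go-simplejson-style API the surrounding code appears to use:

    package main

    import (
    	"fmt"

    	simplejson "github.com/bitly/go-simplejson"
    )

    func main() {
    	raw := []byte(`{"type":"kafka","deadqueue":{"type":"stdout"}}`)
    	configJSON, err := simplejson.NewJson(raw)
    	if err != nil {
    		panic(err)
    	}

    	deadqueue := configJSON.Get("deadqueue")
    	if m := deadqueue.MustMap(); len(m) > 0 {
    		fmt.Println("deadqueue type:", deadqueue.Get("type").MustString())
    		deadqueue.Del("type") // "type" is a discriminator, not part of the plugin config
    		rest, _ := deadqueue.Encode()
    		fmt.Println("deadqueue config:", string(rest)) // {}
    	}
    	configJSON.Del("deadqueue") // the parent output must not see the block

    	parent, _ := configJSON.Encode()
    	fmt.Println("output config:", string(parent)) // {"type":"kafka"}
    }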
+ deadqueueInfo.Config = config + + // TODO: recursive deadqueue config + // deadqueueForDeadqueue := deadqueue.Get("deadqueue").MustMap() } - deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType) - if err != nil { - return nil, err - } - - deadqueue.Del("type") - - deadqueueConfigJson, err := deadqueue.Encode() - if err != nil { - logger.Panicf("can't create config json for %s", t) - } - - config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values) - if err != nil { - logger.Fatalf("error on creating %s with type %q: %s", t, pluginKind, err.Error()) - } - deadqueueInfo.Config = config - - // TODO: recursive deadqueue config - // deadqueueForDeadqueue := deadqueue.Get("deadqueue").MustMap() configJSON.Del("deadqueue") } From 99f8c97dd5e913a3521823ef00138119cfe1dd11 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 1 Aug 2025 13:50:21 +0700 Subject: [PATCH 07/19] add router test --- pipeline/router_test.go | 160 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 pipeline/router_test.go diff --git a/pipeline/router_test.go b/pipeline/router_test.go new file mode 100644 index 000000000..4bc0ba569 --- /dev/null +++ b/pipeline/router_test.go @@ -0,0 +1,160 @@ +package pipeline_test + +import ( + "sync" + "testing" + "time" + + "github.com/ozontech/file.d/pipeline" + "github.com/ozontech/file.d/plugin/output/devnull" + "github.com/ozontech/file.d/test" + insaneJSON "github.com/ozontech/insane-json" + "github.com/stretchr/testify/assert" + "go.uber.org/atomic" +) + +type fakeOutputPluginController struct { + mu sync.Mutex + commits []*pipeline.Event + errors []string +} + +func (f *fakeOutputPluginController) Commit(event *pipeline.Event) { + f.mu.Lock() + defer f.mu.Unlock() + f.commits = append(f.commits, event) +} +func (f *fakeOutputPluginController) Error(err string) { + f.mu.Lock() + defer f.mu.Unlock() + f.errors = append(f.errors, err) +} + +func (f *fakeOutputPluginController) getCommits() []*pipeline.Event { + f.mu.Lock() + defer f.mu.Unlock() + return f.commits +} + +func (f *fakeOutputPluginController) getErrors() []string { + f.mu.Lock() + defer f.mu.Unlock() + return f.errors +} + +func TestRouterNormalProcessing(t *testing.T) { + t.Parallel() + + r := pipeline.NewRouter() + controller := &fakeOutputPluginController{} + + // Setup main output that succeeds + var outputCount atomic.Int32 + outputPlugin, outputConfig := createDevNullPlugin(func(event *pipeline.Event) { + outputCount.Add(1) + }) + + r.SetOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: outputConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: outputPlugin}, + }) + + // Setup dead queue that shouldn't be used + var deadQueueCount atomic.Int32 + deadQueuePlugin, deadQueueConfig := createDevNullPlugin(func(event *pipeline.Event) { + deadQueueCount.Add(1) + }) + + r.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: deadQueueConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: deadQueuePlugin}, + }) + + params := test.NewEmptyOutputPluginParams() + params.PipelineName = "test_pipeline" + params.Router = *r + params.Controller = controller + r.Start(params) + defer r.Stop() + + // Send test event + event := newEvent(t) + r.Out(event) + + // Wait for processing + assert.Eventually(t, func() bool { + return outputCount.Load() == 1 + }, 100*time.Millisecond, 10*time.Millisecond, "event should be processed") + + assert.Equal(t, 
int32(0), deadQueueCount.Load(), "dead queue should not be used") + assert.Len(t, controller.getCommits(), 1, "should commit successful event") + assert.Empty(t, controller.getErrors(), "should not produce errors") +} + +func TestRouterDeadQueueProcessing(t *testing.T) { + t.Parallel() + + r := pipeline.NewRouter() + controller := &fakeOutputPluginController{} + + // Setup main output that fails + var outputCount atomic.Int32 + outputPlugin, outputConfig := createDevNullPlugin(func(event *pipeline.Event) { + outputCount.Add(1) + r.Fail(event) + }) + + r.SetOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: outputConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: outputPlugin}, + }) + + // Setup dead queue + var deadQueueCount atomic.Int32 + deadQueuePlugin, deadQueueConfig := createDevNullPlugin(func(event *pipeline.Event) { + deadQueueCount.Add(1) + }) + + r.SetDeadQueueOutput(&pipeline.OutputPluginInfo{ + PluginStaticInfo: &pipeline.PluginStaticInfo{Config: deadQueueConfig}, + PluginRuntimeInfo: &pipeline.PluginRuntimeInfo{Plugin: deadQueuePlugin}, + }) + + params := test.NewEmptyOutputPluginParams() + params.PipelineName = "test_pipeline" + params.Router = *r + params.Controller = controller + r.Start(params) + defer r.Stop() + + // Send test event + event := newEvent(t) + r.Out(event) + + // Wait for processing + assert.Eventually(t, func() bool { + return deadQueueCount.Load() == 1 + }, 100*time.Millisecond, 10*time.Millisecond, "event should go to dead queue") + + assert.Equal(t, int32(1), outputCount.Load(), "main output should try to process") + assert.Len(t, controller.getCommits(), 2, "should commit successful event") + assert.Empty(t, controller.getErrors(), "should not produce errors") +} + +func createDevNullPlugin(outFn func(event *pipeline.Event)) (*devnull.Plugin, pipeline.AnyConfig) { + plugin, config := devnull.Factory() + p := plugin.(*devnull.Plugin) + p.SetOutFn(outFn) + return p, config +} + +func newEvent(t *testing.T) *pipeline.Event { + root, err := insaneJSON.DecodeString(`{}`) + if err != nil { + t.Skip() // ignore invalid input + } + return &pipeline.Event{ + Root: root, + Buf: make([]byte, 0, 1024), + } +} From 9258691825235ddd1ef34b8c5d807b39c811086e Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 6 Aug 2025 17:44:30 +0700 Subject: [PATCH 08/19] add TestBackoffWithErrorWithDeadQueue --- pipeline/backoff_test.go | 54 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 49 insertions(+), 5 deletions(-) diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go index 16a25a76b..7b0701a9c 100644 --- a/pipeline/backoff_test.go +++ b/pipeline/backoff_test.go @@ -3,6 +3,7 @@ package pipeline import ( "errors" "testing" + "time" "github.com/ozontech/file.d/metric" "github.com/prometheus/client_golang/prometheus" @@ -29,12 +30,17 @@ func TestBackoff(t *testing.T) { eventCount.Inc() return nil }, - BackoffOpts{AttemptNum: 3}, + BackoffOpts{ + AttemptNum: 3, + DeadQueueIsAvailable: false, + }, errorFn, ) - batcherBackoff.Out(nil, nil) - + data := WorkerData(nil) + batch := newBatch(1, 1, 1*time.Second) + batch.append(&Event{}) + batcherBackoff.Out(&data, batch) assert.Equal(t, errorCountBefore, errorCount.Load(), "wrong error count") assert.Equal(t, eventCountBefore+1, eventCount.Load(), "wrong event count") } @@ -53,10 +59,48 @@ func TestBackoffWithError(t *testing.T) { func(workerData *WorkerData, batch *Batch) error { return errors.New("some error") }, - BackoffOpts{AttemptNum: 3}, + BackoffOpts{ 
+ AttemptNum: 3, + DeadQueueIsAvailable: false, + }, + errorFn, + ) + + data := WorkerData(nil) + batch := newBatch(1, 1, 1*time.Second) + batch.append(&Event{}) + batcherBackoff.Out(&data, batch) + assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count") + assert.Equal(t, 1, len(batch.events), "wrong number of events in batch") + assert.Equal(t, BatchStatusNotReady, batch.status, "wrong batch status") +} + +func TestBackoffWithErrorWithDeadQueue(t *testing.T) { + errorCount := &atomic.Int32{} + prevValue := errorCount.Load() + errorFn := func(err error, events []*Event) { + errorCount.Inc() + } + + batcherBackoff := NewRetriableBatcher( + &BatcherOptions{ + MetricCtl: metric.NewCtl("", prometheus.NewRegistry()), + }, + func(workerData *WorkerData, batch *Batch) error { + return errors.New("some error") + }, + BackoffOpts{ + AttemptNum: 3, + DeadQueueIsAvailable: true, + }, errorFn, ) - batcherBackoff.Out(nil, nil) + data := WorkerData(nil) + batch := newBatch(1, 1, 1*time.Second) + batch.append(&Event{}) + batcherBackoff.Out(&data, batch) assert.Equal(t, prevValue+1, errorCount.Load(), "wrong error count") + assert.Equal(t, 0, len(batch.events), "wrong number of events in batch") + assert.Equal(t, BatchStatusInDeadQueue, batch.status, "wrong batch status") } From f958baf5883236d00e470f0fa0913d125a493577 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Thu, 7 Aug 2025 18:57:15 +0700 Subject: [PATCH 09/19] e2e test for deadqueue --- e2e/file_clickhouse/config.yml | 50 +++++++++++++++++++++++++++++++++- 1 file changed, 49 insertions(+), 1 deletion(-) diff --git a/e2e/file_clickhouse/config.yml b/e2e/file_clickhouse/config.yml index 63de453ee..7097ea0e2 100644 --- a/e2e/file_clickhouse/config.yml +++ b/e2e/file_clickhouse/config.yml @@ -21,11 +21,59 @@ pipelines: override: true - type: debug output: + deadqueue: + type: clickhouse + addresses: + - 127.0.0.1:9001 + table: test_table_insert + insert_timeout: 1m + columns: + - name: c1 + type: String + - name: c2 + type: Int8 + - name: c3 + type: Int16 + - name: c4 + type: Nullable(Int16) + - name: c5 + type: Nullable(String) + - name: level + type: Enum8('error'=1, 'warn'=2, 'info'=3, 'debug'=4) + - name: ipv4 + type: Nullable(IPv4) + - name: ipv6 + type: Nullable(IPv6) + - name: ts + type: DateTime + - name: ts_with_tz + type: DateTime('Europe/Moscow') + - name: ts64 + type: DateTime64(3, 'UTC') + - name: ts64_auto + type: DateTime64(9, 'UTC') + - name: ts_rfc3339nano + type: DateTime64(9) + - name: f32 + type: Float32 + - name: f64 + type: Float64 + - name: lc_str + type: LowCardinality(String) + - name: str_arr + type: Array(String) + - name: map_str_str + type: Map(String,String) + - name: uuid + type: UUID + - name: uuid_nullable + type: Nullable(UUID) type: clickhouse addresses: - 127.0.0.1:9001 - table: test_table_insert + table: test_table_insert_not_exists insert_timeout: 1m + retry: 0 columns: - name: c1 type: String From 08cb7f1d5f47244eeb2b96b92858e431766539b5 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Mon, 11 Aug 2025 14:47:32 +0700 Subject: [PATCH 10/19] fix after review --- fd/file.d.go | 6 +++--- pipeline/backoff.go | 8 ++++---- pipeline/backoff_test.go | 6 +++--- pipeline/plugin.go | 2 +- pipeline/processor.go | 4 ++-- pipeline/router.go | 10 +++++----- pipeline/router_test.go | 4 ++-- plugin/output/clickhouse/clickhouse.go | 6 +++--- plugin/output/devnull/devnull.go | 2 +- plugin/output/elasticsearch/elasticsearch.go | 6 +++--- plugin/output/gelf/gelf.go | 11 ++++++----- plugin/output/kafka/kafka.go | 6 
+++--- plugin/output/loki/loki.go | 6 +++--- plugin/output/postgres/postgres.go | 6 +++--- plugin/output/splunk/splunk.go | 6 +++--- plugin/output/stdout/stdout.go | 2 +- test/test.go | 1 + 17 files changed, 47 insertions(+), 45 deletions(-) diff --git a/fd/file.d.go b/fd/file.d.go index a52267f75..11e1eb0eb 100644 --- a/fd/file.d.go +++ b/fd/file.d.go @@ -263,7 +263,7 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip if len(deadqueueMap) > 0 { deadqueueType := deadqueue.Get("type").MustString() if deadqueueType == "" { - return nil, fmt.Errorf("%s doesn't have type", pluginKind) + return nil, fmt.Errorf("deadqueue of %s doesn't have type", pluginKind) } deadqueueInfo, err = f.plugins.Get(pluginKind, deadqueueType) if err != nil { @@ -274,12 +274,12 @@ func (f *FileD) getStaticInfo(pipelineConfig *cfg.PipelineConfig, pluginKind pip deadqueueConfigJson, err := deadqueue.Encode() if err != nil { - logger.Panicf("can't create config json for %s", t) + logger.Panicf("can't create config json for %s deadqueue", deadqueueType) } config, err := pipeline.GetConfig(deadqueueInfo, deadqueueConfigJson, values) if err != nil { - logger.Fatalf("error on creating %s with type %q: %s", t, pluginKind, err.Error()) + logger.Fatalf("error on creating deadqueue of %s with type %q: %s", deadqueueType, pluginKind, err.Error()) } deadqueueInfo.Config = config diff --git a/pipeline/backoff.go b/pipeline/backoff.go index 6a843863b..eb8fc40da 100644 --- a/pipeline/backoff.go +++ b/pipeline/backoff.go @@ -11,7 +11,7 @@ type RetriableBatcher struct { outFn RetriableBatcherOutFn batcher *Batcher backoffOpts BackoffOpts - DeadQueueIsAvailable bool + IsDeadQueueAvailable bool onRetryError func(err error, events []*Event) } @@ -21,7 +21,7 @@ type BackoffOpts struct { MinRetention time.Duration Multiplier float64 AttemptNum int - DeadQueueIsAvailable bool + IsDeadQueueAvailable bool } func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatcherOutFn, opts BackoffOpts, onError func(err error, events []*Event)) *RetriableBatcher { @@ -29,7 +29,7 @@ func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatc outFn: batcherOutFn, backoffOpts: opts, onRetryError: onError, - DeadQueueIsAvailable: opts.DeadQueueIsAvailable, + IsDeadQueueAvailable: opts.IsDeadQueueAvailable, } batcherBackoff.setBatcher(batcherOpts) return batcherBackoff @@ -66,7 +66,7 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) { events = batch.events } b.onRetryError(err, events) - if batch != nil && b.DeadQueueIsAvailable { + if batch != nil && b.IsDeadQueueAvailable { batch.reset() batch.status = BatchStatusInDeadQueue } diff --git a/pipeline/backoff_test.go b/pipeline/backoff_test.go index 7b0701a9c..fd5c01ea3 100644 --- a/pipeline/backoff_test.go +++ b/pipeline/backoff_test.go @@ -32,7 +32,7 @@ func TestBackoff(t *testing.T) { }, BackoffOpts{ AttemptNum: 3, - DeadQueueIsAvailable: false, + IsDeadQueueAvailable: false, }, errorFn, ) @@ -61,7 +61,7 @@ func TestBackoffWithError(t *testing.T) { }, BackoffOpts{ AttemptNum: 3, - DeadQueueIsAvailable: false, + IsDeadQueueAvailable: false, }, errorFn, ) @@ -91,7 +91,7 @@ func TestBackoffWithErrorWithDeadQueue(t *testing.T) { }, BackoffOpts{ AttemptNum: 3, - DeadQueueIsAvailable: true, + IsDeadQueueAvailable: true, }, errorFn, ) diff --git a/pipeline/plugin.go b/pipeline/plugin.go index 93a4ebb6d..95da53d8e 100644 --- a/pipeline/plugin.go +++ b/pipeline/plugin.go @@ -59,7 +59,7 @@ type ActionPluginParams struct { type 
OutputPluginParams struct { PluginDefaultParams Controller OutputPluginController - Router Router + Router *Router Logger *zap.SugaredLogger } diff --git a/pipeline/processor.go b/pipeline/processor.go index d5a0d745a..f0efbbe33 100644 --- a/pipeline/processor.go +++ b/pipeline/processor.go @@ -55,7 +55,7 @@ func allEventStatuses() []eventStatus { type processor struct { id int streamer *streamer - router Router + router *Router finalize finalizeFn activeCounter *atomic.Int32 @@ -88,7 +88,7 @@ func newProcessor( id: id, streamer: streamer, actionMetrics: actionMetrics, - router: *router, + router: router, finalize: finalizeFn, activeCounter: activeCounter, diff --git a/pipeline/router.go b/pipeline/router.go index bbfbb9907..6d36faf54 100644 --- a/pipeline/router.go +++ b/pipeline/router.go @@ -27,7 +27,7 @@ func (r *Router) Ack(event *Event) { } func (r *Router) Fail(event *Event) { - if r.DeadQueueIsAvailable() { + if r.IsDeadQueueAvailable() { r.deadQueue.Out(event) } } @@ -38,19 +38,19 @@ func (r *Router) Out(event *Event) { func (r *Router) Stop() { r.output.Stop() - if r.DeadQueueIsAvailable() { + if r.IsDeadQueueAvailable() { r.deadQueue.Stop() } } -func (r *Router) DeadQueueIsAvailable() bool { +func (r *Router) IsDeadQueueAvailable() bool { return r.deadQueue != nil } func (r *Router) Start(params *OutputPluginParams) { - params.Router = *r + params.Router = r r.output.Start(r.outputInfo.Config, params) - if r.DeadQueueIsAvailable() { + if r.IsDeadQueueAvailable() { r.deadQueue.Start(r.deadQueueInfo.Config, params) } } diff --git a/pipeline/router_test.go b/pipeline/router_test.go index 4bc0ba569..f6518b681 100644 --- a/pipeline/router_test.go +++ b/pipeline/router_test.go @@ -72,7 +72,7 @@ func TestRouterNormalProcessing(t *testing.T) { params := test.NewEmptyOutputPluginParams() params.PipelineName = "test_pipeline" - params.Router = *r + params.Router = r params.Controller = controller r.Start(params) defer r.Stop() @@ -122,7 +122,7 @@ func TestRouterDeadQueueProcessing(t *testing.T) { params := test.NewEmptyOutputPluginParams() params.PipelineName = "test_pipeline" - params.Router = *r + params.Router = r params.Controller = controller r.Start(params) defer r.Stop() diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index f65c3d342..c4e0524ca 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -59,7 +59,7 @@ type Plugin struct { insertErrorsMetric prometheus.Counter queriesCountMetric prometheus.Counter - router pipeline.Router + router *pipeline.Router } type Setting struct { @@ -444,12 +444,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MinRetention: p.config.Retention_, Multiplier: float64(p.config.RetentionExponentMultiplier), AttemptNum: p.config.Retry, - DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel diff --git a/plugin/output/devnull/devnull.go b/plugin/output/devnull/devnull.go index 50d3d28d6..4c2668ff6 100644 --- a/plugin/output/devnull/devnull.go +++ b/plugin/output/devnull/devnull.go @@ -12,7 +12,7 @@ It provides an API to test pipelines and other plugins. 
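[review note] With the review fixes in this patch the router is shared by pointer (`Router *Router` in `OutputPluginParams`), so every output plugin sees the same instance instead of a copy. The plugin-side contract stays small; a condensed sketch of the expected usage, distilled from the diffs in this series (`trySend` is a stand-in sink, not a real API):

    package sketch

    import (
    	"errors"

    	"github.com/ozontech/file.d/pipeline"
    )

    type output struct {
    	router *pipeline.Router
    }

    func (o *output) Start(_ pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
    	o.router = params.Router // keep the shared router, like the plugins above
    }

    func (o *output) Stop() {}

    func (o *output) Out(event *pipeline.Event) {
    	if err := trySend(event); err != nil {
    		o.router.Fail(event) // no-op when no dead queue is configured
    	}
    }

    func trySend(*pipeline.Event) error { return errors.New("always fails in this sketch") }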
type Plugin struct { controller pipeline.OutputPluginController - router pipeline.Router + router *pipeline.Router outFn func(event *pipeline.Event) total *atomic.Int64 } diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index cf91ef8af..ec0229c91 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -51,7 +51,7 @@ type Plugin struct { sendErrorMetric *prometheus.CounterVec indexingErrorsMetric prometheus.Counter - router pipeline.Router + router *pipeline.Router } // ! config-params @@ -263,12 +263,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MinRetention: p.config.Retention_, Multiplier: float64(p.config.RetentionExponentMultiplier), AttemptNum: p.config.Retry, - DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index 149a0d4ad..d16e0e314 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -49,7 +49,7 @@ type Plugin struct { // plugin metrics sendErrorMetric prometheus.Counter - router pipeline.Router + router *pipeline.Router } // ! config-params @@ -234,14 +234,15 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP p.router = params.Router backoffOpts := pipeline.BackoffOpts{ - MinRetention: p.config.Retention_, - Multiplier: float64(p.config.RetentionExponentMultiplier), - AttemptNum: p.config.Retry, + MinRetention: p.config.Retention_, + Multiplier: float64(p.config.RetentionExponentMultiplier), + AttemptNum: p.config.Retry, + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index d1ebe7e6f..186d73063 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -40,7 +40,7 @@ type Plugin struct { // plugin metrics sendErrorMetric prometheus.Counter - router pipeline.Router + router *pipeline.Router } // ! 
config-params @@ -263,12 +263,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MinRetention: p.config.Retention_, Multiplier: float64(p.config.RetentionExponentMultiplier), AttemptNum: p.config.Retry, - DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel diff --git a/plugin/output/loki/loki.go b/plugin/output/loki/loki.go index d2216d362..8dfb6f995 100644 --- a/plugin/output/loki/loki.go +++ b/plugin/output/loki/loki.go @@ -236,7 +236,7 @@ type Plugin struct { labels map[string]string - router pipeline.Router + router *pipeline.Router } func init() { @@ -277,12 +277,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MinRetention: p.config.Retention_, Multiplier: float64(p.config.RetentionExponentMultiplier), AttemptNum: p.config.Retry, - DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 28c583e5c..3d3392b59 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -112,7 +112,7 @@ type Plugin struct { writtenEventMetric prometheus.Counter insertErrorsMetric prometheus.Counter - router pipeline.Router + router *pipeline.Router } type ConfigColumn struct { @@ -293,12 +293,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MinRetention: p.config.Retention_, Multiplier: float64(p.config.RetentionExponentMultiplier), AttemptNum: p.config.Retry, - DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { level = zapcore.ErrorLevel diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 35506096d..0b2b1f98f 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -97,7 +97,7 @@ type Plugin struct { // plugin metrics sendErrorMetric *prometheus.CounterVec - router pipeline.Router + router *pipeline.Router } type CopyField struct { @@ -270,12 +270,12 @@ func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginP MinRetention: p.config.Retention_, Multiplier: float64(p.config.RetentionExponentMultiplier), AttemptNum: p.config.Retry, - DeadQueueIsAvailable: p.router.DeadQueueIsAvailable(), + IsDeadQueueAvailable: p.router.IsDeadQueueAvailable(), } onError := func(err error, events []*pipeline.Event) { var level zapcore.Level - if p.config.FatalOnFailedInsert && !p.router.DeadQueueIsAvailable() { + if p.config.FatalOnFailedInsert && !p.router.IsDeadQueueAvailable() { level = zapcore.FatalLevel } else { 
level = zapcore.ErrorLevel diff --git a/plugin/output/stdout/stdout.go b/plugin/output/stdout/stdout.go index d4238f683..b581d0456 100644 --- a/plugin/output/stdout/stdout.go +++ b/plugin/output/stdout/stdout.go @@ -13,7 +13,7 @@ It writes events to stdout(also known as console). type Plugin struct { controller pipeline.OutputPluginController - router pipeline.Router + router *pipeline.Router } type Config struct{} diff --git a/test/test.go b/test/test.go index 37e73e12e..6efd3a307 100644 --- a/test/test.go +++ b/test/test.go @@ -210,6 +210,7 @@ func NewEmptyOutputPluginParams() *pipeline.OutputPluginParams { return &pipeline.OutputPluginParams{ PluginDefaultParams: newDefaultParams(), Logger: newLogger().Named("output"), + Router: pipeline.NewRouter(), } } From 0dcad00725d84ebdca75c1f66850d3fb20ac186f Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Tue, 12 Aug 2025 17:34:17 +0700 Subject: [PATCH 11/19] add dead-queue docs --- _sidebar.idoc.md | 2 +- _sidebar.md | 2 +- plugin/README.md | 14 ++++ plugin/output/README.idoc.md | 73 ++++++++++++++++- plugin/output/README.md | 85 ++++++++++++++++++++ plugin/output/clickhouse/README.md | 5 +- plugin/output/clickhouse/clickhouse.go | 5 +- plugin/output/elasticsearch/README.md | 5 +- plugin/output/elasticsearch/elasticsearch.go | 5 +- plugin/output/gelf/README.md | 5 +- plugin/output/gelf/gelf.go | 5 +- plugin/output/kafka/README.md | 5 +- plugin/output/kafka/kafka.go | 5 +- plugin/output/loki/README.md | 5 +- plugin/output/loki/loki.go | 5 +- plugin/output/postgres/README.md | 5 +- plugin/output/postgres/postgres.go | 5 +- plugin/output/s3/README.md | 1 - plugin/output/s3/s3.go | 1 - plugin/output/splunk/README.md | 5 +- plugin/output/splunk/splunk.go | 5 +- 21 files changed, 215 insertions(+), 33 deletions(-) diff --git a/_sidebar.idoc.md b/_sidebar.idoc.md index 0573472e5..a70f5603b 100644 --- a/_sidebar.idoc.md +++ b/_sidebar.idoc.md @@ -16,7 +16,7 @@ @global-contents-table-plugin-input|links-list - Action @global-contents-table-plugin-action|links-list - - Output + - [Output](/plugin/output/README.md) @global-contents-table-plugin-output|links-list - **Pipeline** diff --git a/_sidebar.md b/_sidebar.md index 637498584..cc4491022 100644 --- a/_sidebar.md +++ b/_sidebar.md @@ -50,7 +50,7 @@ - [split](plugin/action/split/README.md) - [throttle](plugin/action/throttle/README.md) - - Output + - [Output](/plugin/output/README.md) - [clickhouse](plugin/output/clickhouse/README.md) - [devnull](plugin/output/devnull/README.md) - [elasticsearch](plugin/output/elasticsearch/README.md) diff --git a/plugin/README.md b/plugin/README.md index 1e6c8f9e4..ac1f3fe5b 100755 --- a/plugin/README.md +++ b/plugin/README.md @@ -764,6 +764,8 @@ It sends the event batches to Clickhouse database using File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/clickhouse/README.md) ## devnull It provides an API to test pipelines and other plugins. @@ -773,6 +775,8 @@ It provides an API to test pipelines and other plugins. It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/elasticsearch/README.md) ## file It sends event batches into files. @@ -796,18 +800,26 @@ GELF messages are separated by null byte. 
Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/gelf/README.md) ## kafka It sends the event batches to kafka brokers using `franz-go` lib. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/kafka/README.md) ## loki It sends the logs batches to Loki using HTTP API. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/loki/README.md) ## postgres It sends the event batches to postgres db using pgx. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/postgres/README.md) ## s3 Sends events to s3 output of one or multiple buckets. @@ -936,6 +948,8 @@ Out: } ``` +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/splunk/README.md) ## stdout It writes events to stdout(also known as console). diff --git a/plugin/output/README.idoc.md b/plugin/output/README.idoc.md index 70d8c3b7f..3e5b3a9c3 100644 --- a/plugin/output/README.idoc.md +++ b/plugin/output/README.idoc.md @@ -1,3 +1,74 @@ # Output plugins -@global-contents-table-plugin-output|contents-table \ No newline at end of file +@global-contents-table-plugin-output|contents-table + +## dead queue + +Failed events from the main pipeline are redirected to a dead-letter queue (DLQ) to prevent data loss and enable recovery. + +### Examples + +#### Dead queue to a reserve Elasticsearch + +Consumes logs from a Kafka topic, sends them to the primary Elasticsearch cluster, and fails over to a reserve ("dead-letter") Elasticsearch cluster if the primary is unavailable. + +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch-primary:9200 + # route to reserve elasticsearch + deadqueue: + endpoints: + - http://elasticsearch-reserve:9200 + type: elasticsearch +``` + +#### Dead queue with a second Kafka topic and a low-priority consumer + +Main pipeline: processes logs from Kafka → Elasticsearch; failed events go to a dead-letter Kafka topic. + +Dead-queue pipeline: re-processes failed events from the DLQ topic with lower priority. + +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch:9200 + # route to deadqueue pipeline + deadqueue: + brokers: + - kafka:9092 + default_topic: logs-deadqueue + type: kafka + +deadqueue_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs-deadqueue + output: + type: elasticsearch + workers_count: 1 # low priority + fatal_on_failed_insert: false + endpoints: + - http://elasticsearch:9200 +``` \ No newline at end of file diff --git a/plugin/output/README.md b/plugin/output/README.md index f968a6b45..0a584b075 100755 --- a/plugin/output/README.md +++ b/plugin/output/README.md @@ -7,6 +7,8 @@ It sends the event batches to Clickhouse database using File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/clickhouse/README.md) ## devnull It provides an API to test pipelines and other plugins.
@@ -16,6 +18,8 @@ It provides an API to test pipelines and other plugins. It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/elasticsearch/README.md) ## file It sends event batches into files. @@ -39,18 +43,26 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/gelf/README.md) ## kafka It sends the event batches to kafka brokers using `franz-go` lib. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/kafka/README.md) ## loki It sends the logs batches to Loki using HTTP API. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/loki/README.md) ## postgres It sends the event batches to postgres db using pgx. +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/postgres/README.md) ## s3 Sends events to s3 output of one or multiple buckets. @@ -179,9 +191,82 @@ Out: } ``` +Supports [dead queue](/plugin/output/README.md#dead-queue). + [More details...](plugin/output/splunk/README.md) ## stdout It writes events to stdout(also known as console). [More details...](plugin/output/stdout/README.md) + +## dead queue + +Failed events from the main pipeline are redirected to a dead-letter queue (DLQ) to prevent data loss and enable recovery. + +### Examples + +#### Dead queue to a reserve Elasticsearch + +Consumes logs from a Kafka topic, sends them to the primary Elasticsearch cluster, and fails over to a reserve ("dead-letter") Elasticsearch cluster if the primary is unavailable. + +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch-primary:9200 + # route to reserve elasticsearch + deadqueue: + endpoints: + - http://elasticsearch-reserve:9200 + type: elasticsearch +``` + +#### Dead queue with a second Kafka topic and a low-priority consumer + +Main pipeline: processes logs from Kafka → Elasticsearch; failed events go to a dead-letter Kafka topic. + +Dead-queue pipeline: re-processes failed events from the DLQ topic with lower priority. + +```yaml +main_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs + output: + type: elasticsearch + workers_count: 32 + endpoints: + - http://elasticsearch:9200 + # route to deadqueue pipeline + deadqueue: + brokers: + - kafka:9092 + default_topic: logs-deadqueue + type: kafka + +deadqueue_pipeline: + input: + type: kafka + brokers: + - kafka:9092 + topics: + - logs-deadqueue + output: + type: elasticsearch + workers_count: 1 # low priority + fatal_on_failed_insert: false + endpoints: + - http://elasticsearch:9200 +```
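+
+#### Dead queue from a plugin's point of view (sketch)
+
+The sketch below shows how a single failed event reaches the deadqueue from the output side. It is a minimal illustration, not file.d's real code: the plugin, its `send` helper, and the import path are assumptions; only `OutputPluginParams.Router`, `Router.Fail`, and the `Start`/`Out` signatures come from the pipeline package.
+
+```go
+package myoutput
+
+import "github.com/ozontech/file.d/pipeline" // assumed module path
+
+// Plugin is a hypothetical output plugin used only to illustrate routing.
+type Plugin struct {
+	router *pipeline.Router
+}
+
+func (p *Plugin) Start(config pipeline.AnyConfig, params *pipeline.OutputPluginParams) {
+	// Router.Start injects itself into params before starting the output.
+	p.router = params.Router
+}
+
+func (p *Plugin) Stop() {}
+
+// send is a stand-in for the real delivery call (HTTP, TCP, ...).
+func (p *Plugin) send(event *pipeline.Event) error { return nil }
+
+func (p *Plugin) Out(event *pipeline.Event) {
+	if err := p.send(event); err != nil {
+		// Fail re-routes the event to the deadqueue output;
+		// it is a no-op when no deadqueue is configured.
+		p.router.Fail(event)
+	}
+}
+```
+
+Batch outputs take another path: once their retry attempts are exhausted, `RetriableBatcher` marks the whole batch with `BatchStatusInDeadQueue` rather than calling `Fail` per event.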
*Generated using [__insane-doc__](https://github.com/vitkovskii/insane-doc)* \ No newline at end of file diff --git a/plugin/output/clickhouse/README.md b/plugin/output/clickhouse/README.md index a59df51b0..08b1b374e 100644 --- a/plugin/output/clickhouse/README.md +++ b/plugin/output/clickhouse/README.md @@ -5,6 +5,8 @@ It sends the event batches to Clickhouse database using File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`addresses`** *`[]Address`* *`required`* @@ -128,8 +130,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
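+
+As an illustration of that rule, the sketch below combines `fatal_on_failed_insert` with a `deadqueue` block (an untested sketch; the addresses, brokers, and topic are placeholders): while the deadqueue is configured, the fatal exit is suppressed and exhausted batches are re-routed instead.
+
+```yaml
+output:
+  type: clickhouse
+  addresses:
+    - clickhouse:9000
+  fatal_on_failed_insert: true # no effect while deadqueue is set
+  deadqueue:
+    type: kafka
+    brokers:
+      - kafka:9092
+    default_topic: logs-deadqueue
+```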
diff --git a/plugin/output/clickhouse/clickhouse.go b/plugin/output/clickhouse/clickhouse.go index c4e0524ca..ca3338647 100644 --- a/plugin/output/clickhouse/clickhouse.go +++ b/plugin/output/clickhouse/clickhouse.go @@ -30,6 +30,8 @@ It sends the event batches to Clickhouse database using [Native protocol](https://clickhouse.com/docs/en/interfaces/tcp/). File.d uses low level Go client - [ch-go](https://github.com/ClickHouse/ch-go) to provide these features. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -247,8 +249,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 diff --git a/plugin/output/elasticsearch/README.md b/plugin/output/elasticsearch/README.md index 409c8ecee..21114e946 100755 --- a/plugin/output/elasticsearch/README.md +++ b/plugin/output/elasticsearch/README.md @@ -2,6 +2,8 @@ It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`endpoints`** *`[]string`* *`required`* @@ -128,8 +130,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/elasticsearch/elasticsearch.go b/plugin/output/elasticsearch/elasticsearch.go index ec0229c91..27d82e9ca 100644 --- a/plugin/output/elasticsearch/elasticsearch.go +++ b/plugin/output/elasticsearch/elasticsearch.go @@ -23,6 +23,8 @@ import ( /*{ introduction It sends events into Elasticsearch. It uses `_bulk` API to send events in batches. If a network error occurs, the batch will infinitely try to be delivered to the random endpoint. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -169,8 +171,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 diff --git a/plugin/output/gelf/README.md b/plugin/output/gelf/README.md index 3e5738d3c..5666789d7 100755 --- a/plugin/output/gelf/README.md +++ b/plugin/output/gelf/README.md @@ -16,6 +16,8 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`endpoint`** *`string`* *`required`* @@ -121,8 +123,7 @@ After this timeout the batch will be sent even if batch isn't completed. **`retry`** *`int`* *`default=0`* -Retries of insertion. If File.d cannot insert for this number of attempts, -File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). +Retries of insertion. If File.d cannot insert for this number of attempts, File.d will fall with non-zero exit code, skip the message (see fatal_on_failed_insert), or route the batch to the deadqueue if one is configured.
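+
+For example, a hedged sketch of `retry` together with a deadqueue (untested; the endpoint and the reserve cluster are placeholders): after three failed attempts the batch is handed to the deadqueue output instead of being skipped or causing a fatal exit.
+
+```yaml
+output:
+  type: gelf
+  endpoint: graylog:12201
+  retry: 3 # attempts before giving up on a batch
+  deadqueue:
+    type: elasticsearch
+    endpoints:
+      - http://elasticsearch-reserve:9200
+```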
diff --git a/plugin/output/gelf/gelf.go b/plugin/output/gelf/gelf.go index d16e0e314..8b83f5714 100644 --- a/plugin/output/gelf/gelf.go +++ b/plugin/output/gelf/gelf.go @@ -33,6 +33,8 @@ GELF messages are separated by null byte. Each message is a JSON with the follow Every field with an underscore prefix `_` will be treated as an extra field. Allowed characters in field names are letters, numbers, underscores, dashes, and dots. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -151,8 +153,7 @@ type Config struct { // > @3@4@5@6 // > - // > Retries of insertion. If File.d cannot insert for this number of attempts, - // > File.d will fall with non-zero exit code or skip message (see fatal_on_failed_insert). + // > Retries of insertion. If File.d cannot insert for this number of attempts, File.d will fall with non-zero exit code, skip the message (see fatal_on_failed_insert), or route the batch to the deadqueue if one is configured. Retry int `json:"retry" default:"0"` // * // > @3@4@5@6 diff --git a/plugin/output/kafka/README.md b/plugin/output/kafka/README.md index eda1a50ec..d1ef51b20 100755 --- a/plugin/output/kafka/README.md +++ b/plugin/output/kafka/README.md @@ -1,6 +1,8 @@ # Kafka output It sends the event batches to kafka brokers using `franz-go` lib. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`brokers`** *`[]string`* *`required`* @@ -91,8 +93,7 @@ the client.ForceMetadataRefresh() function is used for some ProduceSync errors: **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/kafka/kafka.go b/plugin/output/kafka/kafka.go index 186d73063..213ff57ec 100644 --- a/plugin/output/kafka/kafka.go +++ b/plugin/output/kafka/kafka.go @@ -18,6 +18,8 @@ import ( /*{ introduction It sends the event batches to kafka brokers using `franz-go` lib. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -127,8 +129,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 diff --git a/plugin/output/loki/README.md b/plugin/output/loki/README.md index 051449dfd..1ce47cee1 100644 --- a/plugin/output/loki/README.md +++ b/plugin/output/loki/README.md @@ -1,6 +1,8 @@ # Loki output It sends the logs batches to Loki using HTTP API. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`address`** *`string`* *`required`* @@ -137,8 +139,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/loki/loki.go b/plugin/output/loki/loki.go index 8dfb6f995..7837ad8ba 100644 --- a/plugin/output/loki/loki.go +++ b/plugin/output/loki/loki.go @@ -24,6 +24,8 @@ import ( /*{ introduction It sends the logs batches to Loki using HTTP API. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ var errUnixNanoFormat = errors.New("please send time in UnixNano format or add a convert_date action") @@ -170,8 +172,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 diff --git a/plugin/output/postgres/README.md b/plugin/output/postgres/README.md index 9bfd08881..64a46d73b 100755 --- a/plugin/output/postgres/README.md +++ b/plugin/output/postgres/README.md @@ -1,6 +1,8 @@ # Postgres output It sends the event batches to postgres db using pgx. +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`strict`** *`bool`* *`default=false`* @@ -47,8 +49,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/postgres/postgres.go b/plugin/output/postgres/postgres.go index 3d3392b59..e47b0498e 100644 --- a/plugin/output/postgres/postgres.go +++ b/plugin/output/postgres/postgres.go @@ -22,6 +22,8 @@ import ( /*{ introduction It sends the event batches to postgres db using pgx. + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ /*{ example @@ -164,8 +166,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 diff --git a/plugin/output/s3/README.md b/plugin/output/s3/README.md index 5b914e248..8564e3d1b 100755 --- a/plugin/output/s3/README.md +++ b/plugin/output/s3/README.md @@ -154,7 +154,6 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* After an insert error, fall with a non-zero exit code or not -**Experimental feature**
diff --git a/plugin/output/s3/s3.go b/plugin/output/s3/s3.go index e846b605d..1e9ca0f67 100644 --- a/plugin/output/s3/s3.go +++ b/plugin/output/s3/s3.go @@ -244,7 +244,6 @@ type Config struct { // > @3@4@5@6 // > // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 diff --git a/plugin/output/splunk/README.md b/plugin/output/splunk/README.md index 78cc23bec..b9df5013f 100755 --- a/plugin/output/splunk/README.md +++ b/plugin/output/splunk/README.md @@ -47,6 +47,8 @@ Out: } ``` +Supports [dead queue](/plugin/output/README.md#dead-queue). + ### Config params **`endpoint`** *`string`* *`required`* @@ -125,8 +127,7 @@ File.d will fall with non-zero exit code or skip message (see fatal_on_failed_in **`fatal_on_failed_insert`** *`bool`* *`default=false`* -After an insert error, fall with a non-zero exit code or not -**Experimental feature** +After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits.
diff --git a/plugin/output/splunk/splunk.go b/plugin/output/splunk/splunk.go index 0b2b1f98f..57df2a09a 100644 --- a/plugin/output/splunk/splunk.go +++ b/plugin/output/splunk/splunk.go @@ -68,6 +68,8 @@ Out: } } ``` + +Supports [dead queue](/plugin/output/README.md#dead-queue). }*/ const ( @@ -179,8 +181,7 @@ type Config struct { // > @3@4@5@6 // > - // > After an insert error, fall with a non-zero exit code or not - // > **Experimental feature** + // > After an insert error, fall with a non-zero exit code or not. A configured deadqueue disables fatal exits. FatalOnFailedInsert bool `json:"fatal_on_failed_insert" default:"false"` // * // > @3@4@5@6 From 551280038cd15d034858b5fc7eb6caf0c748c6c2 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Fri, 15 Aug 2025 13:01:09 +0700 Subject: [PATCH 12/19] hide isDeadQueueAvailable --- pipeline/backoff.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pipeline/backoff.go b/pipeline/backoff.go index eb8fc40da..735c42eae 100644 --- a/pipeline/backoff.go +++ b/pipeline/backoff.go @@ -11,7 +11,7 @@ type RetriableBatcher struct { outFn RetriableBatcherOutFn batcher *Batcher backoffOpts BackoffOpts - IsDeadQueueAvailable bool + isDeadQueueAvailable bool onRetryError func(err error, events []*Event) } @@ -29,7 +29,7 @@ func NewRetriableBatcher(batcherOpts *BatcherOptions, batcherOutFn RetriableBatc outFn: batcherOutFn, backoffOpts: opts, onRetryError: onError, - IsDeadQueueAvailable: opts.IsDeadQueueAvailable, + isDeadQueueAvailable: opts.IsDeadQueueAvailable, } batcherBackoff.setBatcher(batcherOpts) return batcherBackoff @@ -66,7 +66,7 @@ func (b *RetriableBatcher) Out(data *WorkerData, batch *Batch) { events = batch.events } b.onRetryError(err, events) - if batch != nil && b.IsDeadQueueAvailable { + if batch != nil && b.isDeadQueueAvailable { batch.reset() batch.status = BatchStatusInDeadQueue } From a69f8df6b34b057f78035834c4302d277f3828be Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 3 Sep 2025 13:31:10 +0700 Subject: [PATCH 13/19] todo in router ack --- pipeline/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipeline/router.go b/pipeline/router.go index 6d36faf54..31f8c587d 100644 --- a/pipeline/router.go +++ b/pipeline/router.go @@ -23,7 +23,7 @@ func (r *Router) SetDeadQueueOutput(info *OutputPluginInfo) { } func (r *Router) Ack(event *Event) { - + // TODO: send commit to input after receiving all acks from outputs } func (r *Router) Fail(event *Event) { From 770155d9839df5b48b2137e64349bfd0c5d16cbb Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 3 Sep 2025 13:51:34 +0700 Subject: [PATCH 14/19] debug test e2e --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 23fe2bf2b..854543184 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,7 +101,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json + go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... 
-json - name: Coverage report run: | From e67b1aa7c9cdb8be3e1e9b45f4f1000f8ba0a1d2 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 3 Sep 2025 14:00:33 +0700 Subject: [PATCH 15/19] Revert "debug test e2e" This reverts commit 770155d9839df5b48b2137e64349bfd0c5d16cbb. --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 854543184..23fe2bf2b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,7 +101,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json + go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json - name: Coverage report run: | From 92714e94169b668fd96ff1088f4e51827cd69742 Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 3 Sep 2025 14:19:48 +0700 Subject: [PATCH 16/19] switch zookeeper in e2e tests --- e2e/kafka_auth/docker-compose.yml | 9 ++++++--- e2e/kafka_file/docker-compose.yml | 9 ++++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/e2e/kafka_auth/docker-compose.yml b/e2e/kafka_auth/docker-compose.yml index 90edf224c..3e10b9acd 100644 --- a/e2e/kafka_auth/docker-compose.yml +++ b/e2e/kafka_auth/docker-compose.yml @@ -2,13 +2,16 @@ version: "2.1" services: zookeeper: - image: docker.io/bitnami/zookeeper:3.9 + image: zookeeper:3.9 ports: - "2182:2181" volumes: - - "zookeeper_data:/bitnami" + - "zookeeper_data:/data" environment: - - ALLOW_ANONYMOUS_LOGIN=yes + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_4LW_COMMANDS_WHITELIST: "*" + ZOO_SERVERS: server.1=zookeeper:2888:3888;2181 init-certs: image: docker.io/bitnami/kafka:3.6 command: /tmp/generate.sh diff --git a/e2e/kafka_file/docker-compose.yml b/e2e/kafka_file/docker-compose.yml index 924743dfe..927ad4bb7 100644 --- a/e2e/kafka_file/docker-compose.yml +++ b/e2e/kafka_file/docker-compose.yml @@ -2,13 +2,16 @@ version: "2" services: zookeeper: - image: docker.io/bitnami/zookeeper:3.8 + image: zookeeper:3.9 ports: - "2181:2181" volumes: - - "zookeeper_data:/bitnami" + - "zookeeper_data:/data" environment: - - ALLOW_ANONYMOUS_LOGIN=yes + ZOO_MY_ID: 1 + ZOO_PORT: 2181 + ZOO_4LW_COMMANDS_WHITELIST: "*" + ZOO_SERVERS: server.1=zookeeper:2888:3888;2181 kafka: image: docker.io/bitnami/kafka:3.1 ports: From 633ccd408d5f5ed3c8ec4dee69df49d4ec8b7dfd Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 3 Sep 2025 14:27:29 +0700 Subject: [PATCH 17/19] Reapply "debug test e2e" This reverts commit e67b1aa7c9cdb8be3e1e9b45f4f1000f8ba0a1d2. --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 23fe2bf2b..854543184 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,7 +101,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json + go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json - name: Coverage report run: | From 325dda9d9d099b2282ddec8847ae4aeab04a748e Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 3 Sep 2025 14:33:06 +0700 Subject: [PATCH 18/19] Revert "Reapply "debug test e2e"" This reverts commit 633ccd408d5f5ed3c8ec4dee69df49d4ec8b7dfd. --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 854543184..23fe2bf2b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -101,7 +101,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json + go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json - name: Coverage report run: | From 62651426c6fcfc215fadcbab546d3da6b0a8dcda Mon Sep 17 00:00:00 2001 From: Dmitry Romanov Date: Wed, 3 Sep 2025 14:36:15 +0700 Subject: [PATCH 19/19] show output of go test --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 23fe2bf2b..f11bd69c5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json > test-report${{ matrix.flags }}.json + go test -coverprofile=profile${{ matrix.flags }}.out -covermode=atomic -v -coverpkg=./... ./... -json | tee test-report${{ matrix.flags }}.json - name: Coverage report run: | @@ -101,7 +101,7 @@ jobs: GOFLAGS: ${{ matrix.flags }} LOG_LEVEL: error run: | - go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json > test-report-e2e${{ matrix.flags }}.json + go test ./e2e -coverprofile=profile-e2e${{ matrix.flags }}.out -covermode=atomic -tags=e2e_new -timeout=3m -coverpkg=./... -json | tee test-report-e2e${{ matrix.flags }}.json - name: Coverage report run: |