diff --git a/go.mod b/go.mod index 6bf9204f4935..e3ed1c729757 100644 --- a/go.mod +++ b/go.mod @@ -406,6 +406,9 @@ replace ( github.com/dgraph-io/ristretto => github.com/elastic/ristretto v0.1.1-0.20220602190459-83b0895ca5b3 // Removes glog dependency. See https://github.com/elastic/beats/issues/31810. github.com/dop251/goja => github.com/andrewkroh/goja v0.0.0-20190128172624-dd2ac4456e20 github.com/dop251/goja_nodejs => github.com/dop251/goja_nodejs v0.0.0-20171011081505-adff31b136e6 + + // this is temporary until the elastic-agent-client changes are merged and released + github.com/elastic/elastic-agent-client/v7 => github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e github.com/fsnotify/fsevents => github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 github.com/fsnotify/fsnotify => github.com/adriansr/fsnotify v1.4.8-0.20211018144411-a81f2b630e7c github.com/godror/godror => github.com/godror/godror v0.33.2 // updating to v0.24.2 caused a breaking change diff --git a/go.sum b/go.sum index 12ad7d6a691c..3270189526e8 100644 --- a/go.sum +++ b/go.sum @@ -1410,6 +1410,12 @@ github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144T github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pborman/getopt v0.0.0-20170112200414-7148bc3a4c30/go.mod h1:85jBQOZwpVEaDAr341tbn15RS4fCAsIst0qp7i8ex1o= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14 h1:s9OJHxfrHxOFBkQrKQAKOE4cYMKMO6s3UxZUxWk+utA= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240628124107-987e33ca9e14/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e h1:l3tZOaQGHhG6s6mmciPzr2ZLVHks4/na7VJj6nUxifc= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240703134948-b26a8ccf196e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e h1:c+moxj4HJUXmcVYb/YXk0ei48SM4v8Hc8QeHd5A5/Eo= +github.com/pchila/elastic-agent-client/v7 v7.0.0-20240712140251-3179341caf2e/go.mod h1:h2yJHN8Q5rhfi9i6FfyPufh+StFN+UD9PYGv8blXKbE= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index c15d9b8c2006..dc16bbb74205 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -97,8 +97,9 @@ type Beat struct { RawConfig *config.C // Raw config that can be unpacked to get Beat specific config data. IdxSupporter idxmgmt.Supporter - keystore keystore.Keystore - processors processing.Supporter + keystore keystore.Keystore + processors processing.Supporter + processingFactory processing.SupportFactory InputQueueSize int // Size of the producer queue used by most queues. @@ -377,7 +378,6 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { } } - var publisher *pipeline.Pipeline monitors := pipeline.Monitors{ Metrics: reg, Telemetry: monitoring.GetNamespace("state").GetRegistry(), @@ -389,18 +389,39 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { Processors: b.processors, InputQueueSize: b.InputQueueSize, } - publisher, err = pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) - if err != nil { - return nil, fmt.Errorf("error initializing publisher: %w", err) - } - reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + if b.Manager.Enabled() { + if !outputEnabled { + // Create a pipeline builder and don't set the pipeline until we are ready to set the output + pipelineBuilder := pipeline.NewPipelineBuilder(b.Info, monitors, b.Config.Pipeline, outputFactory, settings, b.processingFactory) + b.Publisher = pipelineBuilder + reload.RegisterV2.MustRegisterGlobalProcessors(pipelineBuilder.GlobalProcessorsReloader()) + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(pipelineBuilder.OutputReloader())) + } else { + // don't set the builder, the output is already there. + // TODO Log a warning saying that reload won't work properly + + publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) + if err != nil { + return nil, fmt.Errorf("error initializing publisher: %w", err) + } + b.Publisher = publisher + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + } + } else { + // normal running without agent + publisher, err := pipeline.LoadWithSettings(b.Info, monitors, b.Config.Pipeline, outputFactory, settings) + if err != nil { + return nil, fmt.Errorf("error initializing publisher: %w", err) + } + b.Publisher = publisher + reload.RegisterV2.MustRegisterOutput(b.makeOutputReloader(publisher.OutputReloader())) + } // TODO: some beats race on shutdown with publisher.Stop -> do not call Stop yet, // but refine publisher to disconnect clients on stop automatically // defer pipeline.Close() - b.Publisher = publisher beater, err := bt(&b.Beat, sub) if err != nil { return nil, err @@ -898,6 +919,7 @@ func (b *Beat) configure(settings Settings) error { if processingFactory == nil { processingFactory = processing.MakeDefaultBeatSupport(true) } + b.processingFactory = processingFactory b.processors, err = processingFactory(b.Info, logp.L().Named("processors"), b.RawConfig) b.Manager.RegisterDiagnosticHook("global processors", "a list of currently configured global beat processors", diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index d029f473f17c..b9bec1504061 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -38,6 +38,9 @@ const OutputRegName = "output" // APMRegName is the registation name for APM tracing. const APMRegName = "apm" +// GlobalProcessorRegName is the registration name for global processors config +const GlobalProcessorRegName = "global_processors" + // ConfigWithMeta holds a pair of config.C and optional metadata for it type ConfigWithMeta struct { // Config to store @@ -157,6 +160,14 @@ func (r *Registry) MustRegisterAPM(list Reloadable) { } } +// MustRegisterGlobalProcessors is a V2-specific registration function +// that declares a reloadable global processor configuration +func (r *Registry) MustRegisterGlobalProcessors(obj Reloadable) { + if err := r.Register(GlobalProcessorRegName, obj); err != nil { + panic(err) + } +} + // GetInputList is a V2-specific function // That returns the reloadable list created for an input func (r *Registry) GetInputList() ReloadableList { @@ -181,6 +192,14 @@ func (r *Registry) GetReloadableAPM() Reloadable { return r.confs[APMRegName] } +// GetReloadableGlobalProcessors is a V2-specific function +// That returns the reloader for the registered global processor reloadable object +func (r *Registry) GetReloadableGlobalProcessors() Reloadable { + r.RLock() + defer r.RUnlock() + return r.confs[GlobalProcessorRegName] +} + // GetRegisteredNames returns the list of names registered func (r *Registry) GetRegisteredNames() []string { r.RLock() diff --git a/libbeat/publisher/pipeline/builder.go b/libbeat/publisher/pipeline/builder.go new file mode 100644 index 000000000000..28a206b5e63d --- /dev/null +++ b/libbeat/publisher/pipeline/builder.go @@ -0,0 +1,158 @@ +package pipeline + +import ( + "errors" + "fmt" + "sync" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher/processing" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +var ErrNoReloadPipelineAlreadyBuilt = errors.New("cannot reload as Pipeline has been already built") + +// Builder is an implementation of Builder pattern for building a Pipeline +type Builder struct { + // Pipeline parameters + beatInfo beat.Info + monitors Monitors + config Config + makeOutput outputFactory + settings Settings + + // Attributes for lazy loading of the pipeline + buildmx sync.Mutex + pipelineBuilt bool + pipeline *Pipeline + pipelineErr error + + // Reloaders + outputReloader *outputReloader + globalProcessorReloader *globalProcessorReloader + + processingFactory processing.SupportFactory +} + +func NewPipelineBuilder(beatInfo beat.Info, monitors Monitors, config Config, outFactory outputFactory, settings Settings, processorsFactory processing.SupportFactory) *Builder { + b := &Builder{ + beatInfo: beatInfo, + monitors: monitors, + config: config, + makeOutput: outFactory, + settings: settings, + processingFactory: processorsFactory, + } + + b.outputReloader = &outputReloader{b} + b.globalProcessorReloader = &globalProcessorReloader{b: b} + + return b +} + +func (b *Builder) WithGlobalProcessors(processors processing.Supporter) error { + b.settings.Processors = processors + return nil +} + +// build will materialize the Pipeline only once, after Pipeline has been built this is a no-op +func (b *Builder) build() { + b.buildmx.Lock() + defer b.buildmx.Unlock() + log := b.monitors.Logger + + if b.pipelineBuilt { + log.Debug("Pipeline already built, skipping..") + return + } + + log.Info("Creating Pipeline...") + p, err := LoadWithSettings(b.beatInfo, b.monitors, b.config, b.makeOutput, b.settings) + b.pipelineBuilt = true + + if err != nil { + log.Errorf("Error creating pipeline: %s", err) + b.pipelineErr = fmt.Errorf("instantiating Pipeline: %w", err) + return + } + b.pipeline = p + log.Info("Pipeline created successfully") +} + +func (b *Builder) ConnectWith(config beat.ClientConfig) (beat.Client, error) { + b.build() + if b.pipelineErr != nil { + return nil, b.pipelineErr + } + + return b.pipeline.ConnectWith(config) +} + +func (b *Builder) Connect() (beat.Client, error) { + b.build() + if b.pipelineErr != nil { + return nil, b.pipelineErr + } + + return b.pipeline.Connect() +} + +func (b *Builder) OutputReloader() OutputReloader { + return b.outputReloader +} + +func (b *Builder) GlobalProcessorsReloader() *globalProcessorReloader { + return b.globalProcessorReloader +} + +type globalProcessorReloader struct { + b *Builder +} + +func (gpr *globalProcessorReloader) Reload(config *reload.ConfigWithMeta) error { + builder := gpr.b + builder.buildmx.Lock() + defer builder.buildmx.Unlock() + + builder.monitors.Logger.Debugf("Reloading global processor with %s", config.Config) + + if builder.pipelineBuilt { + // Too late as the pipeline is built already. We need to restart + builder.monitors.Logger.Debug("Pipeline already instantiated. Returning ErrNoReloadPipelineAlreadyBuilt") + return ErrNoReloadPipelineAlreadyBuilt + } + + newProcessors, err := gpr.createProcessors(config.Config) + if err != nil { + return fmt.Errorf("creating new processors with config %s : %w", config.Config, err) + } + builder.WithGlobalProcessors(newProcessors) + builder.monitors.Logger.Debugf("Reloading global processor complete", config.Config) + return nil +} + +func (gpr *globalProcessorReloader) createProcessors(rawProcessorConfig *conf.C) (processing.Supporter, error) { + processingFactory := gpr.b.processingFactory + if processingFactory == nil { + processingFactory = processing.MakeDefaultBeatSupport(true) + } + return processingFactory(gpr.b.beatInfo, logp.L().Named("processors"), rawProcessorConfig) +} + +type outputReloader struct { + b *Builder +} + +func (or *outputReloader) Reload( + cfg *reload.ConfigWithMeta, + factory func(outputs.Observer, conf.Namespace) (outputs.Group, error), +) error { + or.b.build() // create the pipeline if needed + if or.b.pipelineErr != nil { + return or.b.pipelineErr + } + return or.b.pipeline.OutputReloader().Reload(cfg, factory) +} diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go index af756213a632..3d1d613fdd5c 100644 --- a/libbeat/publisher/pipeline/client.go +++ b/libbeat/publisher/pipeline/client.go @@ -29,6 +29,10 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) +type clientUnregisterer interface { + UnregisterClient(*client) +} + // client connects a beat with the processors and pipeline queue. type client struct { logger *logp.Logger diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go index a5a13a0584ea..ed38415186fe 100644 --- a/libbeat/publisher/pipeline/pipeline.go +++ b/libbeat/publisher/pipeline/pipeline.go @@ -110,7 +110,7 @@ type OutputReloader interface { // The new pipeline will take ownership of queue and outputs. On Close, the // queue and outputs will be closed. func New( - beat beat.Info, + beatInfo beat.Info, monitors Monitors, userQueueConfig conf.Namespace, out outputs.Group, @@ -121,7 +121,7 @@ func New( } p := &Pipeline{ - beatInfo: beat, + beatInfo: beatInfo, monitors: monitors, observer: nilObserver, waitCloseTimeout: settings.WaitClose, @@ -147,7 +147,7 @@ func New( return nil, err } - output, err := newOutputController(beat, monitors, p.observer, queueFactory, settings.InputQueueSize) + output, err := newOutputController(beatInfo, monitors, p.observer, queueFactory, settings.InputQueueSize) if err != nil { return nil, err } @@ -209,7 +209,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { return nil, err } - client := &client{ + clt := &client{ logger: p.monitors.Logger, isOpen: atomic.MakeBool(true), clientListener: cfg.ClientListener, @@ -233,7 +233,7 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { producerCfg := queue.ProducerConfig{ ACK: func(count int) { - client.observer.eventsACKed(count) + clt.observer.eventsACKed(count) if ackHandler != nil { ackHandler.ACKEvents(count) } @@ -244,17 +244,18 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { ackHandler = acker.Nil() } - client.eventListener = ackHandler - client.waiter = waiter - client.producer = p.outputController.queueProducer(producerCfg) - if client.producer == nil { + clt.eventListener = ackHandler + clt.waiter = waiter + clt.producer = p.outputController.queueProducer(producerCfg) + if clt.producer == nil { // This can only happen if the pipeline was shut down while clients // were still waiting to connect. return nil, fmt.Errorf("client failed to connect because the pipeline is shutting down") } p.observer.clientConnected() - return client, nil + + return clt, nil } func (p *Pipeline) createEventProcessing(cfg beat.ProcessingConfig, noPublish bool) (beat.Processor, error) { diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index e39b394bf2b3..e8440809af8f 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -14,6 +14,7 @@ import ( "syscall" "time" + "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" "google.golang.org/grpc" @@ -105,6 +106,9 @@ type BeatV2Manager struct { // set with the last applied APM config lastAPMCfg *proto.APMConfig + // set with the last applied global processors config + lastGlobalProcessorsConfig *proto.GlobalProcessorsConfig + // used for the debug callback to report as-running config lastBeatOutputCfg *reload.ConfigWithMeta lastBeatInputCfgs []*reload.ConfigWithMeta @@ -183,7 +187,7 @@ func NewV2AgentManager(config *conf.C, registry *reload.Registry) (lbmanagement. client.WithGRPCDialOptions(grpc.WithTransportCredentials(insecure.NewCredentials()))) } else { // Normal Elastic-Agent-Client initialisation - agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo) + agentClient, _, err = client.NewV2FromReader(os.Stdin, versionInfo, client.WithEmitComponentChanges(true)) if err != nil { return nil, fmt.Errorf("error reading control config from agent: %w", err) } @@ -502,6 +506,12 @@ func (cm *BeatV2Manager) unitListen() { cm.isRunning = false cm.UpdateStatus(status.Stopping, "Stopping") return + case change := <-cm.client.ComponentChanges(): + cm.logger.Debugw("received component change event", "event", change) + err := cm.reloadGlobalProcessors(change) + if err != nil { + cm.logger.Errorw("Error reloading global processors", "error", err) + } case change := <-cm.client.UnitChanges(): cm.logger.Infof( "BeatV2Manager.unitListen UnitChanged.ID(%s), UnitChanged.Type(%s), UnitChanged.Trigger(%d): %s/%s", @@ -996,6 +1006,50 @@ func (cm *BeatV2Manager) handleDebugYaml() []byte { return data } +func (cm *BeatV2Manager) reloadGlobalProcessors(change client.Component) error { + cm.logger.Debug("Reloading global processors config") + processors := cm.registry.GetReloadableGlobalProcessors() + if processors == nil { + return fmt.Errorf("reloading global processors: no global processors reloadable registered") + } + + if change.Config == nil { + cm.logger.Debug("Component changes contain a nil config, skipping global processors reload") + return nil + } + + if gproto.Equal(cm.lastGlobalProcessorsConfig, change.Config.Processors) { + cm.logger.Debug("Global processor config is the same as the last applied, skipping reload") + return nil + } + + var newProcessorConfig *conf.C + if change.Config.Processors != nil { + newConf, err := conf.NewConfigFrom(change.Config.Processors) + if err != nil { + return fmt.Errorf("creating new global processor config: %w", err) + } + newProcessorConfig = newConf + } + + err := processors.Reload(&reload.ConfigWithMeta{Config: newProcessorConfig}) + + if errors.Is(err, pipeline.ErrNoReloadPipelineAlreadyBuilt) { + // Pipeline is already instantiated, need to restart + cm.logger.Info("beat is restarting because global processor configuration changed") + cm.Stop() + return nil + } + + if err != nil { + return fmt.Errorf("reloading global processor config: %w", err) + } + + cm.lastGlobalProcessorsConfig = change.Config.Processors + cm.logger.Debug("Global processors config reloaded") + return nil +} + func getZapcoreLevel(ll client.UnitLogLevel) (zapcore.Level, bool) { switch ll { case client.UnitLogLevelError: