Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
deca2bd
appsec/allowlists: replace tomb refresh with context cancellation
mmetc Feb 27, 2026
9131ee2
appsec acquisition: cancel allowlist refresh from tomb lifecycle
mmetc Feb 27, 2026
58ed4cc
longpollclient: replace tomb lifecycle and handle closed PAPI stream
mmetc Feb 27, 2026
23948c1
csplugin: migrate watcher/broker lifecycle to context
mmetc Feb 27, 2026
c29f1bb
csplugin: fix lint issues and cap shutdown flush
mmetc Feb 27, 2026
b6f8b48
apiserver/papi: migrate sync lifecycle from tomb to context
mmetc Feb 27, 2026
a75a264
apiserver/apic: migrate metrics lifecycle to context
mmetc Feb 27, 2026
b3f1e38
acquisition: migrate file streaming to context
mmetc Feb 27, 2026
f94fad7
acquisition: migrate http streaming to context
mmetc Feb 27, 2026
d93ab4b
acquisition: migrate s3 lifecycle to context
mmetc Feb 27, 2026
e16beb6
cmd/crowdsec: migrate lifecycle to context
mmetc Feb 27, 2026
67d59b9
cmd/crowdsec: satisfy context lint checks
mmetc Feb 27, 2026
0a66b78
acquisition/http: extract Stream request handler
mmetc Feb 27, 2026
2eeaff8
chore: checkpoint apiserver changes
mmetc Feb 27, 2026
660b51f
fix: stabilize reload shutdown logging
mmetc Feb 27, 2026
525fd10
fix(test): resolve appsec test failures from wait-for timeout and dae…
mmetc Feb 28, 2026
b5888b4
fix(kafka): return ctx.Err() instead of nil on context cancellation
mmetc Mar 1, 2026
e4d3401
fix(kubernetesaudit): suppress contextcheck for Shutdown
mmetc Mar 1, 2026
92f75f7
fix(loki): fix containedctx and errorlint issues
mmetc Mar 1, 2026
298542e
fix(victorialogs): fix containedctx, bodyclose, and errcheck issues
mmetc Mar 1, 2026
8408f93
refactor(wineventlog): update acquisition interface
mmetc Mar 1, 2026
5ddfce0
fix: handle context cancellation gracefully in kafka and update winev…
mmetc Mar 1, 2026
54c8ff5
kafka: nolint nilerr
mmetc Mar 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 29 additions & 28 deletions cmd/crowdsec-cli/clinotifications/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io/fs"
"net/url"
Expand All @@ -19,7 +18,6 @@ import (
"github.com/go-openapi/strfmt"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v3"

"github.com/crowdsecurity/go-cs-lib/ptr"
Expand Down Expand Up @@ -275,7 +273,6 @@ func (cli *cliNotifications) notificationConfigFilter(_ *cobra.Command, args []s
func (cli *cliNotifications) newTestCmd() *cobra.Command {
var (
pluginBroker csplugin.PluginBroker
pluginTomb tomb.Tomb
alertOverride string
)

Expand Down Expand Up @@ -315,11 +312,19 @@ func (cli *cliNotifications) newTestCmd() *cobra.Command {
},
}, cfg.ConfigPaths)
},
RunE: func(_ *cobra.Command, _ []string) error {
pluginTomb.Go(func() error {
pluginBroker.Run(&pluginTomb)
return nil
})
RunE: func(cmd *cobra.Command, _ []string) error {
pluginCtx, cancelPlugin := context.WithCancel(cmd.Context())
pluginDone := make(chan struct{})

go func() {
pluginBroker.Run(pluginCtx)
close(pluginDone)
}()
defer func() {
cancelPlugin()
<-pluginDone
}()

alert := &models.Alert{
Capacity: ptr.Of(int32(0)),
Decisions: []*models.Decision{{
Expand Down Expand Up @@ -360,16 +365,12 @@ func (cli *cliNotifications) newTestCmd() *cobra.Command {
Alert: alert,
}

// time.Sleep(2 * time.Second) // There's no mechanism to ensure notification has been sent
pluginTomb.Kill(errors.New("terminating"))
_ = pluginTomb.Wait()

return nil
},
}
cmd.Flags().StringVarP(&alertOverride, "alert", "a", "",
"JSON string used to override alert fields in the generic alert " +
"(see crowdsec/pkg/models/alert.go in the source tree for the full definition of the object)")
"JSON string used to override alert fields in the generic alert "+
"(see crowdsec/pkg/models/alert.go in the source tree for the full definition of the object)")

return cmd
}
Expand Down Expand Up @@ -401,10 +402,7 @@ cscli notifications reinject <alert_id> -a '{"remediation": true,"scenario":"not
return nil
},
RunE: func(cmd *cobra.Command, _ []string) error {
var (
pluginBroker csplugin.PluginBroker
pluginTomb tomb.Tomb
)
var pluginBroker csplugin.PluginBroker

ctx := cmd.Context()
cfg := cli.cfg()
Expand Down Expand Up @@ -442,10 +440,17 @@ cscli notifications reinject <alert_id> -a '{"remediation": true,"scenario":"not
return fmt.Errorf("can't initialize plugins: %w", err)
}

pluginTomb.Go(func() error {
pluginBroker.Run(&pluginTomb)
return nil
})
pluginCtx, cancelPlugin := context.WithCancel(ctx)
pluginDone := make(chan struct{})

go func() {
pluginBroker.Run(pluginCtx)
close(pluginDone)
}()
defer func() {
cancelPlugin()
<-pluginDone
}()

profiles, err := csprofiles.NewProfile(cfg.API.Server.Profiles)
if err != nil {
Expand Down Expand Up @@ -481,16 +486,12 @@ cscli notifications reinject <alert_id> -a '{"remediation": true,"scenario":"not
break
}
}
// time.Sleep(2 * time.Second) // There's no mechanism to ensure notification has been sent
pluginTomb.Kill(errors.New("terminating"))
_ = pluginTomb.Wait()

return nil
},
}
cmd.Flags().StringVarP(&alertOverride, "alert", "a", "",
"JSON string used to override alert fields in the reinjected alert " +
"(see crowdsec/pkg/models/alert.go in the source tree for the full definition of the object)")
"JSON string used to override alert fields in the reinjected alert "+
"(see crowdsec/pkg/models/alert.go in the source tree for the full definition of the object)")

return cmd
}
Expand Down
72 changes: 57 additions & 15 deletions cmd/crowdsec/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"context"
"fmt"
"net"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -20,6 +21,15 @@ func initAPIServer(ctx context.Context, cConfig *csconfig.Config) (*apiserver.AP
log.Info("push and pull to Central API disabled")
}

if cConfig.API.Server.ListenURI != "" {
listenConfig := &net.ListenConfig{}
listener, err := listenConfig.Listen(ctx, "tcp", cConfig.API.Server.ListenURI)
if err != nil {
return nil, fmt.Errorf("local API server stopped with error: listening on %s: %w", cConfig.API.Server.ListenURI, err)
}
_ = listener.Close()
}

accessLogger := cConfig.API.Server.NewAccessLogger(cConfig.Common.LogConfig, accessLogFilename)

apiServer, err := apiserver.NewServer(ctx, cConfig.API.Server, accessLogger)
Expand All @@ -42,30 +52,62 @@ func initAPIServer(ctx context.Context, cConfig *csconfig.Config) (*apiserver.AP

func serveAPIServer(ctx context.Context, apiServer *apiserver.APIServer) {
apiReady := make(chan bool, 1)
runCtx, cancel := context.WithCancel(ctx)
apiCancel = cancel
apiDone = make(chan error, 1)

apiTomb.Go(func() error {
go func() {
defer trace.ReportPanic()

pluginCtx, cancelPlugin := context.WithCancel(runCtx)
pluginDone := make(chan struct{})

go func() {
defer trace.ReportPanic()
defer close(pluginDone)
pluginBroker.Run(pluginCtx)
}()

runErr := make(chan error, 1)

go func() {
defer trace.ReportPanic()

log.Debugf("serving API after %s ms", time.Since(crowdsecT0))

if err := apiServer.Run(ctx, apiReady); err != nil {
log.Fatal(err)
}
runErr <- apiServer.Run(runCtx, apiReady)
}()

pluginTomb.Go(func() error {
pluginBroker.Run(&pluginTomb)
return nil
})

<-apiTomb.Dying() // lock until go routine is dying
pluginTomb.Kill(nil)
log.Infof("serve: shutting down api server")
select {
case err := <-runErr:
if err != nil && runCtx.Err() == nil {
log.Fatal(err)
}
cancelPlugin()
<-pluginDone
apiDone <- err
case <-runCtx.Done():
log.Infof("serve: shutting down api server")

shutdownCtx, cancelShutdown := context.WithTimeout(context.WithoutCancel(runCtx), 5*time.Second)
err := apiServer.Shutdown(shutdownCtx)
cancelShutdown()

if runErrValue := <-runErr; runErrValue != nil && err == nil {
err = runErrValue
}

return apiServer.Shutdown(ctx)
})
<-apiReady
cancelPlugin()
<-pluginDone
apiDone <- err
}
}()

select {
case <-apiReady:
case err := <-apiDone:
if err != nil {
log.Fatal(err)
}
}
}
39 changes: 19 additions & 20 deletions cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,38 +185,37 @@ func serveCrowdsec(
sd *StateDumper,
) {
cctx, cancel := context.WithCancel(ctx)
crowdCancel = cancel
crowdDone = make(chan error, 1)

var g errgroup.Group

bucketStore := leakybucket.NewBucketStore()

crowdsecTomb.Go(func() error {
go func() {
defer trace.ReportPanic()
// this logs every time, even at config reload
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))
agentReady <- true

go func() {
defer trace.ReportPanic()
// this logs every time, even at config reload
log.Debugf("running agent after %s ms", time.Since(crowdsecT0))

agentReady <- true

if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, sd, bucketStore); err != nil {
log.Fatalf("unable to start crowdsec routines: %s", err)
}
}()
if err := runCrowdsec(cctx, &g, cConfig, parsers, hub, datasources, sd, bucketStore); err != nil {
crowdDone <- fmt.Errorf("unable to start crowdsec routines: %w", err)
return
}

/* we should stop in two cases :
- crowdsecTomb has been Killed() : it might be shutdown or reload, so stop
- context has been canceled: it might be shutdown or reload, so stop
- acquisTomb is dead, it means that we were in "cat" mode and files are done reading, quit
*/
waitOnTomb()
waitOnCrowdsecStop(cctx)
log.Debugf("Shutting down crowdsec routines")

if err := ShutdownCrowdsecRoutines(cancel, &g, datasources); err != nil {
return fmt.Errorf("unable to shutdown crowdsec routines: %w", err)
crowdDone <- fmt.Errorf("unable to shutdown crowdsec routines: %w", err)
return
}

log.Debugf("everything is dead, return crowdsecTomb")
log.Debugf("everything is dead, return crowdsec routine")
log.Debugf("sd.DumpDir == %s", sd.DumpDir)

if sd.DumpDir != "" {
Expand All @@ -229,11 +228,11 @@ func serveCrowdsec(
os.Exit(0)
}

return nil
})
crowdDone <- nil
}()
}

func waitOnTomb() {
func waitOnCrowdsecStop(ctx context.Context) {
for {
select {
case <-acquisTomb.Dead():
Expand All @@ -256,7 +255,7 @@ func waitOnTomb() {

return

case <-crowdsecTomb.Dying():
case <-ctx.Done():
log.Infof("Crowdsec engine shutting down")
return
}
Expand Down
13 changes: 7 additions & 6 deletions cmd/crowdsec/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,13 @@ import (
)

var (
// tombs for the parser, buckets and outputs.
acquisTomb tomb.Tomb
outputsTomb tomb.Tomb
apiTomb tomb.Tomb
crowdsecTomb tomb.Tomb
pluginTomb tomb.Tomb
// tombs for acquisition and output routines.
acquisTomb tomb.Tomb
outputsTomb tomb.Tomb
apiCancel context.CancelFunc
apiDone chan error
crowdCancel context.CancelFunc
crowdDone chan error

flags Flags

Expand Down
Loading
Loading