From 2d020560e5bfb05503dc52863532070d9986dcfa Mon Sep 17 00:00:00 2001 From: Chmouel Boudjnah Date: Wed, 18 Mar 2026 17:31:13 +0100 Subject: [PATCH] fix(adapter): eliminate data race on shared event field The listener struct had a shared `event *info.Event` field that was written to by concurrent HTTP request handlers. When two requests arrived simultaneously (e.g. a push event and an incoming webhook), both would write to l.event, causing the push handler to read the incoming handler's event data. This resulted in the push event being processed as event_type=incoming, matching the wrong PipelineRun triggers and creating duplicate PipelineRuns. Remove `event` from the listener struct and make it a local variable per request in handleEvent. Pass it as a parameter to detectIncoming and processIncoming so each concurrent request operates on its own isolated event object. Signed-off-by: Chmouel Boudjnah Assisted-by: Claude Opus 4.6 (via Claude Code) --- pkg/adapter/adapter.go | 13 ++++++------- pkg/adapter/incoming.go | 36 ++++++++++++++++++------------------ pkg/adapter/incoming_test.go | 23 ++++++++++++----------- 3 files changed, 36 insertions(+), 36 deletions(-) diff --git a/pkg/adapter/adapter.go b/pkg/adapter/adapter.go index 0f1724e677..5d65a6bed9 100644 --- a/pkg/adapter/adapter.go +++ b/pkg/adapter/adapter.go @@ -53,7 +53,6 @@ type listener struct { run *params.Run kint kubeinteraction.Interface logger *zap.SugaredLogger - event *info.Event } type Response struct { @@ -131,9 +130,9 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc { return } - var event map[string]any + var eventBody map[string]any if string(payload) != "" { - if err := json.Unmarshal(payload, &event); err != nil { + if err := json.Unmarshal(payload, &eventBody); err != nil { l.logger.Errorf("Invalid event body format format: %s", err) response.WriteHeader(http.StatusBadRequest) return @@ -143,7 +142,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc { var gitProvider provider.Interface var logger *zap.SugaredLogger - l.event = info.NewEvent() + event := info.NewEvent() pacInfo := l.run.Info.GetPacOpts() globalRepo, err := l.run.Clients.PipelineAsCode.PipelinesascodeV1alpha1().Repositories(l.run.Info.Kube.Namespace).Get( @@ -170,7 +169,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc { return } - isIncoming, targettedRepo, err := l.detectIncoming(ctx, request, payload) + isIncoming, targettedRepo, err := l.detectIncoming(ctx, event, request, payload) if err != nil { if errors.Is(err, errMissingFields) { l.writeResponse(response, http.StatusBadRequest, err.Error()) @@ -180,7 +179,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc { } if isIncoming { - gitProvider, logger, err = l.processIncoming(targettedRepo) + gitProvider, logger, err = l.processIncoming(event, targettedRepo) } else { gitProvider, logger, err = l.detectProvider(request, string(payload)) } @@ -196,7 +195,7 @@ func (l listener) handleEvent(ctx context.Context) http.HandlerFunc { run: l.run, vcx: gitProvider, kint: l.kint, - event: l.event, + event: event, logger: logger, payload: payload, pacInfo: &pacInfo, diff --git a/pkg/adapter/incoming.go b/pkg/adapter/incoming.go index 31d1a9e17c..86c29e6515 100644 --- a/pkg/adapter/incoming.go +++ b/pkg/adapter/incoming.go @@ -112,7 +112,7 @@ func applyIncomingParams(req *http.Request, payloadBody []byte, params []string) // detectIncoming checks if the request is for an "incoming" webhook request. // If the request is for an "incoming" webhook request the request is parsed and matched to the expected // repository. -func (l *listener) detectIncoming(ctx context.Context, req *http.Request, payloadBody []byte) (bool, *v1alpha1.Repository, error) { +func (l *listener) detectIncoming(ctx context.Context, event *info.Event, req *http.Request, payloadBody []byte) (bool, *v1alpha1.Repository, error) { if req.URL.Path != "/incoming" { return false, nil, nil } @@ -182,45 +182,45 @@ func (l *listener) detectIncoming(ctx context.Context, req *http.Request, payloa if err != nil { return false, nil, err } - l.event.Provider.URL = enterpriseURL - l.event.Provider.Token = token - l.event.InstallationID = installationID + event.Provider.URL = enterpriseURL + event.Provider.Token = token + event.InstallationID = installationID // Github app is not installed for provided repository url - if l.event.InstallationID == 0 { + if event.InstallationID == 0 { return false, nil, fmt.Errorf("GithubApp is not installed for the provided repository url %s ", repo.Spec.URL) } } // make sure accepted is json if string(payloadBody) != "" { - if l.event.Event, err = applyIncomingParams(req, payloadBody, hook.Params); err != nil { + if event.Event, err = applyIncomingParams(req, payloadBody, hook.Params); err != nil { return false, nil, err } } // TODO: more than i think about it and more i think triggertarget should be // eventType and vice versa, but keeping as is for now. - l.event.EventType = "incoming" - l.event.TriggerTarget = "push" - l.event.TargetPipelineRun = payload.PipelineRun - l.event.HeadBranch = payload.Branch - l.event.BaseBranch = payload.Branch - l.event.Request.Header = req.Header - l.event.Request.Payload = payloadBody - l.event.URL = repo.Spec.URL - l.event.Sender = "incoming" + event.EventType = "incoming" + event.TriggerTarget = "push" + event.TargetPipelineRun = payload.PipelineRun + event.HeadBranch = payload.Branch + event.BaseBranch = payload.Branch + event.Request.Header = req.Header + event.Request.Payload = payloadBody + event.URL = repo.Spec.URL + event.Sender = "incoming" return true, repo, err } -func (l *listener) processIncoming(targetRepo *v1alpha1.Repository) (provider.Interface, *zap.SugaredLogger, error) { +func (l *listener) processIncoming(event *info.Event, targetRepo *v1alpha1.Repository) (provider.Interface, *zap.SugaredLogger, error) { // can a git ssh URL be a Repo URL? I don't think this will even ever work org, repo, err := formatting.GetRepoOwnerSplitted(targetRepo.Spec.URL) if err != nil { return nil, nil, err } - l.event.Organization = org - l.event.Repository = repo + event.Organization = org + event.Repository = repo var provider provider.Interface if targetRepo.Spec.GitProvider == nil || targetRepo.Spec.GitProvider.Type == "" { diff --git a/pkg/adapter/incoming_test.go b/pkg/adapter/incoming_test.go index f66a791492..5eb6c74da2 100644 --- a/pkg/adapter/incoming_test.go +++ b/pkg/adapter/incoming_test.go @@ -830,8 +830,8 @@ func Test_listener_detectIncoming(t *testing.T) { run: client, logger: logger, kint: kint, - event: info.NewEvent(), } + event := info.NewEvent() // make a new request req := httptest.NewRequestWithContext(ctx, tt.args.method, @@ -839,7 +839,7 @@ func Test_listener_detectIncoming(t *testing.T) { tt.args.queryRepository, tt.args.querySecret, tt.args.queryPipelineRun, tt.args.queryBranch, tt.args.queryNamespace), strings.NewReader(tt.args.incomingBody)) req.Header = tt.args.queryHeaders - got, _, err := l.detectIncoming(ctx, req, []byte(tt.args.incomingBody)) + got, _, err := l.detectIncoming(ctx, event, req, []byte(tt.args.incomingBody)) if tt.wantSubstrErr != "" { assert.Assert(t, err != nil) assert.ErrorContains(t, err, tt.wantSubstrErr) @@ -850,7 +850,7 @@ func Test_listener_detectIncoming(t *testing.T) { return } assert.Equal(t, got, tt.want, "err = %v", err) - assert.Equal(t, l.event.TargetPipelineRun, tt.args.queryPipelineRun) + assert.Equal(t, event.TargetPipelineRun, tt.args.queryPipelineRun) }) } } @@ -1012,16 +1012,17 @@ func Test_listener_processIncoming(t *testing.T) { observer, _ := zapobserver.New(zap.InfoLevel) logger := zap.New(observer).Sugar() l := &listener{ - run: client, kint: kint, logger: logger, event: info.NewEvent(), + run: client, kint: kint, logger: logger, } - pintf, _, err := l.processIncoming(tt.targetRepo) + event := info.NewEvent() + pintf, _, err := l.processIncoming(event, tt.targetRepo) if tt.wantErr { assert.Assert(t, err != nil) return } assert.Assert(t, reflect.TypeOf(pintf).Elem() == reflect.TypeOf(tt.want).Elem()) - assert.Assert(t, l.event.Organization == tt.wantOrg) - assert.Assert(t, l.event.Repository == tt.wantRepo) + assert.Assert(t, event.Organization == tt.wantOrg) + assert.Assert(t, event.Repository == tt.wantRepo) }) } } @@ -1158,9 +1159,9 @@ func Test_detectIncoming_legacy_warning(t *testing.T) { run: client, logger: logger, kint: kint, - event: info.NewEvent(), } - got, _, err := l.detectIncoming(ctx, tt.req, tt.body) + event := info.NewEvent() + got, _, err := l.detectIncoming(ctx, event, tt.req, tt.body) assert.NilError(t, err) assert.Assert(t, got) found := false @@ -1229,9 +1230,9 @@ func Test_detectIncoming_body_params_are_parsed(t *testing.T) { run: client, logger: zap.NewNop().Sugar(), kint: kint, - event: info.NewEvent(), } - got, _, err := l.detectIncoming(ctx, req, []byte(payload)) + event := info.NewEvent() + got, _, err := l.detectIncoming(ctx, event, req, []byte(payload)) assert.NilError(t, err) assert.Assert(t, got) }