Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 84 additions & 15 deletions cmd/payload-testing-prow-plugin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,41 @@ func (s *server) handle(l *logrus.Entry, ic github.IssueCommentEvent) (string, [
return strings.Join(messages, "\n"), includedAdditionalPRs.UnsortedList()
}

// abortJob aborts a single ProwJob and returns whether it was actually aborted and any error
func (s *server) abortJob(job *prowapi.ProwJob, logger *logrus.Entry) (bool, error) {
if job.Complete() {
return false, nil
}
logger.Debugf("aborting prowjob")
job.Status.State = prowapi.AbortedState
if err := s.kubeClient.Update(s.ctx, job); err != nil {
logger.WithError(err).Errorf("failed to abort prowjob")
return false, err
}
logger.Debugf("aborted prowjob")
return true, nil
}

// isAggregatorJob checks if a ProwJob is an aggregator job and returns the aggregation ID
func isAggregatorJob(job *prowapi.ProwJob) (string, bool) {
if job.Labels == nil {
return "", false
}
aggregationID, exists := job.Labels[api.AggregationIDLabel]
if !exists {
return "", false
}
// Also check that the job name (from annotation) is prefixed with "aggregator-"
if job.Annotations == nil {
return "", false
}
jobName, exists := job.Annotations[api.ProwJobJobNameAnnotation]
if !exists || !strings.HasPrefix(jobName, "aggregator-") {
return "", false
}
return aggregationID, true
}

func (s *server) abortAll(logger *logrus.Entry, ic github.IssueCommentEvent) string {
org := ic.Repo.Owner.Login
repo := ic.Repo.Name
Expand All @@ -539,6 +574,7 @@ func (s *server) abortAll(logger *logrus.Entry, ic github.IssueCommentEvent) str
}

var erroredJobs []string
var totalJobsAborted int
for _, jobName := range jobs {
jobLogger := logger.WithField("jobName", jobName)
job := &prowapi.ProwJob{}
Expand All @@ -547,29 +583,62 @@ func (s *server) abortAll(logger *logrus.Entry, ic github.IssueCommentEvent) str
erroredJobs = append(erroredJobs, jobName)
continue
}
// Do not abort jobs that already completed
if job.Complete() {
continue
}
jobLogger.Debugf("aborting prowjob")
job.Status.State = prowapi.AbortedState
// We use Update and not Patch here, because we are not the authority of the .Status.State field
// and must not overwrite changes made to it in the interim by the responsible agent.
// The accepted trade-off for now is that this leads to failure if unrelated fields where changed
// by another different actor.
if err = s.kubeClient.Update(s.ctx, job); err != nil {
jobLogger.WithError(err).Errorf("failed to abort prowjob")

wasAborted, err := s.abortJob(job, jobLogger)
if err != nil {
erroredJobs = append(erroredJobs, jobName)
} else if wasAborted {
totalJobsAborted++
} else {
jobLogger.Debugf("aborted prowjob")
jobLogger.Info("job was already complete")
}

// If this is an aggregator job, abort all job runs for the aggregator
if aggregationID, isAggregator := isAggregatorJob(job); isAggregator {
jobLogger.Info("Found aggregator job, aborting all jobs that the aggregator is tracking...")

var aggregatedJobs prowapi.ProwJobList
labelSelector := labels.Set{api.AggregationIDLabel: aggregationID}.AsSelector()
listOpts := ctrlruntimeclient.ListOptions{
Namespace: s.namespace,
LabelSelector: labelSelector,
}

if err := s.kubeClient.List(s.ctx, &aggregatedJobs, &listOpts); err != nil {
jobLogger.WithError(err).Error("failed to list aggregated jobs")
erroredJobs = append(erroredJobs, fmt.Sprintf("%s-aggregated-jobs", jobName))
continue
}

for i := range aggregatedJobs.Items {
aggregatedJob := &aggregatedJobs.Items[i]
aggregatedJobLogger := jobLogger.WithFields(logrus.Fields{
"aggregatedJobName": aggregatedJob.Name,
"aggregatedJobSpec": aggregatedJob.Spec.Job,
})

// Skip the aggregator job itself to avoid double processing
if aggregatedJob.Name == job.Name {
continue
}

wasAborted, err = s.abortJob(aggregatedJob, aggregatedJobLogger)
if err != nil {
erroredJobs = append(erroredJobs, aggregatedJob.Name)
} else if wasAborted {
totalJobsAborted++
} else {
aggregatedJobLogger.Info("job was already complete")
}
}
}
}

if len(erroredJobs) > 0 {
return fmt.Sprintf("Failed to abort %d payload jobs out of %d. Failed jobs: %s", len(erroredJobs), len(jobs), strings.Join(erroredJobs, ", "))
return fmt.Sprintf("Failed to abort some payload jobs. Total jobs aborted: %d. Failed jobs: %s", totalJobsAborted, strings.Join(erroredJobs, ", "))
}

return fmt.Sprintf("aborted active payload jobs for pull request %s/%s#%d", org, repo, prNumber)
return fmt.Sprintf("aborted %d active payload job(s) for pull request %s/%s#%d", totalJobsAborted, org, repo, prNumber)
}

func (s *server) getPayloadJobsForPR(org, repo string, prNumber int, logger *logrus.Entry) ([]string, error) {
Expand Down
95 changes: 94 additions & 1 deletion cmd/payload-testing-prow-plugin/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"strings"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -1050,7 +1051,99 @@ trigger 0 job(s) of type all for the ci release of OCP 4.8
Body: "/payload-abort",
},
},
expectedMessage: `aborted active payload jobs for pull request org/repo#123`,
expectedMessage: `aborted 1 active payload job(s) for pull request org/repo#123`,
},
{
name: "abort all jobs aborts underlying aggregated job runs",
s: &server{
ghc: ghc,
ctx: context.TODO(),
kubeClient: fakeclient.NewClientBuilder().WithRuntimeObjects(
&prpqv1.PullRequestPayloadQualificationRun{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ci",
Labels: map[string]string{
kube.OrgLabel: "org",
kube.RepoLabel: "repo",
kube.PullLabel: "123",
},
},
Spec: prpqv1.PullRequestPayloadTestSpec{},
Status: prpqv1.PullRequestPayloadTestStatus{
Jobs: []prpqv1.PullRequestPayloadJobStatus{
{
Status: prowapi.ProwJobStatus{State: prowapi.PendingState},
ProwJob: "aggregator-some-job",
},
},
},
},
// Aggregator job with aggregation-id label (pending - will be aborted)
&prowapi.ProwJob{
ObjectMeta: metav1.ObjectMeta{
Name: "aggregator-some-job",
Namespace: "ci",
Labels: map[string]string{
api.AggregationIDLabel: "test-aggregation-id",
},
Annotations: map[string]string{
api.ProwJobJobNameAnnotation: "aggregator-some-job",
},
},
Status: prowapi.ProwJobStatus{State: prowapi.PendingState},
},
// First underlying aggregated job (pending - will be aborted)
&prowapi.ProwJob{
ObjectMeta: metav1.ObjectMeta{
Name: "some-job-0",
Namespace: "ci",
Labels: map[string]string{
api.AggregationIDLabel: "test-aggregation-id",
},
},
Status: prowapi.ProwJobStatus{State: prowapi.PendingState},
},
// Second underlying aggregated job (triggered - will be aborted)
&prowapi.ProwJob{
ObjectMeta: metav1.ObjectMeta{
Name: "some-job-1",
Namespace: "ci",
Labels: map[string]string{
api.AggregationIDLabel: "test-aggregation-id",
},
},
Status: prowapi.ProwJobStatus{State: prowapi.TriggeredState},
},
// Third underlying aggregated job (already completed - will NOT be counted)
&prowapi.ProwJob{
ObjectMeta: metav1.ObjectMeta{
Name: "some-job-2",
Namespace: "ci",
Labels: map[string]string{
api.AggregationIDLabel: "test-aggregation-id",
},
},
Status: prowapi.ProwJobStatus{
State: prowapi.SuccessState,
CompletionTime: &metav1.Time{Time: time.Now()},
},
},
).Build(),
namespace: "ci",
trustedChecker: &fakeTrustedChecker{},
},
ic: github.IssueCommentEvent{
GUID: "guid",
Repo: github.Repo{Owner: github.User{Login: "org"}, Name: "repo"},
Issue: github.Issue{
Number: 123,
PullRequest: &struct{}{},
},
Comment: github.IssueComment{
Body: "/payload-abort",
},
},
expectedMessage: `aborted 3 active payload job(s) for pull request org/repo#123`,
},
{
name: "incorrectly formatted command",
Expand Down
7 changes: 7 additions & 0 deletions pkg/api/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ const (
// to copy the annotation if it exists
ReleaseConfigAnnotation = "release.openshift.io/config"

// AggregationIDLabel is the label used to link aggregated jobs to their aggregator job
AggregationIDLabel = "release.openshift.io/aggregation-id"

// ProwJobJobNameAnnotation is the annotation in prowJob for the Job Name.
// It is used to match relevant job names for different aggregators
ProwJobJobNameAnnotation = "prow.k8s.io/job"

ImageStreamImportRetries = 6

NestedPodmanSCC = "nested-podman"
Expand Down
10 changes: 4 additions & 6 deletions pkg/controller/prpqr_reconciler/prpqr_reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ const (
conditionAllJobsTriggered = "AllJobsTriggered"
conditionWithErrors = "WithErrors"

aggregationIDLabel = "release.openshift.io/aggregation-id"

dependentProwJobsFinalizer = "pullrequestpayloadqualificationruns.ci.openshift.io/dependent-prowjobs"
)

Expand Down Expand Up @@ -406,7 +404,7 @@ func (r *reconciler) abortJobs(ctx context.Context,
if labels == nil {
return "", false
}
label, exists := labels[aggregationIDLabel]
label, exists := labels[api.AggregationIDLabel]
return label, exists
}

Expand Down Expand Up @@ -448,7 +446,7 @@ func (r *reconciler) abortJobs(ctx context.Context,
logger.Info("Aborting aggregated prowjobs...")

var aggregatedProwjobs prowv1.ProwJobList
if err := r.client.List(ctx, &aggregatedProwjobs, ctrlruntimeclient.MatchingLabels{aggregationIDLabel: aggregationId}); err != nil {
if err := r.client.List(ctx, &aggregatedProwjobs, ctrlruntimeclient.MatchingLabels{api.AggregationIDLabel: aggregationId}); err != nil {
logger.WithError(err).Error("Failed to list aggregated jobs")
continue
}
Expand Down Expand Up @@ -782,7 +780,7 @@ func (r *reconciler) generateAggregatedProwjobs(uid string, ciopConfig *api.Rele

for i := 0; i < spec.AggregatedCount; i++ {
opts := &aggregatedOptions{
labels: map[string]string{aggregationIDLabel: uid},
labels: map[string]string{api.AggregationIDLabel: uid},
aggregatedIndex: i,
releaseJobName: spec.JobName(jobconfig.PeriodicPrefix),
}
Expand Down Expand Up @@ -854,7 +852,7 @@ func generateAggregatorJob(baseCiop *api.Metadata, uid, aggregatorJobName, jobNa
return nil, fmt.Errorf("failed to default the ProwJob: %w", err)
}

labels := map[string]string{aggregationIDLabel: uid, v1.PullRequestPayloadQualificationRunLabel: prpqrName}
labels := map[string]string{api.AggregationIDLabel: uid, v1.PullRequestPayloadQualificationRunLabel: prpqrName}
annotations := map[string]string{releaseJobNameAnnotation: jobNameHash(aggregatorJobName)}

cfg := getCfg.Config()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/sirupsen/logrus"

prowjobv1 "sigs.k8s.io/prow/pkg/apis/prowjobs/v1"

"github.com/openshift/ci-tools/pkg/api"
)

const (
Expand All @@ -14,7 +16,7 @@ const (
// ProwJobPayloadInvocationIDLabel is the name of the label for the payload invocation id in prow job
ProwJobPayloadInvocationIDLabel = "release.openshift.io/aggregation-id"
// prowJobReleaseJobNameAnnotation refers to the original periodic job name for PR based payload runs.
// This is a special case for the PR invoked payload jobs where ProwJobJobNameAnnotation annotation
// This is a special case for the PR invoked payload jobs where api.ProwJobJobNameAnnotation annotation
// refers to a uniquely generated name per job run. Thus, prowJobReleaseJobNameAnnotation is used to
// refer to the original job name.
prowJobReleaseJobNameAnnotation = "releaseJobName"
Expand All @@ -23,7 +25,7 @@ const (
func NewProwJobMatcherFuncForPR(matchJobName, matchID, matchLabel string) ProwJobMatcherFunc {
return func(prowJob *prowjobv1.ProwJob) bool {
id := prowJob.Labels[matchLabel]
jobName := prowJob.Annotations[ProwJobJobNameAnnotation]
jobName := prowJob.Annotations[api.ProwJobJobNameAnnotation]
jobRunId := prowJob.Labels[prowJobJobRunIDLabel]
if releaseJobName, ok := prowJob.Annotations[prowJobReleaseJobNameAnnotation]; ok {
if releaseJobName != matchJobName {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"github.com/sirupsen/logrus"

prowjobv1 "sigs.k8s.io/prow/pkg/apis/prowjobs/v1"

"github.com/openshift/ci-tools/pkg/api"
)

const (
Expand All @@ -21,7 +23,7 @@ func GetPayloadTagFromProwJob(prowJob *prowjobv1.ProwJob) string {
func NewProwJobMatcherFuncForReleaseController(matchJobName, matchPayloadTag string) ProwJobMatcherFunc {
return func(prowJob *prowjobv1.ProwJob) bool {
payloadTag := GetPayloadTagFromProwJob(prowJob)
jobName := prowJob.Annotations[ProwJobJobNameAnnotation]
jobName := prowJob.Annotations[api.ProwJobJobNameAnnotation]
jobRunId := prowJob.Labels[prowJobJobRunIDLabel]
if jobName != matchJobName {
return false
Expand Down
3 changes: 0 additions & 3 deletions pkg/jobrunaggregator/jobrunaggregatorlib/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ import (
const (
JobStateQuerySourceBigQuery = "bigquery"
JobStateQuerySourceCluster = "cluster"
// ProwJobJobNameAnnotation is the annotation in prowJob for the Job Name.
// It is used to match relevant job names for different aggregators
ProwJobJobNameAnnotation = "prow.k8s.io/job"
// prowJobJobRunIDLabel is the label in prowJob for the prow job run ID. It is a unique identifier for job runs across different jobs
prowJobJobRunIDLabel = "prow.k8s.io/build-id"
)
Expand Down
5 changes: 3 additions & 2 deletions pkg/jobrunaggregator/jobruntestcaseanalyzer/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
prowjobv1 "sigs.k8s.io/prow/pkg/apis/prowjobs/v1"
prowjobclientset "sigs.k8s.io/prow/pkg/client/clientset/versioned"

"github.com/openshift/ci-tools/pkg/api"
"github.com/openshift/ci-tools/pkg/jobrunaggregator/jobrunaggregatorapi"
"github.com/openshift/ci-tools/pkg/jobrunaggregator/jobrunaggregatorlib"
"github.com/openshift/ci-tools/pkg/junit"
Expand Down Expand Up @@ -95,7 +96,7 @@ type testCaseAnalyzerJobGetter struct {
}

func (s *testCaseAnalyzerJobGetter) shouldAggregateJob(prowJob *prowjobv1.ProwJob) bool {
jobName := prowJob.Annotations[jobrunaggregatorlib.ProwJobJobNameAnnotation]
jobName := prowJob.Annotations[api.ProwJobJobNameAnnotation]
// if PR payload, only find the exact jobs
if s.jobGCSPrefixes != nil && len(*s.jobGCSPrefixes) > 0 {
if !s.jobNames.Has(jobName) {
Expand Down Expand Up @@ -427,7 +428,7 @@ func (o *JobRunTestCaseAnalyzerOptions) shouldAggregateJob(prowJob *prowjobv1.Pr
}
// second level of match deal with payload or invocation ID
var prowJobRunMatcherFunc jobrunaggregatorlib.ProwJobMatcherFunc
jobName := prowJob.Annotations[jobrunaggregatorlib.ProwJobJobNameAnnotation]
jobName := prowJob.Annotations[api.ProwJobJobNameAnnotation]
if len(o.payloadTag) > 0 {
prowJobRunMatcherFunc = jobrunaggregatorlib.NewProwJobMatcherFuncForReleaseController(jobName, o.payloadTag)
}
Expand Down