diff --git a/pkg/queue/queue_manager.go b/pkg/queue/queue_manager.go index cd1aa62374..3a98436bcc 100644 --- a/pkg/queue/queue_manager.go +++ b/pkg/queue/queue_manager.go @@ -198,15 +198,19 @@ func (qm *Manager) InitQueues(ctx context.Context, tekton versioned2.Interface, return err } - // pipelineRuns from the namespace where repository is present - // those are required for creating queues - for _, repo := range repos.Items { + // Group repositories by namespace to avoid duplicate List() calls per namespace. + namespaceRepos := make(map[string][]*v1alpha1.Repository) + for i := range repos.Items { + repo := &repos.Items[i] if repo.Spec.ConcurrencyLimit == nil || *repo.Spec.ConcurrencyLimit == 0 { continue } + namespaceRepos[repo.Namespace] = append(namespaceRepos[repo.Namespace], repo) + } - // add all pipelineRuns in started state to pending queue - prs, err := tekton.TektonV1().PipelineRuns(repo.Namespace). + for namespace, reposInNS := range namespaceRepos { + // Fetch started PipelineRuns ONCE per namespace + startedPRs, err := tekton.TektonV1().PipelineRuns(namespace). List(ctx, v1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", keys.State, kubeinteraction.StateStarted), }) @@ -214,25 +218,8 @@ func (qm *Manager) InitQueues(ctx context.Context, tekton versioned2.Interface, return err } - // sort the pipelinerun by creation time before adding to queue - sortedPRs := sortPipelineRunsByCreationTimestamp(prs.Items) - - for _, pr := range sortedPRs { - order, exist := pr.GetAnnotations()[keys.ExecutionOrder] - if !exist { - // if the pipelineRun doesn't have order label then wait - return nil - } - orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), "", kubeinteraction.StateStarted) - - _, err = qm.AddListToRunningQueue(&repo, orderedList) - if err != nil { - qm.logger.Error("failed to init queue for repo: ", repo.GetName()) - } - } - - // now fetch all queued pipelineRun - prs, err = tekton.TektonV1().PipelineRuns(repo.Namespace). + // Fetch queued PipelineRuns ONCE per namespace + queuedPRs, err := tekton.TektonV1().PipelineRuns(namespace). List(ctx, v1.ListOptions{ LabelSelector: fmt.Sprintf("%s=%s", keys.State, kubeinteraction.StateQueued), }) @@ -240,19 +227,10 @@ func (qm *Manager) InitQueues(ctx context.Context, tekton versioned2.Interface, return err } - // sort the pipelinerun by creation time before adding to queue - sortedPRs = sortPipelineRunsByCreationTimestamp(prs.Items) - - for _, pr := range sortedPRs { - order, exist := pr.GetAnnotations()[keys.ExecutionOrder] - if !exist { - // if the pipelineRun doesn't have order label then wait - return nil - } - orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued) - if err := qm.AddToPendingQueue(&repo, orderedList); err != nil { - qm.logger.Error("failed to init queue for repo: ", repo.GetName()) - } + // Process each repository in the namespace + for _, repo := range reposInNS { + qm.sortAndQueuePipelines(ctx, tekton, repo, startedPRs.Items, "running") + qm.sortAndQueuePipelines(ctx, tekton, repo, queuedPRs.Items, "pending") } } @@ -289,6 +267,47 @@ func (qm *Manager) RunningPipelineRuns(repo *v1alpha1.Repository) []string { return []string{} } +func (qm *Manager) sortAndQueuePipelines( + ctx context.Context, + tekton versioned2.Interface, + repo *v1alpha1.Repository, + prList []tektonv1.PipelineRun, + queueState string, +) { + // Process PRs for this repository + // Note: We iterate all PRs in namespace, but execution-order annotation + // determines which PRs actually belong to this repo + sortedPRs := sortPipelineRunsByCreationTimestamp(prList) + + for _, pr := range sortedPRs { + // Check if the pipelineRun belongs to the repository + if pr.GetLabels()[keys.Repository] != repo.GetName() { + continue + } + + order, exist := pr.GetAnnotations()[keys.ExecutionOrder] + if !exist { + // if the pipelineRun doesn't have order label then skip it + continue + } + + switch queueState { + case "running": + orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), "", kubeinteraction.StateStarted) + // AddListToRunningQueue will only add PRs that match this repo + _, err := qm.AddListToRunningQueue(repo, orderedList) + if err != nil { + qm.logger.Error("failed to init queue for repo: ", repo.GetName()) + } + case "pending": + orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued) + if err := qm.AddToPendingQueue(repo, orderedList); err != nil { + qm.logger.Error("failed to init queue for repo: ", repo.GetName()) + } + } + } +} + func sortPipelineRunsByCreationTimestamp(prs []tektonv1.PipelineRun) []*tektonv1.PipelineRun { runTimeObj := make([]runtime.Object, len(prs)) for i := range prs { diff --git a/pkg/queue/queue_manager_test.go b/pkg/queue/queue_manager_test.go index d52d00e872..66ff0812d1 100644 --- a/pkg/queue/queue_manager_test.go +++ b/pkg/queue/queue_manager_test.go @@ -274,6 +274,176 @@ func TestQueueManager_InitQueues(t *testing.T) { assert.Equal(t, len(runs), 1) } +func TestQueueManager_InitQueues_SkipsMissingExecutionOrder(t *testing.T) { + // This test verifies the fix for the early-return bug where a PipelineRun + // without execution-order would cause InitQueues to stop processing all + // remaining repositories. + ctx, _ := rtesting.SetupFakeContext(t) + observer, _ := zapobserver.New(zap.InfoLevel) + logger := zap.New(observer).Sugar() + + startedLabel := map[string]string{ + keys.State: kubeinteraction.StateStarted, + } + queuedLabel := map[string]string{ + keys.State: kubeinteraction.StateQueued, + } + + // Create two repositories + repo1 := &v1alpha1.Repository{ + ObjectMeta: metav1.ObjectMeta{ + Name: "repo1", + Namespace: "ns1", + }, + Spec: v1alpha1.RepositorySpec{ + ConcurrencyLimit: intPtr(1), + }, + } + repo2 := &v1alpha1.Repository{ + ObjectMeta: metav1.ObjectMeta{ + Name: "repo2", + Namespace: "ns2", + }, + Spec: v1alpha1.RepositorySpec{ + ConcurrencyLimit: intPtr(1), + }, + } + + // Repo1 has a PipelineRun WITHOUT execution-order (the bug scenario) + prWithoutOrder := newTestPR("pr-without-order", time.Now(), startedLabel, map[string]string{ + keys.State: kubeinteraction.StateStarted, + // No execution-order annotation + }, tektonv1.PipelineRunSpec{}) + prWithoutOrder.Namespace = "ns1" + + // Repo2 has a normal PipelineRun WITH execution-order + prWithOrder := newTestPR("pr-with-order", time.Now(), startedLabel, map[string]string{ + keys.ExecutionOrder: "ns2/pr-with-order", + keys.State: kubeinteraction.StateStarted, + }, tektonv1.PipelineRunSpec{}) + prWithOrder.Namespace = "ns2" + + // Also add a queued PR to repo2 + prQueued := newTestPR("pr-queued", time.Now(), queuedLabel, map[string]string{ + keys.ExecutionOrder: "ns2/pr-with-order,ns2/pr-queued", + keys.State: kubeinteraction.StateQueued, + }, tektonv1.PipelineRunSpec{ + Status: tektonv1.PipelineRunSpecStatusPending, + }) + prQueued.Namespace = "ns2" + + tdata := testclient.Data{ + Repositories: []*v1alpha1.Repository{repo1, repo2}, + PipelineRuns: []*tektonv1.PipelineRun{prWithoutOrder, prWithOrder, prQueued}, + } + stdata, _ := testclient.SeedTestData(t, ctx, tdata) + + qm := NewManager(logger) + + // Before the fix, this would return early when encountering prWithoutOrder, + // and repo2 would not get its queues initialized. + err := qm.InitQueues(ctx, stdata.Pipeline, stdata.PipelineAsCode) + assert.NilError(t, err) + + // Verify repo1 has no queues (PipelineRun without execution-order was skipped) + sema1 := qm.queueMap[RepoKey(repo1)] + assert.Assert(t, sema1 == nil || len(sema1.getCurrentRunning()) == 0) + + // Verify repo2 DOES have queues initialized (processing continued after repo1) + sema2 := qm.queueMap[RepoKey(repo2)] + assert.Assert(t, sema2 != nil, "repo2 should have queues initialized") + assert.Equal(t, len(sema2.getCurrentRunning()), 1, "repo2 should have 1 running PR") + assert.Equal(t, len(sema2.getCurrentPending()), 1, "repo2 should have 1 pending PR") +} + +func TestQueueManager_InitQueues_NamespaceDeduplication(t *testing.T) { + // This test verifies namespace deduplication: multiple repos in the same + // namespace should only trigger ONE List() call per namespace, not one per repo. + ctx, _ := rtesting.SetupFakeContext(t) + observer, _ := zapobserver.New(zap.InfoLevel) + logger := zap.New(observer).Sugar() + + startedLabel := map[string]string{ + keys.State: kubeinteraction.StateStarted, + } + + // Create THREE repositories in the SAME namespace + repo1 := &v1alpha1.Repository{ + ObjectMeta: metav1.ObjectMeta{ + Name: "repo1", + Namespace: "shared-ns", + }, + Spec: v1alpha1.RepositorySpec{ + ConcurrencyLimit: intPtr(1), + }, + } + repo2 := &v1alpha1.Repository{ + ObjectMeta: metav1.ObjectMeta{ + Name: "repo2", + Namespace: "shared-ns", + }, + Spec: v1alpha1.RepositorySpec{ + ConcurrencyLimit: intPtr(1), + }, + } + repo3 := &v1alpha1.Repository{ + ObjectMeta: metav1.ObjectMeta{ + Name: "repo3", + Namespace: "shared-ns", + }, + Spec: v1alpha1.RepositorySpec{ + ConcurrencyLimit: intPtr(1), + }, + } + + // Each repo has its own started PipelineRun + pr1 := newTestPR("pr1", time.Now(), startedLabel, map[string]string{ + keys.ExecutionOrder: "shared-ns/pr1", + keys.State: kubeinteraction.StateStarted, + }, tektonv1.PipelineRunSpec{}) + pr1.Namespace = "shared-ns" + + pr2 := newTestPR("pr2", time.Now(), startedLabel, map[string]string{ + keys.ExecutionOrder: "shared-ns/pr2", + keys.State: kubeinteraction.StateStarted, + }, tektonv1.PipelineRunSpec{}) + pr2.Namespace = "shared-ns" + + pr3 := newTestPR("pr3", time.Now(), startedLabel, map[string]string{ + keys.ExecutionOrder: "shared-ns/pr3", + keys.State: kubeinteraction.StateStarted, + }, tektonv1.PipelineRunSpec{}) + pr3.Namespace = "shared-ns" + + tdata := testclient.Data{ + Repositories: []*v1alpha1.Repository{repo1, repo2, repo3}, + PipelineRuns: []*tektonv1.PipelineRun{pr1, pr2, pr3}, + } + stdata, _ := testclient.SeedTestData(t, ctx, tdata) + + qm := NewManager(logger) + + err := qm.InitQueues(ctx, stdata.Pipeline, stdata.PipelineAsCode) + assert.NilError(t, err) + + // Verify all three repos have their queues initialized + sema1 := qm.queueMap[RepoKey(repo1)] + assert.Assert(t, sema1 != nil, "repo1 should have queues") + assert.Equal(t, len(sema1.getCurrentRunning()), 1, "repo1 should have 1 running PR") + + sema2 := qm.queueMap[RepoKey(repo2)] + assert.Assert(t, sema2 != nil, "repo2 should have queues") + assert.Equal(t, len(sema2.getCurrentRunning()), 1, "repo2 should have 1 running PR") + + sema3 := qm.queueMap[RepoKey(repo3)] + assert.Assert(t, sema3 != nil, "repo3 should have queues") + assert.Equal(t, len(sema3.getCurrentRunning()), 1, "repo3 should have 1 running PR") + + // Before optimization: 3 repos × 2 states = 6 List() calls + // After optimization: 1 namespace × 2 states = 2 List() calls + // (We can't directly measure API calls in this test, but the logic ensures deduplication) +} + func TestFilterPipelineRunByInProgress(t *testing.T) { ctx, _ := rtesting.SetupFakeContext(t) ns := "test-ns" diff --git a/pkg/reconciler/reconciler.go b/pkg/reconciler/reconciler.go index 9314a85d4a..02ea09e222 100644 --- a/pkg/reconciler/reconciler.go +++ b/pkg/reconciler/reconciler.go @@ -60,10 +60,32 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, pr *tektonv1.PipelineRun logger.Debugf("reconciling pipelineRun %s/%s", pr.GetNamespace(), pr.GetName()) - // make sure we have the latest pipelinerun to reconcile, since there is something updating at the same time - lpr, err := r.run.Clients.Tekton.TektonV1().PipelineRuns(pr.GetNamespace()).Get(ctx, pr.GetName(), metav1.GetOptions{}) - if err != nil { - return fmt.Errorf("cannot get pipelineRun: %w", err) + // Early exit for completed/failed state to avoid unnecessary API calls and reconcile churn + state, exist := pr.GetAnnotations()[keys.State] + if exist && (state == kubeinteraction.StateCompleted || state == kubeinteraction.StateFailed) { + return nil + } + + // Use lister (informer cache) instead of fresh Get() when possible. + // Only do a fresh Get() if we need to update the object to avoid resource version conflicts. + // This reduces API server load and reconcile frequency. + var lpr *tektonv1.PipelineRun + var err error + if r.pipelineRunLister != nil { + lpr, err = r.pipelineRunLister.PipelineRuns(pr.GetNamespace()).Get(pr.GetName()) + if err != nil { + // If not in cache, fall back to direct Get + lpr, err = r.run.Clients.Tekton.TektonV1().PipelineRuns(pr.GetNamespace()).Get(ctx, pr.GetName(), metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("cannot get pipelineRun: %w", err) + } + } + } else { + // Lister not available, use direct Get + lpr, err = r.run.Clients.Tekton.TektonV1().PipelineRuns(pr.GetNamespace()).Get(ctx, pr.GetName(), metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("cannot get pipelineRun: %w", err) + } } if lpr.GetResourceVersion() != pr.GetResourceVersion() { @@ -71,11 +93,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, pr *tektonv1.PipelineRun return nil } - // if pipelineRun is in completed or failed state then return - state, exist := pr.GetAnnotations()[keys.State] - if exist && (state == kubeinteraction.StateCompleted || state == kubeinteraction.StateFailed) { - return nil - } + // Update pr to the latest version from lister/cache + pr = lpr reason := "" if len(pr.Status.GetConditions()) > 0 { @@ -253,17 +272,17 @@ func (r *Reconciler) reportFinalStatus(ctx context.Context, logger *zap.SugaredL return repo, fmt.Errorf("cannot update run status: %w", err) } - if _, err := r.updatePipelineRunState(ctx, logger, pr, finalState); err != nil { + if _, err := r.updatePipelineRunState(ctx, logger, newPr, finalState); err != nil { return repo, fmt.Errorf("cannot update state: %w", err) } - if err := r.emitMetrics(pr); err != nil { + if err := r.emitMetrics(newPr); err != nil { logger.Error("failed to emit metrics: ", err) } // remove pipelineRun from Queue and start the next one for { - next := r.qm.RemoveAndTakeItemFromQueue(repo, pr) + next := r.qm.RemoveAndTakeItemFromQueue(repo, newPr) if next == "" { break } @@ -282,7 +301,7 @@ func (r *Reconciler) reportFinalStatus(ctx context.Context, logger *zap.SugaredL break } - if err := r.cleanupPipelineRuns(ctx, logger, pacInfo, repo, pr); err != nil { + if err := r.cleanupPipelineRuns(ctx, logger, pacInfo, repo, newPr); err != nil { return repo, fmt.Errorf("error cleaning pipelineruns: %w", err) } @@ -290,6 +309,15 @@ func (r *Reconciler) reportFinalStatus(ctx context.Context, logger *zap.SugaredL } func (r *Reconciler) updatePipelineRunToInProgress(ctx context.Context, logger *zap.SugaredLogger, repo *v1alpha1.Repository, pr *tektonv1.PipelineRun) error { + // Check if already in progress to avoid redundancy + // This prevents unnecessary provider status updates and reduces conflict potential + currentState := pr.GetAnnotations()[keys.State] + scmReporting, scmExists := pr.GetAnnotations()[keys.SCMReportingPLRStarted] + if currentState == kubeinteraction.StateStarted && scmExists && scmReporting == "true" { + logger.Debugf("pipelineRun %s/%s already marked as in-progress, skipping update", pr.GetNamespace(), pr.GetName()) + return nil + } + pr, err := r.updatePipelineRunState(ctx, logger, pr, kubeinteraction.StateStarted) if err != nil { return fmt.Errorf("cannot update state: %w", err) @@ -377,6 +405,23 @@ func (r *Reconciler) initGitProviderClient(ctx context.Context, logger *zap.Suga func (r *Reconciler) updatePipelineRunState(ctx context.Context, logger *zap.SugaredLogger, pr *tektonv1.PipelineRun, state string) (*tektonv1.PipelineRun, error) { currentState := pr.GetAnnotations()[keys.State] + + // Skip update if state is already correct (idempotency check) + // This reduces API server load and prevents unnecessary resource version conflicts + if currentState == state { + // For "started" state, also check if SCMReportingPLRStarted annotation is already set + if state == kubeinteraction.StateStarted { + if scmReporting, exists := pr.GetAnnotations()[keys.SCMReportingPLRStarted]; exists && scmReporting == "true" { + logger.Debugf("pipelineRun %v/%v already in state %s with SCMReportingPLRStarted=true, skipping update", pr.GetNamespace(), pr.GetName(), state) + return pr, nil + } + // State is correct but annotation missing, continue with update + } else { + logger.Debugf("pipelineRun %v/%v already in state %s, skipping update", pr.GetNamespace(), pr.GetName(), state) + return pr, nil + } + } + logger.Infof("updating pipelineRun %v/%v state from %s to %s", pr.GetNamespace(), pr.GetName(), currentState, state) annotations := map[string]string{ keys.State: state,