Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 56 additions & 37 deletions pkg/queue/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,61 +198,39 @@ func (qm *Manager) InitQueues(ctx context.Context, tekton versioned2.Interface,
return err
}

// pipelineRuns from the namespace where repository is present
// those are required for creating queues
for _, repo := range repos.Items {
// Group repositories by namespace to avoid duplicate List() calls per namespace.
namespaceRepos := make(map[string][]*v1alpha1.Repository)
for i := range repos.Items {
repo := &repos.Items[i]
if repo.Spec.ConcurrencyLimit == nil || *repo.Spec.ConcurrencyLimit == 0 {
continue
}
namespaceRepos[repo.Namespace] = append(namespaceRepos[repo.Namespace], repo)
}

// add all pipelineRuns in started state to pending queue
prs, err := tekton.TektonV1().PipelineRuns(repo.Namespace).
for namespace, reposInNS := range namespaceRepos {
// Fetch started PipelineRuns ONCE per namespace
startedPRs, err := tekton.TektonV1().PipelineRuns(namespace).
List(ctx, v1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", keys.State, kubeinteraction.StateStarted),
})
if err != nil {
return err
}

// sort the pipelinerun by creation time before adding to queue
sortedPRs := sortPipelineRunsByCreationTimestamp(prs.Items)

for _, pr := range sortedPRs {
order, exist := pr.GetAnnotations()[keys.ExecutionOrder]
if !exist {
// if the pipelineRun doesn't have order label then wait
return nil
}
orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), "", kubeinteraction.StateStarted)

_, err = qm.AddListToRunningQueue(&repo, orderedList)
if err != nil {
qm.logger.Error("failed to init queue for repo: ", repo.GetName())
}
}

// now fetch all queued pipelineRun
prs, err = tekton.TektonV1().PipelineRuns(repo.Namespace).
// Fetch queued PipelineRuns ONCE per namespace
queuedPRs, err := tekton.TektonV1().PipelineRuns(namespace).
List(ctx, v1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", keys.State, kubeinteraction.StateQueued),
})
if err != nil {
return err
}

// sort the pipelinerun by creation time before adding to queue
sortedPRs = sortPipelineRunsByCreationTimestamp(prs.Items)

for _, pr := range sortedPRs {
order, exist := pr.GetAnnotations()[keys.ExecutionOrder]
if !exist {
// if the pipelineRun doesn't have order label then wait
return nil
}
orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
if err := qm.AddToPendingQueue(&repo, orderedList); err != nil {
qm.logger.Error("failed to init queue for repo: ", repo.GetName())
}
// Process each repository in the namespace
for _, repo := range reposInNS {
qm.sortAndQueuePipelines(ctx, tekton, repo, startedPRs.Items, "running")
qm.sortAndQueuePipelines(ctx, tekton, repo, queuedPRs.Items, "pending")
}
}

Expand Down Expand Up @@ -289,6 +267,47 @@ func (qm *Manager) RunningPipelineRuns(repo *v1alpha1.Repository) []string {
return []string{}
}

func (qm *Manager) sortAndQueuePipelines(
ctx context.Context,
tekton versioned2.Interface,
repo *v1alpha1.Repository,
prList []tektonv1.PipelineRun,
queueState string,
) {
// Process PRs for this repository
// Note: We iterate all PRs in namespace, but execution-order annotation
// determines which PRs actually belong to this repo
sortedPRs := sortPipelineRunsByCreationTimestamp(prList)

for _, pr := range sortedPRs {
// Check if the pipelineRun belongs to the repository
if pr.GetLabels()[keys.Repository] != repo.GetName() {
continue
}

order, exist := pr.GetAnnotations()[keys.ExecutionOrder]
if !exist {
// if the pipelineRun doesn't have order label then skip it
continue
}

switch queueState {
case "running":
orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), "", kubeinteraction.StateStarted)
// AddListToRunningQueue will only add PRs that match this repo
_, err := qm.AddListToRunningQueue(repo, orderedList)
if err != nil {
qm.logger.Error("failed to init queue for repo: ", repo.GetName())
}
case "pending":
orderedList := FilterPipelineRunByState(ctx, tekton, strings.Split(order, ","), tektonv1.PipelineRunSpecStatusPending, kubeinteraction.StateQueued)
if err := qm.AddToPendingQueue(repo, orderedList); err != nil {
qm.logger.Error("failed to init queue for repo: ", repo.GetName())
}
}
}
}

func sortPipelineRunsByCreationTimestamp(prs []tektonv1.PipelineRun) []*tektonv1.PipelineRun {
runTimeObj := make([]runtime.Object, len(prs))
for i := range prs {
Expand Down
170 changes: 170 additions & 0 deletions pkg/queue/queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,176 @@ func TestQueueManager_InitQueues(t *testing.T) {
assert.Equal(t, len(runs), 1)
}

func TestQueueManager_InitQueues_SkipsMissingExecutionOrder(t *testing.T) {
// This test verifies the fix for the early-return bug where a PipelineRun
// without execution-order would cause InitQueues to stop processing all
// remaining repositories.
ctx, _ := rtesting.SetupFakeContext(t)
observer, _ := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()

startedLabel := map[string]string{
keys.State: kubeinteraction.StateStarted,
}
queuedLabel := map[string]string{
keys.State: kubeinteraction.StateQueued,
}

// Create two repositories
repo1 := &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: "repo1",
Namespace: "ns1",
},
Spec: v1alpha1.RepositorySpec{
ConcurrencyLimit: intPtr(1),
},
}
repo2 := &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: "repo2",
Namespace: "ns2",
},
Spec: v1alpha1.RepositorySpec{
ConcurrencyLimit: intPtr(1),
},
}

// Repo1 has a PipelineRun WITHOUT execution-order (the bug scenario)
prWithoutOrder := newTestPR("pr-without-order", time.Now(), startedLabel, map[string]string{
keys.State: kubeinteraction.StateStarted,
// No execution-order annotation
}, tektonv1.PipelineRunSpec{})
prWithoutOrder.Namespace = "ns1"

// Repo2 has a normal PipelineRun WITH execution-order
prWithOrder := newTestPR("pr-with-order", time.Now(), startedLabel, map[string]string{
keys.ExecutionOrder: "ns2/pr-with-order",
keys.State: kubeinteraction.StateStarted,
}, tektonv1.PipelineRunSpec{})
prWithOrder.Namespace = "ns2"

// Also add a queued PR to repo2
prQueued := newTestPR("pr-queued", time.Now(), queuedLabel, map[string]string{
keys.ExecutionOrder: "ns2/pr-with-order,ns2/pr-queued",
keys.State: kubeinteraction.StateQueued,
}, tektonv1.PipelineRunSpec{
Status: tektonv1.PipelineRunSpecStatusPending,
})
prQueued.Namespace = "ns2"

tdata := testclient.Data{
Repositories: []*v1alpha1.Repository{repo1, repo2},
PipelineRuns: []*tektonv1.PipelineRun{prWithoutOrder, prWithOrder, prQueued},
}
stdata, _ := testclient.SeedTestData(t, ctx, tdata)

qm := NewManager(logger)

// Before the fix, this would return early when encountering prWithoutOrder,
// and repo2 would not get its queues initialized.
err := qm.InitQueues(ctx, stdata.Pipeline, stdata.PipelineAsCode)
assert.NilError(t, err)

// Verify repo1 has no queues (PipelineRun without execution-order was skipped)
sema1 := qm.queueMap[RepoKey(repo1)]
assert.Assert(t, sema1 == nil || len(sema1.getCurrentRunning()) == 0)

// Verify repo2 DOES have queues initialized (processing continued after repo1)
sema2 := qm.queueMap[RepoKey(repo2)]
assert.Assert(t, sema2 != nil, "repo2 should have queues initialized")
assert.Equal(t, len(sema2.getCurrentRunning()), 1, "repo2 should have 1 running PR")
assert.Equal(t, len(sema2.getCurrentPending()), 1, "repo2 should have 1 pending PR")
}

func TestQueueManager_InitQueues_NamespaceDeduplication(t *testing.T) {
// This test verifies namespace deduplication: multiple repos in the same
// namespace should only trigger ONE List() call per namespace, not one per repo.
ctx, _ := rtesting.SetupFakeContext(t)
observer, _ := zapobserver.New(zap.InfoLevel)
logger := zap.New(observer).Sugar()

startedLabel := map[string]string{
keys.State: kubeinteraction.StateStarted,
}

// Create THREE repositories in the SAME namespace
repo1 := &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: "repo1",
Namespace: "shared-ns",
},
Spec: v1alpha1.RepositorySpec{
ConcurrencyLimit: intPtr(1),
},
}
repo2 := &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: "repo2",
Namespace: "shared-ns",
},
Spec: v1alpha1.RepositorySpec{
ConcurrencyLimit: intPtr(1),
},
}
repo3 := &v1alpha1.Repository{
ObjectMeta: metav1.ObjectMeta{
Name: "repo3",
Namespace: "shared-ns",
},
Spec: v1alpha1.RepositorySpec{
ConcurrencyLimit: intPtr(1),
},
}

// Each repo has its own started PipelineRun
pr1 := newTestPR("pr1", time.Now(), startedLabel, map[string]string{
keys.ExecutionOrder: "shared-ns/pr1",
keys.State: kubeinteraction.StateStarted,
}, tektonv1.PipelineRunSpec{})
pr1.Namespace = "shared-ns"

pr2 := newTestPR("pr2", time.Now(), startedLabel, map[string]string{
keys.ExecutionOrder: "shared-ns/pr2",
keys.State: kubeinteraction.StateStarted,
}, tektonv1.PipelineRunSpec{})
pr2.Namespace = "shared-ns"

pr3 := newTestPR("pr3", time.Now(), startedLabel, map[string]string{
keys.ExecutionOrder: "shared-ns/pr3",
keys.State: kubeinteraction.StateStarted,
}, tektonv1.PipelineRunSpec{})
pr3.Namespace = "shared-ns"

tdata := testclient.Data{
Repositories: []*v1alpha1.Repository{repo1, repo2, repo3},
PipelineRuns: []*tektonv1.PipelineRun{pr1, pr2, pr3},
}
stdata, _ := testclient.SeedTestData(t, ctx, tdata)

qm := NewManager(logger)

err := qm.InitQueues(ctx, stdata.Pipeline, stdata.PipelineAsCode)
assert.NilError(t, err)

// Verify all three repos have their queues initialized
sema1 := qm.queueMap[RepoKey(repo1)]
assert.Assert(t, sema1 != nil, "repo1 should have queues")
assert.Equal(t, len(sema1.getCurrentRunning()), 1, "repo1 should have 1 running PR")

sema2 := qm.queueMap[RepoKey(repo2)]
assert.Assert(t, sema2 != nil, "repo2 should have queues")
assert.Equal(t, len(sema2.getCurrentRunning()), 1, "repo2 should have 1 running PR")

sema3 := qm.queueMap[RepoKey(repo3)]
assert.Assert(t, sema3 != nil, "repo3 should have queues")
assert.Equal(t, len(sema3.getCurrentRunning()), 1, "repo3 should have 1 running PR")

// Before optimization: 3 repos Γ— 2 states = 6 List() calls
// After optimization: 1 namespace Γ— 2 states = 2 List() calls
// (We can't directly measure API calls in this test, but the logic ensures deduplication)
}

func TestFilterPipelineRunByInProgress(t *testing.T) {
ctx, _ := rtesting.SetupFakeContext(t)
ns := "test-ns"
Expand Down
Loading
Loading