Skip to content

Commit dcbc311

Browse files
committed
test: validate pipeline run parallelism e2e
Signed-off-by: sduvvuri1603 <sduvvuri@redhat.com>
1 parent 8ccafa9 commit dcbc311

File tree

5 files changed

+117
-17
lines changed

5 files changed

+117
-17
lines changed

backend/test/end2end/pipeline_e2e_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,5 +257,6 @@ func validatePipelineRunSuccess(pipelineFile string, pipelineDir string, testCon
257257
}
258258
compiledWorkflow := workflowutils.UnmarshallWorkflowYAML(filepath.Join(testutil.GetCompiledWorkflowsFilesDir(), pipelineFile))
259259
e2e_utils.ValidateComponentStatuses(runClient, k8Client, testContext, createdRunID, compiledWorkflow)
260+
e2e_utils.ValidateParallelismIfConfigured(runClient, pipelineFilePath, createdRunID)
260261

261262
}

backend/test/end2end/utils/e2e_utils.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,15 @@ package utils
44
import (
55
"fmt"
66
"maps"
7+
"path/filepath"
78
"sort"
9+
"strings"
810
"time"
911

1012
runparams "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service"
1113
"github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model"
1214
apiserver "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2"
15+
workflowutils "github.com/kubeflow/pipelines/backend/test/compiler/utils"
1316
"github.com/kubeflow/pipelines/backend/test/config"
1417
"github.com/kubeflow/pipelines/backend/test/logger"
1518
"github.com/kubeflow/pipelines/backend/test/testutil"
@@ -93,6 +96,91 @@ func ValidateComponentStatuses(runClient *apiserver.RunClient, k8Client *kuberne
9396

9497
}
9598

99+
// ValidateParallelismIfConfigured checks pipelineRunParallelism constraints when set.
100+
func ValidateParallelismIfConfigured(runClient *apiserver.RunClient, pipelineFilePath, runID string) {
101+
spec := testutil.ParseFileToSpecs(pipelineFilePath, false, nil)
102+
if spec == nil || spec.PlatformSpec() == nil {
103+
return
104+
}
105+
k8sSpec, ok := spec.PlatformSpec().GetPlatforms()["kubernetes"]
106+
if !ok || k8sSpec == nil {
107+
return
108+
}
109+
cfg := k8sSpec.GetPipelineConfig()
110+
if cfg == nil || cfg.PipelineRunParallelism == nil {
111+
return
112+
}
113+
limit := cfg.GetPipelineRunParallelism()
114+
115+
baseName := filepath.Base(pipelineFilePath)
116+
if ext := filepath.Ext(baseName); ext != "" {
117+
baseName = strings.TrimSuffix(baseName, ext)
118+
}
119+
compiledWorkflow := workflowutils.UnmarshallWorkflowYAML(
120+
filepath.Join(testutil.GetCompiledWorkflowsFilesDir(), baseName+".yaml"))
121+
componentTaskNames := map[string]struct{}{}
122+
for _, tmpl := range compiledWorkflow.Spec.Templates {
123+
if tmpl.DAG == nil {
124+
continue
125+
}
126+
for _, task := range tmpl.DAG.Tasks {
127+
if task.Template == "system-container-executor" {
128+
componentTaskNames[task.Name] = struct{}{}
129+
}
130+
}
131+
}
132+
if len(componentTaskNames) == 0 {
133+
return
134+
}
135+
136+
runIDCopy := runID
137+
runDetail := testutil.GetPipelineRun(runClient, &runIDCopy)
138+
gomega.Expect(runDetail).NotTo(gomega.BeNil())
139+
gomega.Expect(runDetail.RunDetails).NotTo(gomega.BeNil())
140+
141+
type event struct {
142+
at time.Time
143+
delta int
144+
}
145+
var events []event
146+
for _, task := range runDetail.RunDetails.TaskDetails {
147+
if task == nil {
148+
continue
149+
}
150+
name := task.DisplayName
151+
if name == "" {
152+
name = task.TaskID
153+
}
154+
if _, ok := componentTaskNames[name]; !ok {
155+
continue
156+
}
157+
start := time.Time(task.StartTime)
158+
if start.IsZero() {
159+
continue
160+
}
161+
events = append(events, event{at: start, delta: +1})
162+
end := time.Time(task.EndTime)
163+
if !end.IsZero() {
164+
events = append(events, event{at: end, delta: -1})
165+
}
166+
}
167+
if len(events) == 0 {
168+
return
169+
}
170+
sort.Slice(events, func(i, j int) bool {
171+
if events[i].at.Equal(events[j].at) {
172+
return events[i].delta < events[j].delta
173+
}
174+
return events[i].at.Before(events[j].at)
175+
})
176+
running := 0
177+
for _, ev := range events {
178+
running += ev.delta
179+
gomega.Expect(running).To(gomega.BeNumerically(">=", 0))
180+
gomega.Expect(running).To(gomega.BeNumerically("<=", limit))
181+
}
182+
}
183+
96184
// CapturePodLogsForUnsuccessfulTasks - Capture pod logs of a failed component
97185
func CapturePodLogsForUnsuccessfulTasks(k8Client *kubernetes.Clientset, testContext *apitests.TestContext, taskDetails []*run_model.V2beta1PipelineTaskDetail) {
98186
failedTasks := make(map[string]string)

test_data/compiled-workflows/pipeline_with_run_parallelism.yaml

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ metadata:
66
spec:
77
arguments:
88
parameters:
9-
- name: components-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1
10-
value: '{"executorLabel":"exec-produce-message","inputDefinitions":{"parameters":{"msg":{"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}'
11-
- name: implementations-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1
9+
- name: components-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de
10+
value: '{"executorLabel":"exec-produce-message","inputDefinitions":{"parameters":{"msg":{"parameterType":"STRING"},"sleep_seconds":{"defaultValue":20,"isOptional":true,"parameterType":"NUMBER_INTEGER"}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}'
11+
- name: implementations-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de
1212
value: '{"args":["--executor_input","{{$}}","--function_to_execute","produce_message"],"command":["sh","-c","\nif
1313
! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3
1414
-m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1
@@ -18,12 +18,14 @@ spec:
1818
\u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3
1919
-m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport
2020
kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef
21-
produce_message(msg: str) -\u003e str:\n print(msg)\n return msg\n\n"],"image":"python:3.11"}'
21+
produce_message(msg: str, sleep_seconds: int = 20) -\u003e str:\n import
22+
time\n\n print(f''Processing {msg}...'')\n time.sleep(sleep_seconds)\n return
23+
msg\n\n"],"image":"python:3.9"}'
2224
- name: components-comp-for-loop-2
2325
value: '{"dag":{"tasks":{"produce-message":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"inputs":{"parameters":{"msg":{"componentInputParameter":"pipelinechannel--loop-item-param-1"}}},"taskInfo":{"name":"produce-message"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-1":{"parameterType":"STRING"}}}}'
2426
- name: components-root
2527
value: '{"dag":{"tasks":{"for-loop-2":{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\",
26-
\"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}}}}'
28+
\"two\", \"three\", \"four\", \"five\"]"}},"taskInfo":{"name":"for-loop-2"}}}}}'
2729
entrypoint: entrypoint
2830
parallelism: 2
2931
podMetadata:
@@ -213,11 +215,11 @@ spec:
213215
- arguments:
214216
parameters:
215217
- name: component
216-
value: '{{workflow.parameters.components-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1}}'
218+
value: '{{workflow.parameters.components-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de}}'
217219
- name: task
218220
value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"inputs":{"parameters":{"msg":{"componentInputParameter":"pipelinechannel--loop-item-param-1"}}},"taskInfo":{"name":"produce-message"}}'
219221
- name: container
220-
value: '{{workflow.parameters.implementations-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1}}'
222+
value: '{{workflow.parameters.implementations-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de}}'
221223
- name: task-name
222224
value: produce-message
223225
- name: parent-dag-id
@@ -337,7 +339,7 @@ spec:
337339
value: '{{inputs.parameters.parent-dag-id}}'
338340
- name: task
339341
value: '{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\",
340-
\"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}'
342+
\"two\", \"three\", \"four\", \"five\"]"}},"taskInfo":{"name":"for-loop-2"}}'
341343
name: iteration-item-driver
342344
template: system-dag-driver
343345
- arguments:
@@ -366,7 +368,7 @@ spec:
366368
value: '{{inputs.parameters.parent-dag-id}}'
367369
- name: task
368370
value: '{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\",
369-
\"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}'
371+
\"two\", \"three\", \"four\", \"five\"]"}},"taskInfo":{"name":"for-loop-2"}}'
370372
name: iteration-driver
371373
template: system-dag-driver
372374
- arguments:

test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,12 @@
1616
from kfp import compiler, dsl
1717

1818

19-
@dsl.component
20-
def produce_message(msg: str) -> str:
21-
print(msg)
19+
@dsl.component(base_image='python:3.9')
20+
def produce_message(msg: str, sleep_seconds: int = 20) -> str:
21+
import time
22+
23+
print(f'Processing {msg}...')
24+
time.sleep(sleep_seconds)
2225
return msg
2326

2427

@@ -27,7 +30,8 @@ def produce_message(msg: str) -> str:
2730
pipeline_config=dsl.PipelineConfig(pipeline_run_parallelism=2),
2831
)
2932
def pipeline_with_run_parallelism():
30-
with dsl.ParallelFor(items=['one', 'two', 'three']) as item:
33+
loop_args = ['one', 'two', 'three', 'four', 'five']
34+
with dsl.ParallelFor(items=loop_args) as item:
3135
produce_message(msg=item)
3236

3337

test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ components:
2525
parameters:
2626
msg:
2727
parameterType: STRING
28+
sleep_seconds:
29+
defaultValue: 20.0
30+
isOptional: true
31+
parameterType: NUMBER_INTEGER
2832
outputDefinitions:
2933
parameters:
3034
Output:
@@ -57,9 +61,10 @@ deploymentSpec:
5761
5862
'
5963
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
60-
\ *\n\ndef produce_message(msg: str) -> str:\n print(msg)\n return\
61-
\ msg\n\n"
62-
image: python:3.11
64+
\ *\n\ndef produce_message(msg: str, sleep_seconds: int = 20) -> str:\n\
65+
\ import time\n\n print(f'Processing {msg}...')\n time.sleep(sleep_seconds)\n\
66+
\ return msg\n\n"
67+
image: python:3.9
6368
pipelineInfo:
6469
name: pipeline-with-run-parallelism
6570
root:
@@ -71,7 +76,7 @@ root:
7176
parameterIterator:
7277
itemInput: pipelinechannel--loop-item-param-1
7378
items:
74-
raw: '["one", "two", "three"]'
79+
raw: '["one", "two", "three", "four", "five"]'
7580
taskInfo:
7681
name: for-loop-2
7782
schemaVersion: 2.1.0

0 commit comments

Comments
 (0)