From 73307bc7fc10ce4a20d3c0638daeafb6583fe5a2 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Wed, 12 Nov 2025 16:17:06 -0500 Subject: [PATCH 01/10] feat: add pipeline run parallelism config Signed-off-by: sduvvuri1603 --- README.md | 2 + .../go/pipelinespec/pipeline_spec.pb.go | 24 ++++-- api/v2alpha1/pipeline_spec.proto | 4 + .../src/apiserver/client/kubernetes_core.go | 5 ++ .../apiserver/client/kubernetes_core_fake.go | 33 ++++++-- .../apiserver/resource/resource_manager.go | 72 ++++++++++++++++++ .../resource/resource_manager_test.go | 68 +++++++++++++++++ backend/src/cache/client/kubernetes_core.go | 5 ++ .../src/cache/client/kubernetes_core_fake.go | 23 +++++- backend/src/v2/compiler/argocompiler/argo.go | 11 +++ .../generated-1791485/pipeline_spec.pb | Bin 262 -> 262 bytes .../generated-1791485/pipeline_version.pb | Bin 1678 -> 1678 bytes .../run_completed_with_spec.pb | Bin 1537 -> 1537 bytes .../pipeline/ml-pipeline-apiserver-role.yaml | 10 +++ proposals/11875-pipeline-workspace/README.md | 8 +- sdk/python/kfp/compiler/compiler_test.py | 4 +- .../kfp/compiler/pipeline_spec_builder.py | 4 + sdk/python/kfp/dsl/pipeline_config.py | 22 +++++- ...ith_string_machine_fields_task_output.yaml | 6 +- .../pipeline_with_workspace.yaml | 21 ++--- .../valid/critical/pipeline_with_workspace.py | 1 + .../critical/pipeline_with_workspace.yaml | 7 +- 22 files changed, 296 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 7be81b1d4bb..91b6006eacf 100644 --- a/README.md +++ b/README.md @@ -42,6 +42,8 @@ See the Kubeflow [Pipelines API doc](https://www.kubeflow.org/docs/components/pi Consult the [Python SDK reference docs](https://kubeflow-pipelines.readthedocs.io/en/stable/) when writing pipelines using the Python SDK. +> New in master: `dsl.PipelineConfig` now accepts an optional `pipeline_run_parallelism` integer to cap concurrent task execution for a run. The backend stores the requested limit in a shared ConfigMap and surfaces it to Argo Workflows via `spec.parallelism`. + ## Deep Wiki Check out our AI Powered repo documentation on [DeepWiki](https://deepwiki.com/kubeflow/pipelines). diff --git a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go index e91a354bb7d..279422b3f52 100644 --- a/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go +++ b/api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go @@ -2760,9 +2760,12 @@ type PipelineConfig struct { ResourceTtl int32 `protobuf:"varint,1,opt,name=resource_ttl,json=resourceTtl,proto3" json:"resource_ttl,omitempty"` // Configuration for a shared storage workspace that persists for the duration of the pipeline run. // The workspace can be configured with size and Kubernetes-specific settings to override default PVC configurations. - Workspace *WorkspaceConfig `protobuf:"bytes,2,opt,name=workspace,proto3,oneof" json:"workspace,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + Workspace *WorkspaceConfig `protobuf:"bytes,2,opt,name=workspace,proto3,oneof" json:"workspace,omitempty"` + // Maximum number of tasks that can be scheduled simultaneously for a single + // pipeline run. + PipelineRunParallelism *int32 `protobuf:"varint,3,opt,name=pipeline_run_parallelism,json=pipelineRunParallelism,proto3,oneof" json:"pipeline_run_parallelism,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *PipelineConfig) Reset() { @@ -2809,6 +2812,13 @@ func (x *PipelineConfig) GetWorkspace() *WorkspaceConfig { return nil } +func (x *PipelineConfig) GetPipelineRunParallelism() int32 { + if x != nil && x.PipelineRunParallelism != nil { + return *x.PipelineRunParallelism + } + return 0 +} + // The runtime config of a PipelineJob. type PipelineJob_RuntimeConfig struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -6099,12 +6109,14 @@ const file_pipeline_spec_proto_rawDesc = "" + "\v_kubernetes\"r\n" + "\x19KubernetesWorkspaceConfig\x12B\n" + "\x0epvc_spec_patch\x18\x01 \x01(\v2\x17.google.protobuf.StructH\x00R\fpvcSpecPatch\x88\x01\x01B\x11\n" + - "\x0f_pvc_spec_patch\"\x83\x01\n" + + "\x0f_pvc_spec_patch\"\xdf\x01\n" + "\x0ePipelineConfig\x12!\n" + "\fresource_ttl\x18\x01 \x01(\x05R\vresourceTtl\x12@\n" + - "\tworkspace\x18\x02 \x01(\v2\x1d.ml_pipelines.WorkspaceConfigH\x00R\tworkspace\x88\x01\x01B\f\n" + + "\tworkspace\x18\x02 \x01(\v2\x1d.ml_pipelines.WorkspaceConfigH\x00R\tworkspace\x88\x01\x01\x12=\n" + + "\x18pipeline_run_parallelism\x18\x03 \x01(\x05H\x01R\x16pipelineRunParallelism\x88\x01\x01B\f\n" + "\n" + - "_workspaceB 0 && run.PipelineVersionId != "" { + if err := r.upsertPipelineParallelismConfigMap( + ctx, k8sNamespace, run.PipelineVersionId, parallelism); err != nil { + if apierrors.IsForbidden(errors.Cause(err)) { + glog.Warningf("Skipping pipeline_run_parallelism ConfigMap update in namespace %q: %v", k8sNamespace, err) + } else { + return nil, util.Wrap(err, "Failed to persist pipeline_run_parallelism configuration") + } + } + } + // assign OwnerReference to scheduledworkflow if run.RecurringRunId != "" { job, err := r.jobStore.GetJob(run.RecurringRunId) diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index d928fbde359..23bd0a56729 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -1936,6 +1936,66 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) { assert.Equal(t, expectedRunDetail.ToV1(), runDetail.ToV1(), "CreateRun stored invalid data in database") } +func TestCreateRun_PipelineRunParallelismConfigMap(t *testing.T) { + initEnvVars() + + store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) + defer store.Close() + + manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false}) + + pipeline, err := manager.CreatePipeline(createPipeline("parallelism-pipeline", "", "kubeflow")) + assert.NoError(t, err) + + pipelineStore, ok := store.pipelineStore.(*storage.PipelineStore) + assert.True(t, ok) + pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil)) + + pv := createPipelineVersion( + pipeline.UUID, + "parallelism-version", + "", + "", + v2SpecHelloWorldWithParallelism, + "", + "kubeflow", + ) + version, err := manager.CreatePipelineVersion(pv) + assert.NoError(t, err) + + run := &model.Run{ + DisplayName: "run-with-parallelism", + Namespace: "kubeflow", + PipelineSpec: model.PipelineSpec{ + PipelineId: pipeline.UUID, + PipelineVersionId: version.UUID, + RuntimeConfig: model.RuntimeConfig{ + Parameters: "{\"text\":\"hello\"}", + }, + }, + } + + createdRun, err := manager.CreateRun(context.Background(), run) + assert.NoError(t, err) + assert.NotNil(t, createdRun) + + configMap, err := store.k8sCoreClientFake.ConfigMapClient("kubeflow").Get( + context.Background(), pipelineParallelismConfigMapName, v1.GetOptions{}) + assert.NoError(t, err) + if assert.NotNil(t, configMap.Data) { + assert.Equal(t, "5", configMap.Data[version.UUID]) + } + + execSpec, err := store.ExecClientFake.Execution("kubeflow").Get( + context.Background(), createdRun.K8SName, v1.GetOptions{}) + assert.NoError(t, err) + workflow, ok := execSpec.(*util.Workflow) + assert.True(t, ok) + if assert.NotNil(t, workflow.Spec.Parallelism) { + assert.Equal(t, int64(5), *workflow.Spec.Parallelism) + } +} + func TestCreateRun_ThroughPipelineIdAndPipelineVersion(t *testing.T) { // Create experiment, pipeline, and pipeline version. store, manager, experiment, pipeline, _ := initWithExperimentAndPipeline(t) @@ -4047,6 +4107,14 @@ schemaVersion: 2.1.0 sdkVersion: kfp-1.6.5 ` +var v2SpecHelloWorldWithParallelism = v2SpecHelloWorld + ` +--- +platforms: + kubernetes: + pipelineConfig: + pipelineRunParallelism: 5 +` + var v2SpecHelloWorldMutated = ` components: comp-hello-world: diff --git a/backend/src/cache/client/kubernetes_core.go b/backend/src/cache/client/kubernetes_core.go index f88a1f70cc5..d091a3284ca 100644 --- a/backend/src/cache/client/kubernetes_core.go +++ b/backend/src/cache/client/kubernetes_core.go @@ -13,6 +13,7 @@ import ( type KubernetesCoreInterface interface { PodClient(namespace string) v1.PodInterface + ConfigMapClient(namespace string) v1.ConfigMapInterface } type KubernetesCore struct { @@ -23,6 +24,10 @@ func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface { return c.coreV1Client.Pods(namespace) } +func (c *KubernetesCore) ConfigMapClient(namespace string) v1.ConfigMapInterface { + return c.coreV1Client.ConfigMaps(namespace) +} + func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) { restConfig, err := util.GetKubernetesConfig() if err != nil { diff --git a/backend/src/cache/client/kubernetes_core_fake.go b/backend/src/cache/client/kubernetes_core_fake.go index b6b3c1d147d..5dfad2d2ef0 100644 --- a/backend/src/cache/client/kubernetes_core_fake.go +++ b/backend/src/cache/client/kubernetes_core_fake.go @@ -20,11 +20,13 @@ import ( "github.com/kubeflow/pipelines/backend/src/common/util" policyv1 "k8s.io/api/policy/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" + kubernetesfake "k8s.io/client-go/kubernetes/fake" v1 "k8s.io/client-go/kubernetes/typed/core/v1" ) type FakeKuberneteCoreClient struct { podClientFake *FakePodClient + coreClient v1.CoreV1Interface } func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface { @@ -34,22 +36,39 @@ func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface { return c.podClientFake } +func (c *FakeKuberneteCoreClient) ConfigMapClient(namespace string) v1.ConfigMapInterface { + return c.coreClient.ConfigMaps(namespace) +} + func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient { - return &FakeKuberneteCoreClient{&FakePodClient{}} + clientset := kubernetesfake.NewSimpleClientset() + return &FakeKuberneteCoreClient{ + podClientFake: &FakePodClient{}, + coreClient: clientset.CoreV1(), + } } type FakeKubernetesCoreClientWithBadPodClient struct { podClientFake *FakeBadPodClient + coreClient v1.CoreV1Interface } func NewFakeKubernetesCoreClientWithBadPodClient() *FakeKubernetesCoreClientWithBadPodClient { - return &FakeKubernetesCoreClientWithBadPodClient{&FakeBadPodClient{}} + clientset := kubernetesfake.NewSimpleClientset() + return &FakeKubernetesCoreClientWithBadPodClient{ + podClientFake: &FakeBadPodClient{}, + coreClient: clientset.CoreV1(), + } } func (c *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v1.PodInterface { return c.podClientFake } +func (c *FakeKubernetesCoreClientWithBadPodClient) ConfigMapClient(namespace string) v1.ConfigMapInterface { + return c.coreClient.ConfigMaps(namespace) +} + func (c *FakePodClient) EvictV1(context.Context, *policyv1.Eviction) error { return nil } diff --git a/backend/src/v2/compiler/argocompiler/argo.go b/backend/src/v2/compiler/argocompiler/argo.go index 55e80c03914..c3b79820435 100644 --- a/backend/src/v2/compiler/argocompiler/argo.go +++ b/backend/src/v2/compiler/argocompiler/argo.go @@ -150,6 +150,17 @@ func Compile(jobArg *pipelinespec.PipelineJob, kubernetesSpecArg *pipelinespec.S }, } + if hasPipelineConfig { + pipelineConfig := kubernetesSpec.GetPipelineConfig() + if pipelineConfig.PipelineRunParallelism != nil { + value := pipelineConfig.GetPipelineRunParallelism() + if value <= 0 { + return nil, fmt.Errorf("pipelineRunParallelism must be greater than 0, got %d", value) + } + wf.Spec.Parallelism = util.Int64Pointer(int64(value)) + } + } + runAsUser := GetPipelineRunAsUser() if runAsUser != nil { wf.Spec.SecurityContext = &k8score.PodSecurityContext{RunAsUser: runAsUser} diff --git a/backend/test/proto_tests/testdata/generated-1791485/pipeline_spec.pb b/backend/test/proto_tests/testdata/generated-1791485/pipeline_spec.pb index 4f4f8023f0559da8109a6c8b7f3c8ab07d3c26cb..9e0d394e65b5dbdd609205ac97fbd101179de694 100644 GIT binary patch delta 42 wcmZo;YGa!4SlLjBNq||1my12Ww4|W4#0bb`QsOPj&o9wUtw>ESEy*te0Og4b#{d8T delta 42 wcmZo;YGa!4SlLL3Nq|X+my12Ww4|W4#1P13R^lzn&o9wUtw>ESEy*te0Og4b#{d8T diff --git a/backend/test/proto_tests/testdata/generated-1791485/pipeline_version.pb b/backend/test/proto_tests/testdata/generated-1791485/pipeline_version.pb index 7ffd67ca12b800e0be6358ae8787cc980256c764..8eca2ec68b756e8e8f2e73e86e0f608bb0972c2b 100644 GIT binary patch delta 18 acmeCDS|`i1 Optional[int]: + return self._pipeline_run_parallelism + + @pipeline_run_parallelism.setter + def pipeline_run_parallelism(self, value: Optional[int]) -> None: # pylint: disable=attribute-defined-outside-init + if value is None: + self._pipeline_run_parallelism = None + return + if not isinstance(value, int): + raise ValueError( + 'pipeline_run_parallelism must be an integer if specified.') + if value <= 0: + raise ValueError( + 'pipeline_run_parallelism must be a positive integer.') + self._pipeline_run_parallelism = value diff --git a/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml b/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml index 99ae8ff5498..b8e6a4cd3cd 100644 --- a/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml +++ b/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml @@ -72,7 +72,7 @@ spec: kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef sum_numbers(a: int, b: int) -\u003e int:\n return a + b\n\n"],"image":"python:3.11","resources":{"accelerator":{"resourceCount":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}","resourceType":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"},"resourceCpuLimit":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}","resourceMemoryLimit":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}}' - name: components-root - value: '{"dag":{"tasks":{"accelerator-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-limit"},"taskInfo":{"name":"accelerator-limit"}},"accelerator-type":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-type"},"taskInfo":{"name":"accelerator-type"}},"cpu-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-cpu-limit"},"taskInfo":{"name":"cpu-limit"}},"memory-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-memory-limit"},"taskInfo":{"name":"memory-limit"}},"sum-numbers":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-type","accelerator-limit","cpu-limit","memory-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}}}}' + value: '{"dag":{"tasks":{"accelerator-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-limit"},"taskInfo":{"name":"accelerator-limit"}},"accelerator-type":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-type"},"taskInfo":{"name":"accelerator-type"}},"cpu-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-cpu-limit"},"taskInfo":{"name":"cpu-limit"}},"memory-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-memory-limit"},"taskInfo":{"name":"memory-limit"}},"sum-numbers":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-limit","cpu-limit","accelerator-type","memory-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -355,14 +355,14 @@ spec: - name: component value: '{{workflow.parameters.components-49f9a898b718a077f30b7fd8c02d39767cff91ff0bbda4379daf866a91dbdb1b}}' - name: task - value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-type","accelerator-limit","cpu-limit","memory-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}' + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-limit","cpu-limit","accelerator-type","memory-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}' - name: container value: '{{workflow.parameters.implementations-49f9a898b718a077f30b7fd8c02d39767cff91ff0bbda4379daf866a91dbdb1b}}' - name: task-name value: sum-numbers - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - depends: accelerator-type.Succeeded && accelerator-limit.Succeeded && cpu-limit.Succeeded + depends: accelerator-limit.Succeeded && cpu-limit.Succeeded && accelerator-type.Succeeded && memory-limit.Succeeded name: sum-numbers-driver template: system-container-driver diff --git a/test_data/compiled-workflows/pipeline_with_workspace.yaml b/test_data/compiled-workflows/pipeline_with_workspace.yaml index afb87063841..30da88b7d23 100644 --- a/test_data/compiled-workflows/pipeline_with_workspace.yaml +++ b/test_data/compiled-workflows/pipeline_with_workspace.yaml @@ -6,13 +6,13 @@ metadata: spec: arguments: parameters: - - name: components-099322c2301dec986b734828a313953d589b6759b84e2204d3be55050a87abf3 + - name: components-fdb9ebacd988353f9117beea48067da4e44b4484a444ad8f093929476660be50 value: '{"executorLabel":"exec-read-from-workspace","inputDefinitions":{"parameters":{"file_path":{"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' - - name: implementations-099322c2301dec986b734828a313953d589b6759b84e2204d3be55050a87abf3 + - name: implementations-fdb9ebacd988353f9117beea48067da4e44b4484a444ad8f093929476660be50 value: '{"args":["--executor_input","{{$}}","--function_to_execute","read_from_workspace"],"command":["sh","-c","\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 - python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.3'' ''--no-deps'' + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 @@ -25,13 +25,13 @@ spec: {content}\")\n assert content == \"Hello from workspace!\"\n return content\n else:\n print(f\"File not found at: {file_path}\")\n return \"File not found\"\n\n"],"image":"python:3.11"}' - - name: components-d94cb6155685654cc8749b6be9ebf3fb71fe13a85b91fd244acdd09265f42081 + - name: components-8547baf43414445c3b300de6c0e4bea20119f0d9de05c3af3f5dfcb181cc9bad value: '{"executorLabel":"exec-write-to-workspace","inputDefinitions":{"parameters":{"workspace_path":{"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' - - name: implementations-d94cb6155685654cc8749b6be9ebf3fb71fe13a85b91fd244acdd09265f42081 + - name: implementations-8547baf43414445c3b300de6c0e4bea20119f0d9de05c3af3f5dfcb181cc9bad value: '{"args":["--executor_input","{{$}}","--function_to_execute","write_to_workspace"],"command":["sh","-c","\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 - python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.3'' ''--no-deps'' + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 @@ -46,6 +46,7 @@ spec: - name: components-root value: '{"dag":{"outputs":{"parameters":{"Output":{"valueFromParameter":{"outputParameterKey":"Output","producerSubtask":"read-from-workspace"}}}},"tasks":{"read-from-workspace":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-from-workspace"},"dependentTasks":["write-to-workspace"],"inputs":{"parameters":{"file_path":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"write-to-workspace"}}}},"taskInfo":{"name":"read-from-workspace"}},"write-to-workspace":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-write-to-workspace"},"inputs":{"parameters":{"workspace_path":{"runtimeValue":{"constant":"{{$.workspace_path}}"}}}},"taskInfo":{"name":"write-to-workspace"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' entrypoint: entrypoint + parallelism: 3 podMetadata: annotations: pipelines.kubeflow.org/v2_component: "true" @@ -229,11 +230,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.parameters.components-099322c2301dec986b734828a313953d589b6759b84e2204d3be55050a87abf3}}' + value: '{{workflow.parameters.components-fdb9ebacd988353f9117beea48067da4e44b4484a444ad8f093929476660be50}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-from-workspace"},"dependentTasks":["write-to-workspace"],"inputs":{"parameters":{"file_path":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"write-to-workspace"}}}},"taskInfo":{"name":"read-from-workspace"}}' - name: container - value: '{{workflow.parameters.implementations-099322c2301dec986b734828a313953d589b6759b84e2204d3be55050a87abf3}}' + value: '{{workflow.parameters.implementations-fdb9ebacd988353f9117beea48067da4e44b4484a444ad8f093929476660be50}}' - name: task-name value: read-from-workspace - name: parent-dag-id @@ -254,11 +255,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.parameters.components-d94cb6155685654cc8749b6be9ebf3fb71fe13a85b91fd244acdd09265f42081}}' + value: '{{workflow.parameters.components-8547baf43414445c3b300de6c0e4bea20119f0d9de05c3af3f5dfcb181cc9bad}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-write-to-workspace"},"inputs":{"parameters":{"workspace_path":{"runtimeValue":{"constant":"{{$.workspace_path}}"}}}},"taskInfo":{"name":"write-to-workspace"}}' - name: container - value: '{{workflow.parameters.implementations-d94cb6155685654cc8749b6be9ebf3fb71fe13a85b91fd244acdd09265f42081}}' + value: '{{workflow.parameters.implementations-8547baf43414445c3b300de6c0e4bea20119f0d9de05c3af3f5dfcb181cc9bad}}' - name: task-name value: write-to-workspace - name: parent-dag-id diff --git a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py index 0f8cead6df5..ccf6f0c20cb 100644 --- a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py +++ b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py @@ -53,6 +53,7 @@ def read_from_workspace(file_path: str) -> str: name="pipeline-with-workspace", description="A pipeline that demonstrates workspace functionality", pipeline_config=dsl.PipelineConfig( + pipeline_run_parallelism=3, workspace=dsl.WorkspaceConfig( size='1Gi', kubernetes=dsl.KubernetesWorkspaceConfig( diff --git a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml index 0b7297dc057..ddb3c15fba5 100644 --- a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml +++ b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml @@ -38,7 +38,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -73,7 +73,7 @@ deploymentSpec: - -c - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ - \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.3'\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ $0\" \"$@\"\n" - sh @@ -138,11 +138,12 @@ root: Output: parameterType: STRING schemaVersion: 2.1.0 -sdkVersion: kfp-2.14.3 +sdkVersion: kfp-2.14.6 --- platforms: kubernetes: pipelineConfig: + pipelineRunParallelism: 3 workspace: kubernetes: pvcSpecPatch: From f7e180ea22c3f02ede4b9973403a6dc336efbac2 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Fri, 14 Nov 2025 16:24:44 -0500 Subject: [PATCH 02/10] Add cluster RBAC for pipeline parallelism configmap Signed-off-by: sduvvuri1603 --- .../cluster-scoped/kustomization.yaml | 2 ++ ...ipeline-configmap-manager-clusterrole.yaml | 27 +++++++++++++++++++ ...-configmap-manager-clusterrolebinding.yaml | 15 +++++++++++ 3 files changed, 44 insertions(+) create mode 100644 manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrole.yaml create mode 100644 manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml diff --git a/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml b/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml index 9a92c2ced66..ed4bf51c736 100644 --- a/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml +++ b/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml @@ -3,3 +3,5 @@ kind: Kustomization resources: - scheduled-workflow-crd.yaml - viewer-crd.yaml +- ml-pipeline-configmap-manager-clusterrole.yaml +- ml-pipeline-configmap-manager-clusterrolebinding.yaml diff --git a/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrole.yaml b/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrole.yaml new file mode 100644 index 00000000000..e42b3295a1f --- /dev/null +++ b/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrole.yaml @@ -0,0 +1,27 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app: ml-pipeline + name: ml-pipeline-configmap-manager +rules: +- apiGroups: + - "" + resources: + - configmaps + verbs: + - create +- apiGroups: + - "" + resourceNames: + - kfp-pipeline-config + resources: + - configmaps + verbs: + - get + - list + - watch + - update + - patch + - delete + diff --git a/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml b/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml new file mode 100644 index 00000000000..bb74738fb78 --- /dev/null +++ b/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml @@ -0,0 +1,15 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + labels: + app: ml-pipeline + name: ml-pipeline-configmap-manager +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: ml-pipeline-configmap-manager +subjects: +- kind: ServiceAccount + name: ml-pipeline + namespace: kubeflow + From c587b039110f65a4fa7673ce183f38dd5afee146 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Wed, 19 Nov 2025 16:05:28 -0500 Subject: [PATCH 03/10] Allow configmap manager RBAC to follow custom namespace Signed-off-by: sduvvuri1603 --- .../base/pipeline/cluster-scoped/kustomization.yaml | 2 ++ .../ml-pipeline-configmap-manager-clusterrolebinding.yaml | 2 +- .../kustomize/base/pipeline/cluster-scoped/params.yaml | 7 +++++++ 3 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 manifests/kustomize/base/pipeline/cluster-scoped/params.yaml diff --git a/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml b/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml index ed4bf51c736..f3c91a34137 100644 --- a/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml +++ b/manifests/kustomize/base/pipeline/cluster-scoped/kustomization.yaml @@ -5,3 +5,5 @@ resources: - viewer-crd.yaml - ml-pipeline-configmap-manager-clusterrole.yaml - ml-pipeline-configmap-manager-clusterrolebinding.yaml +configurations: +- params.yaml diff --git a/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml b/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml index bb74738fb78..760307b7ee3 100644 --- a/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml +++ b/manifests/kustomize/base/pipeline/cluster-scoped/ml-pipeline-configmap-manager-clusterrolebinding.yaml @@ -11,5 +11,5 @@ roleRef: subjects: - kind: ServiceAccount name: ml-pipeline - namespace: kubeflow + namespace: $(kfp-cluster-scoped-namespace) diff --git a/manifests/kustomize/base/pipeline/cluster-scoped/params.yaml b/manifests/kustomize/base/pipeline/cluster-scoped/params.yaml new file mode 100644 index 00000000000..790e2f3b167 --- /dev/null +++ b/manifests/kustomize/base/pipeline/cluster-scoped/params.yaml @@ -0,0 +1,7 @@ +# Allow Kustomize vars to substitute namespaces for service accounts referenced +# by cluster-scoped bindings. +varReference: +- kind: ClusterRoleBinding + path: subjects/namespace + + From 41df1d35c8af1ba28d875958faef5eaecd2d4166 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Wed, 19 Nov 2025 16:43:52 -0500 Subject: [PATCH 04/10] Add essential pipeline for run parallelism config Signed-off-by: sduvvuri1603 --- .../compilation/pipeline_compilation_test.py | 9 +++ .../pipeline_with_run_parallelism.py | 36 ++++++++++ .../pipeline_with_run_parallelism.yaml | 71 +++++++++++++++++++ 3 files changed, 116 insertions(+) create mode 100644 test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py create mode 100644 test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml diff --git a/sdk/python/test/compilation/pipeline_compilation_test.py b/sdk/python/test/compilation/pipeline_compilation_test.py index 5262e1c9abd..456add92f22 100644 --- a/sdk/python/test/compilation/pipeline_compilation_test.py +++ b/sdk/python/test/compilation/pipeline_compilation_test.py @@ -154,6 +154,8 @@ my_pipeline as pipeline_with_params_containing_format from test_data.sdk_compiled_pipelines.valid.essential.pipeline_with_reused_component import \ my_pipeline as reused_component_pipeline +from test_data.sdk_compiled_pipelines.valid.essential.pipeline_with_run_parallelism import \ + pipeline_with_run_parallelism from test_data.sdk_compiled_pipelines.valid.failing.pipeline_with_exit_handler import \ my_pipeline as exit_handler_pipeline from test_data.sdk_compiled_pipelines.valid.failing.pipeline_with_multiple_exit_handlers import \ @@ -311,6 +313,13 @@ def __repr__(self) -> str: compiled_file_name='outputs_pipeline.yaml', expected_compiled_file_path=f'{_VALID_PIPELINE_FILES}/essential/pipeline_with_outputs.yaml' ), + TestData( + pipeline_name='pipeline-with-run-parallelism', + pipeline_func=pipeline_with_run_parallelism, + pipline_func_args=None, + compiled_file_name='pipeline_with_run_parallelism.yaml', + expected_compiled_file_path=f'{_VALID_PIPELINE_FILES}/essential/pipeline_with_run_parallelism.yaml' + ), TestData( pipeline_name='collected-param-pipeline', pipeline_func=collected_param_pipeline, diff --git a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py new file mode 100644 index 00000000000..0e13c399844 --- /dev/null +++ b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py @@ -0,0 +1,36 @@ +# Copyright 2025 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""A minimal pipeline that customizes pipeline_run_parallelism.""" + +from kfp import compiler, dsl + + +@dsl.component +def produce_message() -> str: + return 'hello parallel world' + + +@dsl.pipeline( + name='pipeline-with-run-parallelism', + pipeline_config=dsl.PipelineConfig(pipeline_run_parallelism=7), +) +def pipeline_with_run_parallelism() -> str: + task = produce_message() + return task.output + + +if __name__ == '__main__': + compiler.Compiler().compile( + pipeline_func=pipeline_with_run_parallelism, + package_path=__file__.replace('.py', '.yaml')) diff --git a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml new file mode 100644 index 00000000000..4bde422f313 --- /dev/null +++ b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml @@ -0,0 +1,71 @@ +# PIPELINE DEFINITION +# Name: pipeline-with-run-parallelism +# Outputs: +# Output: str +components: + comp-produce-message: + executorLabel: exec-produce-message + outputDefinitions: + parameters: + Output: + parameterType: STRING +deploymentSpec: + executors: + exec-produce-message: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - produce_message + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.6'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef produce_message() -> str:\n return 'hello parallel world'\n\ + \n" + image: python:3.11 +pipelineInfo: + name: pipeline-with-run-parallelism +root: + dag: + outputs: + parameters: + Output: + valueFromParameter: + outputParameterKey: Output + producerSubtask: produce-message + tasks: + produce-message: + cachingOptions: + enableCache: true + componentRef: + name: comp-produce-message + taskInfo: + name: produce-message + outputDefinitions: + parameters: + Output: + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.14.6 +--- +platforms: + kubernetes: + pipelineConfig: + pipelineRunParallelism: 7 From fe21d6d002249a846376f1803e0a72f376e4afa6 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Wed, 19 Nov 2025 16:57:00 -0500 Subject: [PATCH 05/10] Update compiler goldens for run parallelism Signed-off-by: sduvvuri1603 --- .../pipeline_with_run_parallelism.yaml | 344 ++++++++++++++++++ ...ith_string_machine_fields_task_output.yaml | 8 +- 2 files changed, 348 insertions(+), 4 deletions(-) create mode 100644 test_data/compiled-workflows/pipeline_with_run_parallelism.yaml diff --git a/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml new file mode 100644 index 00000000000..5fe705926c6 --- /dev/null +++ b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml @@ -0,0 +1,344 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + generateName: pipeline-with-run-parallelism- +spec: + arguments: + parameters: + - name: components-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd + value: '{"executorLabel":"exec-produce-message","outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' + - name: implementations-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd + value: '{"args":["--executor_input","{{$}}","--function_to_execute","produce_message"],"command":["sh","-c","\nif + ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 + -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 + python3 -m pip install --quiet --no-warn-script-location ''kfp==2.14.6'' ''--no-deps'' + ''typing-extensions\u003e=3.7.4,\u003c5; python_version\u003c\"3.9\"'' \u0026\u0026 + \"$0\" \"$@\"\n","sh","-ec","program_path=$(mktemp -d)\n\nprintf \"%s\" \"$0\" + \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 + -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport + kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef + produce_message() -\u003e str:\n return ''hello parallel world''\n\n"],"image":"python:3.11"}' + - name: components-root + value: '{"dag":{"outputs":{"parameters":{"Output":{"valueFromParameter":{"outputParameterKey":"Output","producerSubtask":"produce-message"}}}},"tasks":{"produce-message":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"taskInfo":{"name":"produce-message"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' + entrypoint: entrypoint + parallelism: 7 + podMetadata: + annotations: + pipelines.kubeflow.org/v2_component: "true" + labels: + pipelines.kubeflow.org/v2_component: "true" + serviceAccountName: pipeline-runner + templates: + - container: + args: + - --type + - CONTAINER + - --pipeline_name + - pipeline-with-run-parallelism + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - "" + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --task_name + - '{{inputs.parameters.task-name}}' + - --container + - '{{inputs.parameters.container}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --cached_decision_path + - '{{outputs.parameters.cached-decision.path}}' + - --pod_spec_patch_path + - '{{outputs.parameters.pod-spec-patch.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --kubernetes_config + - '{{inputs.parameters.kubernetes-config}}' + - --http_proxy + - "" + - --https_proxy + - "" + - --no_proxy + - "" + - --mlmd_server_address + - metadata-grpc-service.kubeflow.svc.cluster.local + - --mlmd_server_port + - "8080" + command: + - driver + image: ghcr.io/kubeflow/kfp-driver:latest + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - name: task + - name: container + - name: task-name + - name: parent-dag-id + - default: "-1" + name: iteration-index + - default: "" + name: kubernetes-config + metadata: {} + name: system-container-driver + outputs: + parameters: + - name: pod-spec-patch + valueFrom: + default: "" + path: /tmp/outputs/pod-spec-patch + - default: "false" + name: cached-decision + valueFrom: + default: "false" + path: /tmp/outputs/cached-decision + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: pod-spec-patch + value: '{{inputs.parameters.pod-spec-patch}}' + name: executor + template: system-container-impl + when: '{{inputs.parameters.cached-decision}} != true' + inputs: + parameters: + - name: pod-spec-patch + - default: "false" + name: cached-decision + metadata: {} + name: system-container-executor + outputs: {} + - container: + command: + - should-be-overridden-during-runtime + env: + - name: KFP_POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: KFP_POD_UID + valueFrom: + fieldRef: + fieldPath: metadata.uid + envFrom: + - configMapRef: + name: metadata-grpc-configmap + optional: true + image: gcr.io/ml-pipeline/should-be-overridden-during-runtime + name: "" + resources: {} + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + - mountPath: /gcs + name: gcs-scratch + - mountPath: /s3 + name: s3-scratch + - mountPath: /minio + name: minio-scratch + - mountPath: /.local + name: dot-local-scratch + - mountPath: /.cache + name: dot-cache-scratch + - mountPath: /.config + name: dot-config-scratch + initContainers: + - args: + - --copy + - /kfp-launcher/launch + command: + - launcher-v2 + image: ghcr.io/kubeflow/kfp-launcher:latest + name: kfp-launcher + resources: + limits: + cpu: 500m + memory: 128Mi + requests: + cpu: 100m + volumeMounts: + - mountPath: /kfp-launcher + name: kfp-launcher + inputs: + parameters: + - name: pod-spec-patch + metadata: {} + name: system-container-impl + outputs: {} + podSpecPatch: '{{inputs.parameters.pod-spec-patch}}' + volumes: + - emptyDir: {} + name: kfp-launcher + - emptyDir: {} + name: gcs-scratch + - emptyDir: {} + name: s3-scratch + - emptyDir: {} + name: minio-scratch + - emptyDir: {} + name: dot-local-scratch + - emptyDir: {} + name: dot-cache-scratch + - emptyDir: {} + name: dot-config-scratch + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd}}' + - name: task + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"taskInfo":{"name":"produce-message"}}' + - name: container + value: '{{workflow.parameters.implementations-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd}}' + - name: task-name + value: produce-message + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: produce-message-driver + template: system-container-driver + - arguments: + parameters: + - name: pod-spec-patch + value: '{{tasks.produce-message-driver.outputs.parameters.pod-spec-patch}}' + - default: "false" + name: cached-decision + value: '{{tasks.produce-message-driver.outputs.parameters.cached-decision}}' + depends: produce-message-driver.Succeeded + name: produce-message + template: system-container-executor + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} + - container: + args: + - --type + - '{{inputs.parameters.driver-type}}' + - --pipeline_name + - pipeline-with-run-parallelism + - --run_id + - '{{workflow.uid}}' + - --run_name + - '{{workflow.name}}' + - --run_display_name + - "" + - --dag_execution_id + - '{{inputs.parameters.parent-dag-id}}' + - --component + - '{{inputs.parameters.component}}' + - --task + - '{{inputs.parameters.task}}' + - --task_name + - '{{inputs.parameters.task-name}}' + - --runtime_config + - '{{inputs.parameters.runtime-config}}' + - --iteration_index + - '{{inputs.parameters.iteration-index}}' + - --execution_id_path + - '{{outputs.parameters.execution-id.path}}' + - --iteration_count_path + - '{{outputs.parameters.iteration-count.path}}' + - --condition_path + - '{{outputs.parameters.condition.path}}' + - --http_proxy + - "" + - --https_proxy + - "" + - --no_proxy + - "" + - --mlmd_server_address + - metadata-grpc-service.kubeflow.svc.cluster.local + - --mlmd_server_port + - "8080" + command: + - driver + image: ghcr.io/kubeflow/kfp-driver:latest + name: "" + resources: + limits: + cpu: 500m + memory: 512Mi + requests: + cpu: 100m + memory: 64Mi + inputs: + parameters: + - name: component + - default: "" + name: runtime-config + - default: "" + name: task + - default: "" + name: task-name + - default: "0" + name: parent-dag-id + - default: "-1" + name: iteration-index + - default: DAG + name: driver-type + metadata: {} + name: system-dag-driver + outputs: + parameters: + - name: execution-id + valueFrom: + path: /tmp/outputs/execution-id + - name: iteration-count + valueFrom: + default: "0" + path: /tmp/outputs/iteration-count + - name: condition + valueFrom: + default: "true" + path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-root}}' + - name: runtime-config + value: '{}' + - name: driver-type + value: ROOT_DAG + name: root-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.root-driver.outputs.parameters.execution-id}}' + - name: condition + value: "" + depends: root-driver.Succeeded + name: root + template: root + inputs: {} + metadata: {} + name: entrypoint + outputs: {} +status: + finishedAt: null + startedAt: null diff --git a/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml b/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml index b8e6a4cd3cd..46163455a8f 100644 --- a/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml +++ b/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml @@ -72,7 +72,7 @@ spec: kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef sum_numbers(a: int, b: int) -\u003e int:\n return a + b\n\n"],"image":"python:3.11","resources":{"accelerator":{"resourceCount":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}","resourceType":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"},"resourceCpuLimit":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}","resourceMemoryLimit":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}}' - name: components-root - value: '{"dag":{"tasks":{"accelerator-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-limit"},"taskInfo":{"name":"accelerator-limit"}},"accelerator-type":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-type"},"taskInfo":{"name":"accelerator-type"}},"cpu-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-cpu-limit"},"taskInfo":{"name":"cpu-limit"}},"memory-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-memory-limit"},"taskInfo":{"name":"memory-limit"}},"sum-numbers":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-limit","cpu-limit","accelerator-type","memory-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}}}}' + value: '{"dag":{"tasks":{"accelerator-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-limit"},"taskInfo":{"name":"accelerator-limit"}},"accelerator-type":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-type"},"taskInfo":{"name":"accelerator-type"}},"cpu-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-cpu-limit"},"taskInfo":{"name":"cpu-limit"}},"memory-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-memory-limit"},"taskInfo":{"name":"memory-limit"}},"sum-numbers":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["cpu-limit","memory-limit","accelerator-limit","accelerator-type"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -355,15 +355,15 @@ spec: - name: component value: '{{workflow.parameters.components-49f9a898b718a077f30b7fd8c02d39767cff91ff0bbda4379daf866a91dbdb1b}}' - name: task - value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-limit","cpu-limit","accelerator-type","memory-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}' + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["cpu-limit","memory-limit","accelerator-limit","accelerator-type"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}' - name: container value: '{{workflow.parameters.implementations-49f9a898b718a077f30b7fd8c02d39767cff91ff0bbda4379daf866a91dbdb1b}}' - name: task-name value: sum-numbers - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - depends: accelerator-limit.Succeeded && cpu-limit.Succeeded && accelerator-type.Succeeded - && memory-limit.Succeeded + depends: cpu-limit.Succeeded && memory-limit.Succeeded && accelerator-limit.Succeeded + && accelerator-type.Succeeded name: sum-numbers-driver template: system-container-driver - arguments: From 28aed8e73851fd7d46bc79fcc33b86d00c91ffe9 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Thu, 20 Nov 2025 13:27:29 -0500 Subject: [PATCH 06/10] Docs: keep workspace proposal focused Signed-off-by: sduvvuri1603 --- proposals/11875-pipeline-workspace/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/proposals/11875-pipeline-workspace/README.md b/proposals/11875-pipeline-workspace/README.md index 0d79e013d6d..5e05694d72c 100644 --- a/proposals/11875-pipeline-workspace/README.md +++ b/proposals/11875-pipeline-workspace/README.md @@ -364,7 +364,7 @@ An example of overriding the PVC configuration defaults set on the API server: ) ``` -This will lead to protocol buffer changes similar to the following. In addition to the workspace-related fields, a new `pipeline_run_parallelism` option allows authors to cap the number of concurrent tasks for a run: +This will lead to protocol buffer changes similar to the following: ```diff diff --git a/api/v2alpha1/pipeline_spec.proto b/api/v2alpha1/pipeline_spec.proto From 90b4b75c78fe2212c88271a81b69db847390cb69 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Thu, 20 Nov 2025 16:49:43 -0500 Subject: [PATCH 07/10] Refresh compiler goldens Signed-off-by: sduvvuri1603 --- .../compiled-workflows/pipeline_with_run_parallelism.yaml | 8 ++++++++ .../pipeline_with_string_machine_fields_task_output.yaml | 8 ++++---- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml index 5fe705926c6..70ee4595f71 100644 --- a/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml +++ b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml @@ -68,6 +68,10 @@ spec: - "" - --no_proxy - "" + - --ml_pipeline_server_address + - ml-pipeline.kubeflow + - --ml_pipeline_server_port + - "8887" - --mlmd_server_address - metadata-grpc-service.kubeflow.svc.cluster.local - --mlmd_server_port @@ -269,6 +273,10 @@ spec: - "" - --no_proxy - "" + - --ml_pipeline_server_address + - ml-pipeline.kubeflow + - --ml_pipeline_server_port + - "8887" - --mlmd_server_address - metadata-grpc-service.kubeflow.svc.cluster.local - --mlmd_server_port diff --git a/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml b/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml index 290fe74383b..fde208d8b24 100644 --- a/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml +++ b/test_data/compiled-workflows/pipeline_with_string_machine_fields_task_output.yaml @@ -72,7 +72,7 @@ spec: kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef sum_numbers(a: int, b: int) -\u003e int:\n return a + b\n\n"],"image":"python:3.11","resources":{"accelerator":{"resourceCount":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}","resourceType":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"},"resourceCpuLimit":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}","resourceMemoryLimit":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}}' - name: components-root - value: '{"dag":{"tasks":{"accelerator-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-limit"},"taskInfo":{"name":"accelerator-limit"}},"accelerator-type":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-type"},"taskInfo":{"name":"accelerator-type"}},"cpu-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-cpu-limit"},"taskInfo":{"name":"cpu-limit"}},"memory-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-memory-limit"},"taskInfo":{"name":"memory-limit"}},"sum-numbers":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-type","cpu-limit","memory-limit","accelerator-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}}}}' + value: '{"dag":{"tasks":{"accelerator-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-limit"},"taskInfo":{"name":"accelerator-limit"}},"accelerator-type":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-accelerator-type"},"taskInfo":{"name":"accelerator-type"}},"cpu-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-cpu-limit"},"taskInfo":{"name":"cpu-limit"}},"memory-limit":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-memory-limit"},"taskInfo":{"name":"memory-limit"}},"sum-numbers":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["cpu-limit","memory-limit","accelerator-limit","accelerator-type"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}}}}' entrypoint: entrypoint podMetadata: annotations: @@ -359,15 +359,15 @@ spec: - name: component value: '{{workflow.parameters.components-49f9a898b718a077f30b7fd8c02d39767cff91ff0bbda4379daf866a91dbdb1b}}' - name: task - value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["accelerator-type","cpu-limit","memory-limit","accelerator-limit"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}' + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-sum-numbers"},"dependentTasks":["cpu-limit","memory-limit","accelerator-limit","accelerator-type"],"inputs":{"parameters":{"a":{"runtimeValue":{"constant":1}},"accelerator_count":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-limit-Output'']}}"}},"accelerator_type":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--accelerator-type-Output'']}}"}},"b":{"runtimeValue":{"constant":2}},"cpu_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--cpu-limit-Output'']}}"}},"memory_limit":{"runtimeValue":{"constant":"{{$.inputs.parameters[''pipelinechannel--memory-limit-Output'']}}"}},"pipelinechannel--accelerator-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-limit"}},"pipelinechannel--accelerator-type-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"accelerator-type"}},"pipelinechannel--cpu-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"cpu-limit"}},"pipelinechannel--memory-limit-Output":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"memory-limit"}}}},"taskInfo":{"name":"sum-numbers"}}' - name: container value: '{{workflow.parameters.implementations-49f9a898b718a077f30b7fd8c02d39767cff91ff0bbda4379daf866a91dbdb1b}}' - name: task-name value: sum-numbers - name: parent-dag-id value: '{{inputs.parameters.parent-dag-id}}' - depends: accelerator-type.Succeeded && cpu-limit.Succeeded && memory-limit.Succeeded - && accelerator-limit.Succeeded + depends: cpu-limit.Succeeded && memory-limit.Succeeded && accelerator-limit.Succeeded + && accelerator-type.Succeeded name: sum-numbers-driver template: system-container-driver - arguments: From 42c13d3d6e804d74cc86333384b4fbf5f29da1ac Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Fri, 21 Nov 2025 20:20:01 -0500 Subject: [PATCH 08/10] Reset workspace pipeline parallelism Signed-off-by: sduvvuri1603 --- test_data/compiled-workflows/pipeline_with_workspace.yaml | 1 - .../valid/critical/pipeline_with_workspace.py | 2 +- .../valid/critical/pipeline_with_workspace.yaml | 1 - 3 files changed, 1 insertion(+), 3 deletions(-) diff --git a/test_data/compiled-workflows/pipeline_with_workspace.yaml b/test_data/compiled-workflows/pipeline_with_workspace.yaml index 900c10c59b4..66eb58149ae 100644 --- a/test_data/compiled-workflows/pipeline_with_workspace.yaml +++ b/test_data/compiled-workflows/pipeline_with_workspace.yaml @@ -46,7 +46,6 @@ spec: - name: components-root value: '{"dag":{"outputs":{"parameters":{"Output":{"valueFromParameter":{"outputParameterKey":"Output","producerSubtask":"read-from-workspace"}}}},"tasks":{"read-from-workspace":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-read-from-workspace"},"dependentTasks":["write-to-workspace"],"inputs":{"parameters":{"file_path":{"taskOutputParameter":{"outputParameterKey":"Output","producerTask":"write-to-workspace"}}}},"taskInfo":{"name":"read-from-workspace"}},"write-to-workspace":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-write-to-workspace"},"inputs":{"parameters":{"workspace_path":{"runtimeValue":{"constant":"{{$.workspace_path}}"}}}},"taskInfo":{"name":"write-to-workspace"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' entrypoint: entrypoint - parallelism: 3 podMetadata: annotations: pipelines.kubeflow.org/v2_component: "true" diff --git a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py index ccf6f0c20cb..6bda2102e23 100644 --- a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py +++ b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.py @@ -53,7 +53,7 @@ def read_from_workspace(file_path: str) -> str: name="pipeline-with-workspace", description="A pipeline that demonstrates workspace functionality", pipeline_config=dsl.PipelineConfig( - pipeline_run_parallelism=3, + pipeline_run_parallelism=None, workspace=dsl.WorkspaceConfig( size='1Gi', kubernetes=dsl.KubernetesWorkspaceConfig( diff --git a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml index ddb3c15fba5..1b35ff824fe 100644 --- a/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml +++ b/test_data/sdk_compiled_pipelines/valid/critical/pipeline_with_workspace.yaml @@ -143,7 +143,6 @@ sdkVersion: kfp-2.14.6 platforms: kubernetes: pipelineConfig: - pipelineRunParallelism: 3 workspace: kubernetes: pvcSpecPatch: From 8ccafa9089f833a3fcedc59f2772328e6d2d6725 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Mon, 24 Nov 2025 12:35:46 -0500 Subject: [PATCH 09/10] Refine pipeline_run_parallelism tests Signed-off-by: sduvvuri1603 --- sdk/python/kfp/dsl/pipeline_config.py | 18 ++-- .../pipeline_with_run_parallelism.yaml | 98 +++++++++++++++++-- .../pipeline_with_run_parallelism.py | 13 +-- .../pipeline_with_run_parallelism.yaml | 52 ++++++---- 4 files changed, 136 insertions(+), 45 deletions(-) diff --git a/sdk/python/kfp/dsl/pipeline_config.py b/sdk/python/kfp/dsl/pipeline_config.py index 7f247edf5c6..0a139ffffed 100644 --- a/sdk/python/kfp/dsl/pipeline_config.py +++ b/sdk/python/kfp/dsl/pipeline_config.py @@ -108,13 +108,13 @@ def pipeline_run_parallelism(self) -> Optional[int]: @pipeline_run_parallelism.setter def pipeline_run_parallelism(self, value: Optional[int]) -> None: # pylint: disable=attribute-defined-outside-init - if value is None: + if value is not None: + if not isinstance(value, int): + raise ValueError( + 'pipeline_run_parallelism must be an integer if specified.') + if value <= 0: + raise ValueError( + 'pipeline_run_parallelism must be a positive integer.') + self._pipeline_run_parallelism = value + else: self._pipeline_run_parallelism = None - return - if not isinstance(value, int): - raise ValueError( - 'pipeline_run_parallelism must be an integer if specified.') - if value <= 0: - raise ValueError( - 'pipeline_run_parallelism must be a positive integer.') - self._pipeline_run_parallelism = value diff --git a/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml index 70ee4595f71..b29156a319f 100644 --- a/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml +++ b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml @@ -6,9 +6,9 @@ metadata: spec: arguments: parameters: - - name: components-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd - value: '{"executorLabel":"exec-produce-message","outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' - - name: implementations-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd + - name: components-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1 + value: '{"executorLabel":"exec-produce-message","inputDefinitions":{"parameters":{"msg":{"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' + - name: implementations-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1 value: '{"args":["--executor_input","{{$}}","--function_to_execute","produce_message"],"command":["sh","-c","\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 @@ -18,11 +18,14 @@ spec: \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef - produce_message() -\u003e str:\n return ''hello parallel world''\n\n"],"image":"python:3.11"}' + produce_message(msg: str) -\u003e str:\n print(msg)\n return msg\n\n"],"image":"python:3.11"}' + - name: components-comp-for-loop-2 + value: '{"dag":{"tasks":{"produce-message":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"inputs":{"parameters":{"msg":{"componentInputParameter":"pipelinechannel--loop-item-param-1"}}},"taskInfo":{"name":"produce-message"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-1":{"parameterType":"STRING"}}}}' - name: components-root - value: '{"dag":{"outputs":{"parameters":{"Output":{"valueFromParameter":{"outputParameterKey":"Output","producerSubtask":"produce-message"}}}},"tasks":{"produce-message":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"taskInfo":{"name":"produce-message"}}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' + value: '{"dag":{"tasks":{"for-loop-2":{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\", + \"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}}}}' entrypoint: entrypoint - parallelism: 7 + parallelism: 2 podMetadata: annotations: pipelines.kubeflow.org/v2_component: "true" @@ -210,11 +213,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.parameters.components-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd}}' + value: '{{workflow.parameters.components-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1}}' - name: task - value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"taskInfo":{"name":"produce-message"}}' + value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"inputs":{"parameters":{"msg":{"componentInputParameter":"pipelinechannel--loop-item-param-1"}}},"taskInfo":{"name":"produce-message"}}' - name: container - value: '{{workflow.parameters.implementations-49ed1f863907856117ffc90bf51c826cc3afd6238936620d7c038bf9a87f40cd}}' + value: '{{workflow.parameters.implementations-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1}}' - name: task-name value: produce-message - name: parent-dag-id @@ -235,7 +238,7 @@ spec: parameters: - name: parent-dag-id metadata: {} - name: root + name: comp-for-loop-2 outputs: {} - container: args: @@ -322,6 +325,81 @@ spec: valueFrom: default: "true" path: /tmp/outputs/condition + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-for-loop-2}}' + - name: iteration-index + value: '{{inputs.parameters.iteration-index}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\", + \"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}' + name: iteration-item-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-item-driver.outputs.parameters.execution-id}}' + - name: condition + value: '{{tasks.iteration-item-driver.outputs.parameters.condition}}' + depends: iteration-item-driver.Succeeded + name: iteration-item + template: comp-for-loop-2 + inputs: + parameters: + - name: parent-dag-id + - name: iteration-index + metadata: {} + name: comp-for-loop-2-iteration + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: component + value: '{{workflow.parameters.components-comp-for-loop-2}}' + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + - name: task + value: '{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\", + \"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}' + name: iteration-driver + template: system-dag-driver + - arguments: + parameters: + - name: parent-dag-id + value: '{{tasks.iteration-driver.outputs.parameters.execution-id}}' + - name: iteration-index + value: '{{item}}' + depends: iteration-driver.Succeeded + name: iteration-iterations + template: comp-for-loop-2-iteration + withSequence: + count: '{{tasks.iteration-driver.outputs.parameters.iteration-count}}' + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: comp-for-loop-2-for-loop-2-iterator + outputs: {} + - dag: + tasks: + - arguments: + parameters: + - name: parent-dag-id + value: '{{inputs.parameters.parent-dag-id}}' + name: for-loop-2 + template: comp-for-loop-2-for-loop-2-iterator + inputs: + parameters: + - name: parent-dag-id + metadata: {} + name: root + outputs: {} - dag: tasks: - arguments: diff --git a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py index 0e13c399844..a938863edba 100644 --- a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py +++ b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py @@ -17,17 +17,18 @@ @dsl.component -def produce_message() -> str: - return 'hello parallel world' +def produce_message(msg: str) -> str: + print(msg) + return msg @dsl.pipeline( name='pipeline-with-run-parallelism', - pipeline_config=dsl.PipelineConfig(pipeline_run_parallelism=7), + pipeline_config=dsl.PipelineConfig(pipeline_run_parallelism=2), ) -def pipeline_with_run_parallelism() -> str: - task = produce_message() - return task.output +def pipeline_with_run_parallelism(): + with dsl.ParallelFor(items=['one', 'two', 'three']) as item: + produce_message(msg=item) if __name__ == '__main__': diff --git a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml index 4bde422f313..5b644d05e68 100644 --- a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml +++ b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml @@ -1,10 +1,30 @@ # PIPELINE DEFINITION # Name: pipeline-with-run-parallelism -# Outputs: -# Output: str components: + comp-for-loop-2: + dag: + tasks: + produce-message: + cachingOptions: + enableCache: true + componentRef: + name: comp-produce-message + inputs: + parameters: + msg: + componentInputParameter: pipelinechannel--loop-item-param-1 + taskInfo: + name: produce-message + inputDefinitions: + parameters: + pipelinechannel--loop-item-param-1: + parameterType: STRING comp-produce-message: executorLabel: exec-produce-message + inputDefinitions: + parameters: + msg: + parameterType: STRING outputDefinitions: parameters: Output: @@ -37,35 +57,27 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef produce_message() -> str:\n return 'hello parallel world'\n\ - \n" + \ *\n\ndef produce_message(msg: str) -> str:\n print(msg)\n return\ + \ msg\n\n" image: python:3.11 pipelineInfo: name: pipeline-with-run-parallelism root: dag: - outputs: - parameters: - Output: - valueFromParameter: - outputParameterKey: Output - producerSubtask: produce-message tasks: - produce-message: - cachingOptions: - enableCache: true + for-loop-2: componentRef: - name: comp-produce-message + name: comp-for-loop-2 + parameterIterator: + itemInput: pipelinechannel--loop-item-param-1 + items: + raw: '["one", "two", "three"]' taskInfo: - name: produce-message - outputDefinitions: - parameters: - Output: - parameterType: STRING + name: for-loop-2 schemaVersion: 2.1.0 sdkVersion: kfp-2.14.6 --- platforms: kubernetes: pipelineConfig: - pipelineRunParallelism: 7 + pipelineRunParallelism: 2 From dcbc311f7cd7f2c87045e9aa5118ceb1db7ed731 Mon Sep 17 00:00:00 2001 From: sduvvuri1603 Date: Wed, 26 Nov 2025 09:38:12 -0500 Subject: [PATCH 10/10] test: validate pipeline run parallelism e2e Signed-off-by: sduvvuri1603 --- backend/test/end2end/pipeline_e2e_test.go | 1 + backend/test/end2end/utils/e2e_utils.go | 88 +++++++++++++++++++ .../pipeline_with_run_parallelism.yaml | 20 +++-- .../pipeline_with_run_parallelism.py | 12 ++- .../pipeline_with_run_parallelism.yaml | 13 ++- 5 files changed, 117 insertions(+), 17 deletions(-) diff --git a/backend/test/end2end/pipeline_e2e_test.go b/backend/test/end2end/pipeline_e2e_test.go index ed0a0536ca5..836c7860835 100644 --- a/backend/test/end2end/pipeline_e2e_test.go +++ b/backend/test/end2end/pipeline_e2e_test.go @@ -257,5 +257,6 @@ func validatePipelineRunSuccess(pipelineFile string, pipelineDir string, testCon } compiledWorkflow := workflowutils.UnmarshallWorkflowYAML(filepath.Join(testutil.GetCompiledWorkflowsFilesDir(), pipelineFile)) e2e_utils.ValidateComponentStatuses(runClient, k8Client, testContext, createdRunID, compiledWorkflow) + e2e_utils.ValidateParallelismIfConfigured(runClient, pipelineFilePath, createdRunID) } diff --git a/backend/test/end2end/utils/e2e_utils.go b/backend/test/end2end/utils/e2e_utils.go index 09526501700..e3dc1771e35 100644 --- a/backend/test/end2end/utils/e2e_utils.go +++ b/backend/test/end2end/utils/e2e_utils.go @@ -4,12 +4,15 @@ package utils import ( "fmt" "maps" + "path/filepath" "sort" + "strings" "time" runparams "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_client/run_service" "github.com/kubeflow/pipelines/backend/api/v2beta1/go_http_client/run_model" apiserver "github.com/kubeflow/pipelines/backend/src/common/client/api_server/v2" + workflowutils "github.com/kubeflow/pipelines/backend/test/compiler/utils" "github.com/kubeflow/pipelines/backend/test/config" "github.com/kubeflow/pipelines/backend/test/logger" "github.com/kubeflow/pipelines/backend/test/testutil" @@ -93,6 +96,91 @@ func ValidateComponentStatuses(runClient *apiserver.RunClient, k8Client *kuberne } +// ValidateParallelismIfConfigured checks pipelineRunParallelism constraints when set. +func ValidateParallelismIfConfigured(runClient *apiserver.RunClient, pipelineFilePath, runID string) { + spec := testutil.ParseFileToSpecs(pipelineFilePath, false, nil) + if spec == nil || spec.PlatformSpec() == nil { + return + } + k8sSpec, ok := spec.PlatformSpec().GetPlatforms()["kubernetes"] + if !ok || k8sSpec == nil { + return + } + cfg := k8sSpec.GetPipelineConfig() + if cfg == nil || cfg.PipelineRunParallelism == nil { + return + } + limit := cfg.GetPipelineRunParallelism() + + baseName := filepath.Base(pipelineFilePath) + if ext := filepath.Ext(baseName); ext != "" { + baseName = strings.TrimSuffix(baseName, ext) + } + compiledWorkflow := workflowutils.UnmarshallWorkflowYAML( + filepath.Join(testutil.GetCompiledWorkflowsFilesDir(), baseName+".yaml")) + componentTaskNames := map[string]struct{}{} + for _, tmpl := range compiledWorkflow.Spec.Templates { + if tmpl.DAG == nil { + continue + } + for _, task := range tmpl.DAG.Tasks { + if task.Template == "system-container-executor" { + componentTaskNames[task.Name] = struct{}{} + } + } + } + if len(componentTaskNames) == 0 { + return + } + + runIDCopy := runID + runDetail := testutil.GetPipelineRun(runClient, &runIDCopy) + gomega.Expect(runDetail).NotTo(gomega.BeNil()) + gomega.Expect(runDetail.RunDetails).NotTo(gomega.BeNil()) + + type event struct { + at time.Time + delta int + } + var events []event + for _, task := range runDetail.RunDetails.TaskDetails { + if task == nil { + continue + } + name := task.DisplayName + if name == "" { + name = task.TaskID + } + if _, ok := componentTaskNames[name]; !ok { + continue + } + start := time.Time(task.StartTime) + if start.IsZero() { + continue + } + events = append(events, event{at: start, delta: +1}) + end := time.Time(task.EndTime) + if !end.IsZero() { + events = append(events, event{at: end, delta: -1}) + } + } + if len(events) == 0 { + return + } + sort.Slice(events, func(i, j int) bool { + if events[i].at.Equal(events[j].at) { + return events[i].delta < events[j].delta + } + return events[i].at.Before(events[j].at) + }) + running := 0 + for _, ev := range events { + running += ev.delta + gomega.Expect(running).To(gomega.BeNumerically(">=", 0)) + gomega.Expect(running).To(gomega.BeNumerically("<=", limit)) + } +} + // CapturePodLogsForUnsuccessfulTasks - Capture pod logs of a failed component func CapturePodLogsForUnsuccessfulTasks(k8Client *kubernetes.Clientset, testContext *apitests.TestContext, taskDetails []*run_model.V2beta1PipelineTaskDetail) { failedTasks := make(map[string]string) diff --git a/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml index b29156a319f..15f3432abfb 100644 --- a/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml +++ b/test_data/compiled-workflows/pipeline_with_run_parallelism.yaml @@ -6,9 +6,9 @@ metadata: spec: arguments: parameters: - - name: components-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1 - value: '{"executorLabel":"exec-produce-message","inputDefinitions":{"parameters":{"msg":{"parameterType":"STRING"}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' - - name: implementations-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1 + - name: components-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de + value: '{"executorLabel":"exec-produce-message","inputDefinitions":{"parameters":{"msg":{"parameterType":"STRING"},"sleep_seconds":{"defaultValue":20,"isOptional":true,"parameterType":"NUMBER_INTEGER"}}},"outputDefinitions":{"parameters":{"Output":{"parameterType":"STRING"}}}}' + - name: implementations-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de value: '{"args":["--executor_input","{{$}}","--function_to_execute","produce_message"],"command":["sh","-c","\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 @@ -18,12 +18,14 @@ spec: \u003e \"$program_path/ephemeral_component.py\"\n_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n","\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import *\n\ndef - produce_message(msg: str) -\u003e str:\n print(msg)\n return msg\n\n"],"image":"python:3.11"}' + produce_message(msg: str, sleep_seconds: int = 20) -\u003e str:\n import + time\n\n print(f''Processing {msg}...'')\n time.sleep(sleep_seconds)\n return + msg\n\n"],"image":"python:3.9"}' - name: components-comp-for-loop-2 value: '{"dag":{"tasks":{"produce-message":{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"inputs":{"parameters":{"msg":{"componentInputParameter":"pipelinechannel--loop-item-param-1"}}},"taskInfo":{"name":"produce-message"}}}},"inputDefinitions":{"parameters":{"pipelinechannel--loop-item-param-1":{"parameterType":"STRING"}}}}' - name: components-root value: '{"dag":{"tasks":{"for-loop-2":{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\", - \"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}}}}' + \"two\", \"three\", \"four\", \"five\"]"}},"taskInfo":{"name":"for-loop-2"}}}}}' entrypoint: entrypoint parallelism: 2 podMetadata: @@ -213,11 +215,11 @@ spec: - arguments: parameters: - name: component - value: '{{workflow.parameters.components-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1}}' + value: '{{workflow.parameters.components-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de}}' - name: task value: '{"cachingOptions":{"enableCache":true},"componentRef":{"name":"comp-produce-message"},"inputs":{"parameters":{"msg":{"componentInputParameter":"pipelinechannel--loop-item-param-1"}}},"taskInfo":{"name":"produce-message"}}' - name: container - value: '{{workflow.parameters.implementations-00cdf4de1b0ff15d542d6cb54821aeebf3de511087a035956ba6381b97ace0a1}}' + value: '{{workflow.parameters.implementations-178514df38896ed194ce06e7bfe28c7b70b505ef42774615200c302567af37de}}' - name: task-name value: produce-message - name: parent-dag-id @@ -337,7 +339,7 @@ spec: value: '{{inputs.parameters.parent-dag-id}}' - name: task value: '{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\", - \"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}' + \"two\", \"three\", \"four\", \"five\"]"}},"taskInfo":{"name":"for-loop-2"}}' name: iteration-item-driver template: system-dag-driver - arguments: @@ -366,7 +368,7 @@ spec: value: '{{inputs.parameters.parent-dag-id}}' - name: task value: '{"componentRef":{"name":"comp-for-loop-2"},"parameterIterator":{"itemInput":"pipelinechannel--loop-item-param-1","items":{"raw":"[\"one\", - \"two\", \"three\"]"}},"taskInfo":{"name":"for-loop-2"}}' + \"two\", \"three\", \"four\", \"five\"]"}},"taskInfo":{"name":"for-loop-2"}}' name: iteration-driver template: system-dag-driver - arguments: diff --git a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py index a938863edba..0f667a62093 100644 --- a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py +++ b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.py @@ -16,9 +16,12 @@ from kfp import compiler, dsl -@dsl.component -def produce_message(msg: str) -> str: - print(msg) +@dsl.component(base_image='python:3.9') +def produce_message(msg: str, sleep_seconds: int = 20) -> str: + import time + + print(f'Processing {msg}...') + time.sleep(sleep_seconds) return msg @@ -27,7 +30,8 @@ def produce_message(msg: str) -> str: pipeline_config=dsl.PipelineConfig(pipeline_run_parallelism=2), ) def pipeline_with_run_parallelism(): - with dsl.ParallelFor(items=['one', 'two', 'three']) as item: + loop_args = ['one', 'two', 'three', 'four', 'five'] + with dsl.ParallelFor(items=loop_args) as item: produce_message(msg=item) diff --git a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml index 5b644d05e68..3b60b006468 100644 --- a/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml +++ b/test_data/sdk_compiled_pipelines/valid/essential/pipeline_with_run_parallelism.yaml @@ -25,6 +25,10 @@ components: parameters: msg: parameterType: STRING + sleep_seconds: + defaultValue: 20.0 + isOptional: true + parameterType: NUMBER_INTEGER outputDefinitions: parameters: Output: @@ -57,9 +61,10 @@ deploymentSpec: ' - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ - \ *\n\ndef produce_message(msg: str) -> str:\n print(msg)\n return\ - \ msg\n\n" - image: python:3.11 + \ *\n\ndef produce_message(msg: str, sleep_seconds: int = 20) -> str:\n\ + \ import time\n\n print(f'Processing {msg}...')\n time.sleep(sleep_seconds)\n\ + \ return msg\n\n" + image: python:3.9 pipelineInfo: name: pipeline-with-run-parallelism root: @@ -71,7 +76,7 @@ root: parameterIterator: itemInput: pipelinechannel--loop-item-param-1 items: - raw: '["one", "two", "three"]' + raw: '["one", "two", "three", "four", "five"]' taskInfo: name: for-loop-2 schemaVersion: 2.1.0