Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ See the Kubeflow [Pipelines API doc](https://www.kubeflow.org/docs/components/pi

Consult the [Python SDK reference docs](https://kubeflow-pipelines.readthedocs.io/en/stable/) when writing pipelines using the Python SDK.

> New in master: `dsl.PipelineConfig` now accepts an optional `pipeline_run_parallelism` integer to cap concurrent task execution for a run. The backend stores the requested limit in a shared ConfigMap and surfaces it to Argo Workflows via `spec.parallelism`.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be more appropriate to add this entry to the CHANGELOG.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, but will this be a part of a new section called "Unreleased Features" ? because I only see version release details in the file.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the PR will be included here as part of the release process. @mprahl , could you confirm if that’s correct?


## Deep Wiki
Check out our AI Powered repo documentation on [DeepWiki](https://deepwiki.com/kubeflow/pipelines).

Expand Down
24 changes: 18 additions & 6 deletions api/v2alpha1/go/pipelinespec/pipeline_spec.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions api/v2alpha1/pipeline_spec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1176,4 +1176,8 @@ message PipelineConfig {
// Configuration for a shared storage workspace that persists for the duration of the pipeline run.
// The workspace can be configured with size and Kubernetes-specific settings to override default PVC configurations.
optional WorkspaceConfig workspace = 2;

// Maximum number of tasks that can be scheduled simultaneously for a single
// pipeline run.
optional int32 pipeline_run_parallelism = 3;
}
5 changes: 5 additions & 0 deletions backend/src/apiserver/client/kubernetes_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

type KubernetesCoreInterface interface {
PodClient(namespace string) v1.PodInterface
ConfigMapClient(namespace string) v1.ConfigMapInterface
GetClientSet() kubernetes.Interface
}

Expand All @@ -24,6 +25,10 @@ func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface {
return c.coreV1Client.Pods(namespace)
}

func (c *KubernetesCore) ConfigMapClient(namespace string) v1.ConfigMapInterface {
return c.coreV1Client.ConfigMaps(namespace)
}

func (c *KubernetesCore) GetClientSet() kubernetes.Interface {
return c.clientSet
}
Expand Down
33 changes: 27 additions & 6 deletions backend/src/apiserver/client/kubernetes_core_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,14 @@ import (
policyv1 "k8s.io/api/policy/v1"
policyv1beta1 "k8s.io/api/policy/v1beta1"
"k8s.io/client-go/kubernetes"
kubernetesfake "k8s.io/client-go/kubernetes/fake"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
)

type FakeKuberneteCoreClient struct {
podClientFake *FakePodClient
coreClient v1.CoreV1Interface
clientset kubernetes.Interface
}

func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
Expand All @@ -35,30 +38,48 @@ func (c *FakeKuberneteCoreClient) PodClient(namespace string) v1.PodInterface {
return c.podClientFake
}

func (c *FakeKuberneteCoreClient) ConfigMapClient(namespace string) v1.ConfigMapInterface {
return c.coreClient.ConfigMaps(namespace)
}

func (c *FakeKuberneteCoreClient) GetClientSet() kubernetes.Interface {
// Return nil for fake implementation - tests that need this should use a mock
return nil
return c.clientset
}

func NewFakeKuberneteCoresClient() *FakeKuberneteCoreClient {
return &FakeKuberneteCoreClient{&FakePodClient{}}
clientset := kubernetesfake.NewSimpleClientset()
return &FakeKuberneteCoreClient{
podClientFake: &FakePodClient{},
coreClient: clientset.CoreV1(),
clientset: clientset,
}
}

type FakeKubernetesCoreClientWithBadPodClient struct {
podClientFake *FakeBadPodClient
coreClient v1.CoreV1Interface
clientset kubernetes.Interface
}

func NewFakeKubernetesCoreClientWithBadPodClient() *FakeKubernetesCoreClientWithBadPodClient {
return &FakeKubernetesCoreClientWithBadPodClient{&FakeBadPodClient{}}
clientset := kubernetesfake.NewSimpleClientset()
return &FakeKubernetesCoreClientWithBadPodClient{
podClientFake: &FakeBadPodClient{},
coreClient: clientset.CoreV1(),
clientset: clientset,
}
}

func (c *FakeKubernetesCoreClientWithBadPodClient) PodClient(namespace string) v1.PodInterface {
return c.podClientFake
}

func (c *FakeKubernetesCoreClientWithBadPodClient) ConfigMapClient(namespace string) v1.ConfigMapInterface {
return c.coreClient.ConfigMaps(namespace)
}

func (c *FakeKubernetesCoreClientWithBadPodClient) GetClientSet() kubernetes.Interface {
// Return nil for fake implementation
return nil
return c.clientset
}

func (c *FakePodClient) EvictV1(context.Context, *policyv1.Eviction) error {
Expand Down
72 changes: 72 additions & 0 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

const pipelineParallelismConfigMapName = "kfp-pipeline-config"

// Metric variables. Please prefix the metric names with resource_manager_.
var (
extraLabels = []string{
Expand Down Expand Up @@ -153,6 +155,65 @@ func NewResourceManager(clientManager ClientManagerInterface, options *ResourceM
}
}

func workflowParallelism(executionSpec util.ExecutionSpec) (int64, bool) {
if executionSpec == nil {
return 0, false
}
wf, ok := executionSpec.(*util.Workflow)
if !ok || wf == nil || wf.Spec.Parallelism == nil {
return 0, false
}
return *wf.Spec.Parallelism, true
}

func (r *ResourceManager) upsertPipelineParallelismConfigMap(ctx context.Context, namespace, key string, parallelism int64) error {
if key == "" {
return nil
}

configMaps := r.k8sCoreClient.ConfigMapClient(namespace)
value := strconv.FormatInt(parallelism, 10)

for {
current, err := configMaps.Get(ctx, pipelineParallelismConfigMapName, v1.GetOptions{})
if apierrors.IsNotFound(err) {
newConfigMap := &corev1.ConfigMap{
ObjectMeta: v1.ObjectMeta{
Name: pipelineParallelismConfigMapName,
Namespace: namespace,
},
Data: map[string]string{
key: value,
},
}
if _, err := configMaps.Create(ctx, newConfigMap, v1.CreateOptions{}); apierrors.IsAlreadyExists(err) {
continue
} else if err != nil {
return util.Wrap(err, "failed to create pipeline parallelism ConfigMap entry")
}
return nil
}
if err != nil {
return util.Wrap(err, "failed to retrieve pipeline parallelism ConfigMap")
}

if current.Data == nil {
current.Data = map[string]string{}
}
if existing, ok := current.Data[key]; ok && existing == value {
return nil
}

current.Data[key] = value
if _, err := configMaps.Update(ctx, current, v1.UpdateOptions{}); apierrors.IsConflict(err) {
continue
} else if err != nil {
return util.Wrap(err, "failed to update pipeline parallelism ConfigMap entry")
}
return nil
}
}

func (r *ResourceManager) getWorkflowClient(namespace string) util.ExecutionInterface {
return r.execClient.Execution(namespace)
}
Expand Down Expand Up @@ -562,6 +623,17 @@ func (r *ResourceManager) CreateRun(ctx context.Context, run *model.Run) (*model
}
executionSpec.SetExecutionNamespace(k8sNamespace)

if parallelism, ok := workflowParallelism(executionSpec); ok && parallelism > 0 && run.PipelineVersionId != "" {
if err := r.upsertPipelineParallelismConfigMap(
ctx, k8sNamespace, run.PipelineVersionId, parallelism); err != nil {
if apierrors.IsForbidden(errors.Cause(err)) {
glog.Warningf("Skipping pipeline_run_parallelism ConfigMap update in namespace %q: %v", k8sNamespace, err)
} else {
return nil, util.Wrap(err, "Failed to persist pipeline_run_parallelism configuration")
}
}
}

// assign OwnerReference to scheduledworkflow
if run.RecurringRunId != "" {
job, err := r.jobStore.GetJob(run.RecurringRunId)
Expand Down
68 changes: 68 additions & 0 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1936,6 +1936,66 @@ func TestCreateRun_ThroughPipelineVersion(t *testing.T) {
assert.Equal(t, expectedRunDetail.ToV1(), runDetail.ToV1(), "CreateRun stored invalid data in database")
}

func TestCreateRun_PipelineRunParallelismConfigMap(t *testing.T) {
initEnvVars()

store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()

manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})

pipeline, err := manager.CreatePipeline(createPipeline("parallelism-pipeline", "", "kubeflow"))
assert.NoError(t, err)

pipelineStore, ok := store.pipelineStore.(*storage.PipelineStore)
assert.True(t, ok)
pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil))

pv := createPipelineVersion(
pipeline.UUID,
"parallelism-version",
"",
"",
v2SpecHelloWorldWithParallelism,
"",
"kubeflow",
)
version, err := manager.CreatePipelineVersion(pv)
assert.NoError(t, err)

run := &model.Run{
DisplayName: "run-with-parallelism",
Namespace: "kubeflow",
PipelineSpec: model.PipelineSpec{
PipelineId: pipeline.UUID,
PipelineVersionId: version.UUID,
RuntimeConfig: model.RuntimeConfig{
Parameters: "{\"text\":\"hello\"}",
},
},
}

createdRun, err := manager.CreateRun(context.Background(), run)
assert.NoError(t, err)
assert.NotNil(t, createdRun)

configMap, err := store.k8sCoreClientFake.ConfigMapClient("kubeflow").Get(
context.Background(), pipelineParallelismConfigMapName, v1.GetOptions{})
assert.NoError(t, err)
if assert.NotNil(t, configMap.Data) {
assert.Equal(t, "5", configMap.Data[version.UUID])
}

execSpec, err := store.ExecClientFake.Execution("kubeflow").Get(
context.Background(), createdRun.K8SName, v1.GetOptions{})
assert.NoError(t, err)
workflow, ok := execSpec.(*util.Workflow)
assert.True(t, ok)
if assert.NotNil(t, workflow.Spec.Parallelism) {
assert.Equal(t, int64(5), *workflow.Spec.Parallelism)
}
}

func TestCreateRun_ThroughPipelineIdAndPipelineVersion(t *testing.T) {
// Create experiment, pipeline, and pipeline version.
store, manager, experiment, pipeline, _ := initWithExperimentAndPipeline(t)
Expand Down Expand Up @@ -4047,6 +4107,14 @@ schemaVersion: 2.1.0
sdkVersion: kfp-1.6.5
`

var v2SpecHelloWorldWithParallelism = v2SpecHelloWorld + `
---
platforms:
kubernetes:
pipelineConfig:
pipelineRunParallelism: 5
`

var v2SpecHelloWorldMutated = `
components:
comp-hello-world:
Expand Down
5 changes: 5 additions & 0 deletions backend/src/cache/client/kubernetes_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

type KubernetesCoreInterface interface {
PodClient(namespace string) v1.PodInterface
ConfigMapClient(namespace string) v1.ConfigMapInterface
}

type KubernetesCore struct {
Expand All @@ -23,6 +24,10 @@ func (c *KubernetesCore) PodClient(namespace string) v1.PodInterface {
return c.coreV1Client.Pods(namespace)
}

func (c *KubernetesCore) ConfigMapClient(namespace string) v1.ConfigMapInterface {
return c.coreV1Client.ConfigMaps(namespace)
}

func createKubernetesCore(clientParams util.ClientParameters) (KubernetesCoreInterface, error) {
restConfig, err := util.GetKubernetesConfig()
if err != nil {
Expand Down
Loading
Loading