feat: add KubeflowExecutor for Kubeflow Training Operator (TrainJob CRD)#462

Open

ko3n1g wants to merge 16 commits into `main` from `feat/pytorchjob-executor`

Conversation


@ko3n1g ko3n1g commented Mar 12, 2026

Summary

  • Adds `KubeflowExecutor` that submits distributed training jobs to any Kubernetes cluster running the Kubeflow Training Operator
  • Supports both PyTorchJob (Training Operator v1) and TrainJob (Training Operator v2) via a `job_kind` toggle
  • Pairs with a TorchX scheduler so jobs integrate with `run.run()` and `run.Experiment`
  • Kubernetes config loaded automatically (local kubeconfig → in-cluster fallback)
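The config-loading fallback in the last bullet can be sketched with stand-in callables (the real executor presumably wraps the `kubernetes` client's loader functions; the names here are hypothetical):

```python
def load_config_with_fallback(load_local, load_incluster):
    # Try the local kubeconfig first; on failure fall back to the
    # in-cluster service-account config. Both callables are hypothetical
    # stand-ins for the kubernetes client's loaders.
    try:
        return load_local()
    except Exception:
        return load_incluster()
```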

PyTorchJob vs TrainJob

|  | PyTorchJob | TrainJob |
| --- | --- | --- |
| API | `kubeflow.org/v1` | `trainer.kubeflow.org/v1alpha1` |
| Pod config | directly in replica pod spec | `podTemplateOverrides[].spec` |
| `nproc` | `spec.nprocPerNode` | `spec.trainer.numProcPerNode` |
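For orientation, the two shapes can be sketched as minimal manifest dicts (values are placeholders; the field placement follows the table above, while the replica-spec key names are assumptions about the CRDs):

```python
# Minimal skeleton of the v1 CRD: nprocPerNode and pod config sit directly
# under the replica spec.
pytorchjob = {
    "apiVersion": "kubeflow.org/v1",
    "kind": "PyTorchJob",
    "spec": {
        "nprocPerNode": "8",
        "pytorchReplicaSpecs": {
            "Worker": {"replicas": 2, "template": {"spec": {"containers": []}}}
        },
    },
}

# Minimal skeleton of the v2 CRD: nproc moves under spec.trainer and pod
# config goes through podTemplateOverrides[].spec.
trainjob = {
    "apiVersion": "trainer.kubeflow.org/v1alpha1",
    "kind": "TrainJob",
    "spec": {
        "trainer": {"numProcPerNode": "8"},
        "podTemplateOverrides": [{"spec": {"containers": []}}],
    },
}
```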

Notable fields

  • `tolerations`, `affinity` — go into pod spec / `podTemplateOverrides` automatically
  • `env_list` — full env var dicts supporting `valueFrom` / `secretKeyRef`
  • `pod_spec_overrides` — arbitrary extra pod spec fields (e.g. `resourceClaims` for IMEX channels)
  • `launch(wait=True)` — polls until `RUNNING` / `SUCCEEDED` / `FAILED`
  • `cancel(wait=True)` — polls until CR gone and all pods terminated
  • `UNKNOWN`/`None` status → `AppState.PENDING` (avoids false failures on transient API errors)
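The shapes these fields take can be illustrated as follows (all names such as `wandb-secret` and `imex` are made up for illustration, and `map_status` is only a sketch of the mapping described in the last bullet):

```python
# env_list entries are full Kubernetes env-var dicts, so valueFrom /
# secretKeyRef work as-is.
env_list = [
    {"name": "NCCL_DEBUG", "value": "INFO"},
    {"name": "WANDB_API_KEY",
     "valueFrom": {"secretKeyRef": {"name": "wandb-secret", "key": "api-key"}}},
]

# pod_spec_overrides passes arbitrary extra pod-spec fields through,
# e.g. resourceClaims for IMEX channels.
pod_spec_overrides = {
    "resourceClaims": [
        {"name": "imex-channel", "resourceClaimTemplateName": "imex"}
    ],
}

def map_status(cr_status):
    # Known states pass through; UNKNOWN or a missing status (e.g. a
    # transient API error) maps to PENDING instead of a false failure.
    if cr_status in {"RUNNING", "SUCCEEDED", "FAILED"}:
        return cr_status
    return "PENDING"
```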

Minimal E2E example

```python
import nemo_run as run
from nemo_run.core.execution.kubeflow import KubeflowExecutor

executor = KubeflowExecutor(
    namespace="my-namespace",
    image="pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime",
    num_nodes=2,
    gpus_per_node=8,
    launcher=run.Torchrun(),  # torchrun args injected automatically
    volumes=[{"name": "data", "persistentVolumeClaim": {"claimName": "my-pvc"}}],
    volume_mounts=[{"name": "data", "mountPath": "/data"}],
)

script = run.Script("train.py")

run.run(script, executor=executor, name="my-training-job")
```

Test plan

  • 63 unit tests passing (`pytest test/core/execution/test_kubeflow.py test/run/torchx_backend/schedulers/test_kubeflow.py`)
  • PyTorchJob e2e verified against AWS EKS (`local/example.py`): launch → RUNNING → log sentinel → cancel(wait=True)
  • TrainJob e2e pending GKE cluster readiness (`local/example_trainjob.py`)

🤖 Generated with Claude Code

…ent, test cleanup)

- Add explanatory comment to empty AttributeError except in _get_job_dirs
  (backwards-compat field migration — absence is expected and handled)
- Add noqa + comment to Jinja2 Environment for shell-script template
  (autoescape intentionally disabled for .sh/.j2; no XSS risk)
- Remove unused _raise_on_read helper in test_fetch_logs_stream_handles_exception
- Use sys.modules lookup instead of duplicate import in test_import_error_when_kubernetes_unavailable

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>

@chtruong814 chtruong814 left a comment


Awesome. I think we should drop the v1 support. It's deprecated. Had some other feedback. Please take a look.

```python
runtime_ref: str = "torch-distributed"
namespace: str = "default"
image: str = ""
num_nodes: int = 2
```
Contributor


Why would num_nodes default to 2?

```python
try:
    cfg = serializer.deserialize(app["executor"])
    # Backwards compat: migrate renamed field nproc_per_node → nprocs_per_node.
    # AttributeError means the field doesn't exist so no migration is needed.
```
Contributor


Is this because of the v1 vs v2 spec difference?

Contributor Author


not sure.. might be something we can reduce.. i'll check

Contributor Author


okay that was a local caching issue, this backward compat isn't required anymore

ko3n1g and others added 4 commits March 16, 2026 16:18
Signed-off-by: oliver könig <okoenig@nvidia.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
PyTorchJob (Training Operator v1) is deprecated in favour of TrainJob
(Training Operator v2).  Simplify the executor to support TrainJob only:

- Remove PyTorchJob constants, `job_kind` field, `_get_pytorchjob_body`,
  and all PyTorchJob branches in `_group`, `_version`, `_plural`,
  `_pod_label_selector`, `get_job_body`, and `status`.
- Inline the trivial `_group()`, `_version()`, `_plural()`, and
  `_pod_label_selector()` helpers; callers now reference the
  `_TRAINJOB_*` constants and the label-selector format string directly.
- Rename `_get_trainjob_body` → `get_job_body` (drop the one-line wrapper).
- Remove backwards-compat `nproc_per_node → nprocs_per_node` migration
  block in `schedulers/kubeflow.py` (only relevant for legacy PyTorchJob
  persisted state).
- Add docstrings to all public methods that lacked them.
- Update tests: remove PyTorchJob-specific tests, drop `job_kind="TrainJob"`
  params (now the only kind), fix status/launch-wait fixtures to use the
  TrainJob `jobsStatus` format.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
…or state

Persisted entries in ~/.nemo_run/.kubeflow_jobs.json written before
PyTorchJob was removed still carry job_kind in their serialized Fiddle
config.  Strip it before fdl.build() to avoid a TypeError on status
polling and log fetching for those old runs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
…utor state

Entries written before the nproc_per_node → nprocs_per_node rename still
exist in ~/.nemo_run/.kubeflow_jobs.json.  Migrate the value and drop the
old key alongside the existing job_kind removal so both old field names are
handled in one place.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
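Taken together, the two compat fixes amount to a small strip-and-rename pass over the persisted config before `fdl.build()`; a sketch, assuming the serialized config behaves like a flat dict (the real Fiddle config object may differ):

```python
def migrate_persisted_executor_cfg(cfg: dict) -> dict:
    # Drop the job_kind field removed along with PyTorchJob support, and
    # rename nproc_per_node -> nprocs_per_node, so entries written to
    # ~/.nemo_run/.kubeflow_jobs.json by older versions still deserialize.
    cfg = dict(cfg)
    cfg.pop("job_kind", None)
    if "nproc_per_node" in cfg:
        cfg["nprocs_per_node"] = cfg.pop("nproc_per_node")
    return cfg
```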
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
kubectl enforces a default max of 5 concurrent log requests when using
a label selector. Pass --max-log-requests=num_nodes so fetch_logs works
correctly for larger jobs.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
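A sketch of the resulting invocation (the label selector and exact flag layout are assumptions about the internal kubectl call):

```python
def build_logs_cmd(namespace: str, job_name: str, num_nodes: int) -> list:
    # kubectl caps concurrent log streams at 5 when a label selector is
    # used; raising --max-log-requests to the node count lets every pod
    # of a larger job stream its logs.
    return [
        "kubectl", "logs", "-f",
        "-n", namespace,
        "-l", f"jobset.sigs.k8s.io/jobset-name={job_name}",  # assumed selector
        f"--max-log-requests={num_nodes}",
    ]
```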
ko3n1g added 2 commits March 17, 2026 21:31
Signed-off-by: oliver könig <okoenig@nvidia.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
chtruong814 previously approved these changes Mar 17, 2026
@ko3n1g ko3n1g changed the title feat: add KubeflowExecutor for Kubeflow Training Operator (PyTorchJob + TrainJob) feat: add KubeflowExecutor for Kubeflow Training Operator (TrainJob CRD) Mar 17, 2026
Comment on lines +146 to +148

```python
elif fn_or_script.inline and role_args:
    # Inline scripts are written to a file; role_args[0] is the pod-side path
    script = role_args[0]
```
Contributor


will this work for slurm as well?

Replace the brittle `lines_yielded > 0` and 10-minute deadline heuristics
with `status()`-based termination: the retry loop now runs until the job
reaches SUCCEEDED or FAILED, handling slow container pulls, mid-stream
crashes, and transient network failures correctly.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>
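The replacement loop can be sketched as follows (`fetch_status` and `stream_chunk` are hypothetical callables standing in for the scheduler's `status()` and log-streaming calls):

```python
import time

TERMINAL = {"SUCCEEDED", "FAILED"}

def stream_logs_until_done(fetch_status, stream_chunk, poll_s=10.0):
    # Yield log lines, retrying after transient failures, until status()
    # reports a terminal state — no line-count or deadline heuristics.
    while True:
        try:
            for line in stream_chunk():
                yield line
        except ConnectionError:
            pass  # transient network failure: re-check status, then retry
        if fetch_status() in TERMINAL:
            return
        time.sleep(poll_s)
```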
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Signed-off-by: oliver könig <okoenig@nvidia.com>