
feat: PyTorchJobExecutor for Kubeflow Training Operator #461

Closed
svcnvidia-nemo-ci wants to merge 3 commits into main from feat/pytorchjob-executor


@svcnvidia-nemo-ci
Contributor

Summary

  • Adds PyTorchJobExecutor that builds and submits PyTorchJob CRDs to a Kubernetes cluster running the Kubeflow Training Operator
  • Pairs with a TorchX scheduler so jobs integrate with run.run() and run.Experiment
  • Kubernetes config loaded automatically (local kubeconfig → in-cluster fallback)
  • cancel(wait=True) polls until both the CR and all associated pods are fully terminated
  • Full TDD: tests written before implementation; covers happy path, error cases, state mapping, and persistence
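The "local kubeconfig → in-cluster fallback" behaviour could look roughly like the sketch below. This is not the PR's code: the function name `load_k8s_config` and its injectable parameters are hypothetical, added so the fallback order can be exercised without a cluster. The defaults assume the official `kubernetes` Python client (`config.load_kube_config`, `config.load_incluster_config`, `config.ConfigException`).

```python
from typing import Callable, Optional, Type


def load_k8s_config(
    load_local: Optional[Callable[[], None]] = None,
    load_in_cluster: Optional[Callable[[], None]] = None,
    config_error: Optional[Type[Exception]] = None,
) -> str:
    """Try the local kubeconfig first, then fall back to in-cluster config.

    Returns "local" or "in-cluster" so callers can log which source was used.
    All three parameters are hypothetical injection points for testing.
    """
    if load_local is None or load_in_cluster is None or config_error is None:
        # Imported lazily so this module imports even without the client installed.
        from kubernetes import config

        load_local = load_local or config.load_kube_config
        load_in_cluster = load_in_cluster or config.load_incluster_config
        config_error = config_error or config.ConfigException

    try:
        load_local()
        return "local"
    except config_error:
        # No usable kubeconfig here; assume the process is running inside a pod.
        load_in_cluster()
        return "in-cluster"
```

If the in-cluster loader also fails, its exception propagates to the caller, which seems preferable to continuing silently with an unconfigured client.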

Test plan

  • uv run -- pytest test/core/execution/test_pytorchjob.py test/run/torchx_backend/schedulers/test_pytorchjob.py -v passes (44/44)
  • uv run --group lint -- ruff check --fix . && uv run --group lint -- ruff format . clean
  • End-to-end: launch() → status() → cancel(wait=True) cycle against a real cluster
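The blocking part of cancel(wait=True) amounts to polling until the CR and its pods are gone. Below is a minimal sketch of such a loop, assuming a caller-supplied predicate. `wait_until` and `FakeCluster` are hypothetical names for illustration only and do not appear in the PR; a real predicate would query the Kubernetes API.

```python
import time
from typing import Callable


def wait_until(
    predicate: Callable[[], bool],
    timeout: float = 300.0,
    poll_interval: float = 5.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> bool:
    """Poll predicate until it returns True or timeout seconds have elapsed."""
    deadline = clock() + timeout
    while True:
        if predicate():
            return True
        if clock() >= deadline:
            return False
        sleep(poll_interval)


class FakeCluster:
    """Hypothetical stand-in for the Kubernetes API (illustration only)."""

    def __init__(self, polls_until_gone: int) -> None:
        self.remaining = polls_until_gone

    def job_and_pods_gone(self) -> bool:
        # Pretend the CR and its pods disappear after a few polls.
        self.remaining -= 1
        return self.remaining <= 0


cluster = FakeCluster(polls_until_gone=3)
terminated = wait_until(cluster.job_and_pods_gone, timeout=1.0, poll_interval=0.0)
print("terminated:", terminated)
```

Returning a boolean instead of raising on timeout leaves the caller free to decide whether an unfinished teardown is an error.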

🤖 Generated with Claude Code

…etes

Introduces PyTorchJobExecutor and a matching TorchX scheduler so users
can deploy distributed PyTorchJobs to any Kubernetes cluster running the
Kubeflow Training Operator via run.run() / run.Experiment.

- PyTorchJobExecutor builds and submits PyTorchJob CRDs via the K8s API
  (local kubeconfig with in-cluster fallback)
- cancel(wait=True) polls until both the CR and all associated pods are
  fully terminated
- TorchX scheduler persists job state and maps PyTorchJobState -> AppState
- Full TDD: tests written before implementation
- Documentation added to docs/guides/execution.md

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
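The PyTorchJobState -> AppState mapping mentioned in this commit might be sketched as a lookup table. Both enums below are stand-ins defined locally so the example runs on its own. The PyTorchJobState members are assumed (loosely modelled on Kubeflow job condition types), and the AppState members mirror only a subset of the names in torchx.specs.AppState. The actual mapping in the PR may differ.

```python
from enum import Enum, auto


class PyTorchJobState(Enum):
    # Assumed members; the PR's enum may define different ones.
    CREATED = "Created"
    RUNNING = "Running"
    RESTARTING = "Restarting"
    SUCCEEDED = "Succeeded"
    FAILED = "Failed"
    UNKNOWN = "Unknown"


class AppState(Enum):
    # Local stand-in for torchx.specs.AppState (subset of names only).
    PENDING = auto()
    RUNNING = auto()
    SUCCEEDED = auto()
    FAILED = auto()
    UNKNOWN = auto()


# Hypothetical mapping: states without an obvious counterpart become PENDING.
_STATE_MAP = {
    PyTorchJobState.CREATED: AppState.PENDING,
    PyTorchJobState.RESTARTING: AppState.PENDING,
    PyTorchJobState.RUNNING: AppState.RUNNING,
    PyTorchJobState.SUCCEEDED: AppState.SUCCEEDED,
    PyTorchJobState.FAILED: AppState.FAILED,
}


def to_app_state(state: PyTorchJobState) -> AppState:
    """Translate a job state, defaulting to UNKNOWN for anything unmapped."""
    return _STATE_MAP.get(state, AppState.UNKNOWN)
```

Defaulting to UNKNOWN keeps status() from raising if a state is ever added on one side without updating the table.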
            return None
        executor.cancel(job_name)

    def list(self) -> list[ListAppResponse]: ...

Check notice

Code scanning / CodeQL

Statement has no effect Note

This statement has no effect.

Copilot Autofix

AI 10 days ago

In general, to fix a “statement has no effect” caused by a bare ... in a concrete method, replace the ellipsis with a meaningful implementation or, if the method is intentionally unsupported, with an explicit raise NotImplementedError (or similar) so the intent is clear and the code has an observable effect.

Here, PyTorchJobScheduler.list is declared but unimplemented. Without changing existing functionality (i.e., without guessing at how to list jobs), the safest, least-invasive fix is to replace the ... body with a raise NotImplementedError explaining that listing is not yet supported for PyTorchJobScheduler. This turns the no-op ellipsis into a deliberate runtime error if the method is called, which is standard practice for unimplemented interface methods.

Specifically, in nemo_run/run/torchx_backend/schedulers/pytorchjob.py, at the def list method around line 197, replace the entire line def list(self) -> list[ListAppResponse]: ... with a multi-line method definition:

    def list(self) -> list[ListAppResponse]:
        raise NotImplementedError("Listing apps is not implemented for PyTorchJobScheduler.")

No new imports or helper methods are required.

Suggested changeset 1
nemo_run/run/torchx_backend/schedulers/pytorchjob.py

Autofix patch

Run the following command in your local git repository to apply this patch
cat << 'EOF' | git apply
diff --git a/nemo_run/run/torchx_backend/schedulers/pytorchjob.py b/nemo_run/run/torchx_backend/schedulers/pytorchjob.py
--- a/nemo_run/run/torchx_backend/schedulers/pytorchjob.py
+++ b/nemo_run/run/torchx_backend/schedulers/pytorchjob.py
@@ -194,7 +194,8 @@
             return None
         executor.cancel(job_name)
 
-    def list(self) -> list[ListAppResponse]: ...
+    def list(self) -> list[ListAppResponse]:
+        raise NotImplementedError("Listing apps is not implemented for PyTorchJobScheduler.")
 
     def _validate(self, app: AppDef, scheduler: str) -> None:
         pass
EOF
Copilot is powered by AI and may make mistakes. Always verify output.
Aligns the constructor parameter name with the plan spec and example.py.
The field now shadows the base-class method, which is intentional:
PyTorchJob specifies parallelism in spec.nprocPerNode, not via the
TorchX Torchrun launcher machinery.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- launch() gains wait=True/timeout/poll_interval: blocks until RUNNING,
  SUCCEEDED, or FAILED — callers no longer need to poll manually
- fetch_logs: stream=False uses subprocess.run (tolerates pods still
  initializing); stream=True uses Popen + generator, matching DGXCloud
  streaming behaviour
- local/example.py: full e2e cycle — launch(wait=True), poll logs until
  sentinel 'NEMO_TEST_OK' appears, assert, cancel(wait=True)
- 46/46 tests pass; verified against real cluster

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
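The two fetch_logs modes described in this commit can be illustrated with the standard library alone. The sketch below is an assumption about the shape of the logic, not the PR's implementation: the signature taking a raw command list is hypothetical, and a small Python one-liner stands in for whatever log command the executor actually runs.

```python
import subprocess
import sys
from typing import Iterable, Iterator, List


def _stream_lines(cmd: List[str]) -> Iterator[str]:
    """Yield output lines as they arrive (the stream=True path)."""
    with subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True
    ) as proc:
        assert proc.stdout is not None
        for line in proc.stdout:
            yield line.rstrip("\n")


def fetch_logs(cmd: List[str], stream: bool = False) -> Iterable[str]:
    """Return log lines for cmd: a generator if stream is True, else a list."""
    if stream:
        return _stream_lines(cmd)
    # check=False: a non-zero exit (for example a pod that is still
    # initializing) returns whatever output exists instead of raising.
    result = subprocess.run(cmd, capture_output=True, text=True, check=False)
    return result.stdout.splitlines()


# Stand-in command; a real executor would invoke its own log command here.
demo_cmd = [sys.executable, "-c", "print('step 1'); print('NEMO_TEST_OK')"]
for line in fetch_logs(demo_cmd, stream=True):
    print(line)
```

Keeping the generator in a separate helper matters: if `fetch_logs` itself contained a `yield`, the non-streaming branch would also become a generator and could not return a plain list.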
@ko3n1g
Contributor

ko3n1g commented Mar 12, 2026

Closing to recreate under correct author.
