Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions docs/guides/execution.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ The packager support matrix is described below:
| SkypilotExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
| DGXCloudExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
| LeptonExecutor | run.Packager, run.GitArchivePackager, run.PatternPackager, run.HybridPackager |
| PyTorchJobExecutor | run.Packager |

`run.Packager` is a passthrough base packager.

Expand Down Expand Up @@ -293,6 +294,34 @@ def your_dgx_executor(nodes: int, gpus_per_node: int, container_image: str):

For a complete end-to-end example using DGX Cloud with NeMo, refer to the [NVIDIA DGX Cloud NeMo End-to-End Workflow Example](https://docs.nvidia.com/dgx-cloud/run-ai/latest/nemo-e2e-example.html).

#### PyTorchJobExecutor

The `PyTorchJobExecutor` integrates with the [Kubeflow Training Operator](https://github.com/kubeflow/training-operator) to run distributed PyTorchJobs on any Kubernetes cluster. It submits PyTorchJob CRDs directly via the Kubernetes API — no `kubectl` or separate tooling required for job submission.

Kubernetes configuration is loaded automatically: local kubeconfig is tried first, falling back to in-cluster config when running inside a pod.

Here's an example configuration:

```python
executor = run.PyTorchJobExecutor(
namespace="runai-nemo-ci",
image="nvcr.io/nvidian/nemo:nightly",
num_workers=2, # Worker replicas; a Master replica is always added
nproc_per_node=8, # Maps to spec.nprocPerNode
gpus_per_node=8,
cpu_requests="16",
memory_requests="64Gi",
volumes=[
{"name": "model-cache", "persistentVolumeClaim": {"claimName": "nemo-ci-datasets-project-nkf5l"}}
],
volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
labels={"app": "nemo-ci-training"},
env_vars={"NCCL_DEBUG": "INFO"},
)
```

`cancel(wait=True)` polls until both the PyTorchJob CR and all associated pods are fully terminated before returning.

#### LeptonExecutor

The `LeptonExecutor` integrates with an NVIDIA DGX Cloud Lepton cluster's Python SDK to launch distributed jobs. It uses API calls behind the Lepton SDK to authenticate, identify the target node group and resource shapes, and submit the job specification which will be launched as a batch job on the cluster.
Expand Down
62 changes: 62 additions & 0 deletions local/example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import time

from nemo_run.core.execution.pytorchjob import PyTorchJobExecutor

EXPECTED_LOG_CONTENT = "NEMO_TEST_OK"

e = PyTorchJobExecutor(
namespace="runai-nemo-ci",
image="nvcr.io/nvidian/nemo:nightly",
num_workers=2,
nproc_per_node=8,
gpus_per_node=8,
cpu_requests="16",
memory_requests="64Gi",
volumes=[
{
"name": "model-cache",
"persistentVolumeClaim": {"claimName": "nemo-ci-datasets-project-nkf5l"},
}
],
volume_mounts=[{"name": "model-cache", "mountPath": "/nemo-workspace"}],
labels={"app": "nemo-ci-training"},
)

# Script: print the sentinel, then sleep so we can read logs and cancel cleanly
cmd = [
"/bin/bash",
"-c",
f"echo 'print(\"{EXPECTED_LOG_CONTENT}\"); import time; time.sleep(300)' > /tmp/test.py && "
"torchrun --nnodes=$PET_NNODES --nproc_per_node=$PET_NPROC_PER_NODE "
"--node_rank=$RANK --master_addr=$MASTER_ADDR --master_port=$MASTER_PORT /tmp/test.py",
]

# ── Launch and wait until RUNNING ────────────────────────────────────────────
job_name, state = e.launch("nemo-ci-training", cmd, wait=True, timeout=300)
print(f"Launched: {job_name}, state: {state}")

# ── Fetch logs and verify sentinel ────────────────────────────────────────────
print("Polling logs until sentinel appears (up to 2 min)...")
logs = []
deadline = time.time() + 120
while time.time() < deadline:
logs = list(e.fetch_logs(job_name, stream=False, lines=50))
if any(EXPECTED_LOG_CONTENT in line for line in logs):
break
print(f" waiting for sentinel ({len(logs)} lines so far)...")
time.sleep(5)

print(f" received {len(logs)} lines")
for line in logs[:5]:
print(f" | {line}")

assert any(EXPECTED_LOG_CONTENT in line for line in logs), (
f"Expected '{EXPECTED_LOG_CONTENT}' not found in logs.\nGot: {logs}"
)
print(f"✓ Log sentinel '{EXPECTED_LOG_CONTENT}' verified")

# ── Cancel and wait for full cleanup ─────────────────────────────────────────
print("Cancelling job and waiting for cleanup...")
cleaned = e.cancel(job_name, wait=True, timeout=120)
assert cleaned, "Cleanup failed — pods or CR still present after timeout"
print("Full cycle complete without kubectl")
2 changes: 2 additions & 0 deletions nemo_run/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from nemo_run.core.execution.base import Executor, ExecutorMacros, import_executor
from nemo_run.core.execution.dgxcloud import DGXCloudExecutor
from nemo_run.core.execution.docker import DockerExecutor
from nemo_run.core.execution.pytorchjob import PyTorchJobExecutor
from nemo_run.core.execution.launcher import FaultTolerance, SlurmRay, SlurmTemplate, Torchrun
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.core.execution.local import LocalExecutor
Expand Down Expand Up @@ -66,6 +67,7 @@
"Packager",
"Partial",
"Plugin",
"PyTorchJobExecutor",
"run",
"Script",
"SkypilotExecutor",
Expand Down
2 changes: 2 additions & 0 deletions nemo_run/core/execution/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from nemo_run.core.execution.dgxcloud import DGXCloudExecutor
from nemo_run.core.execution.lepton import LeptonExecutor
from nemo_run.core.execution.local import LocalExecutor
from nemo_run.core.execution.pytorchjob import PyTorchJobExecutor
from nemo_run.core.execution.skypilot import SkypilotExecutor
from nemo_run.core.execution.slurm import SlurmExecutor

Expand All @@ -25,4 +26,5 @@
"SkypilotExecutor",
"DGXCloudExecutor",
"LeptonExecutor",
"PyTorchJobExecutor",
]
Loading
Loading