From 79100b2e3bad613d327cb6776ecf54268fdca8c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Mon, 16 Mar 2026 10:28:07 +0000 Subject: [PATCH 1/7] feat: add Experiment.export() to write runnable scripts without submitting MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `Experiment.export(output_dir)` which writes one script per job into a self-contained directory plus a `submit_all.sh` launcher, enabling users to inspect, version, and manually submit jobs without going through the NeMo Run execution pipeline. Each scheduler's `_submit_dryrun()` now writes its script to `executor.experiment_dir` when set: - LocalExecutor → .sh (executable bash) - DockerExecutor → .yaml - SkypilotExecutor / SkypilotJobsExecutor → .yaml - LeptonExecutor → .sh (executable bash) - DGXCloudExecutor → _torchrun_job.sh (was hardcoded, now uses job_name) `Experiment.export()` redirects all executor experiment_dirs to output_dir, runs dryrun to trigger script writing, then generates submit_all.sh with the correct submit command per executor type (sbatch, sky launch, etc.). Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: oliver könig --- nemo_run/run/experiment.py | 78 +++++++++++++++++++ .../run/torchx_backend/schedulers/dgxcloud.py | 4 +- .../run/torchx_backend/schedulers/docker.py | 6 ++ .../run/torchx_backend/schedulers/lepton.py | 14 ++++ .../run/torchx_backend/schedulers/local.py | 16 ++++ .../run/torchx_backend/schedulers/skypilot.py | 6 ++ .../schedulers/skypilot_jobs.py | 6 ++ test/run/test_experiment.py | 69 ++++++++++++++++ 8 files changed, 197 insertions(+), 2 deletions(-) diff --git a/nemo_run/run/experiment.py b/nemo_run/run/experiment.py index 460f04f6..f998aafd 100644 --- a/nemo_run/run/experiment.py +++ b/nemo_run/run/experiment.py @@ -624,6 +624,51 @@ def dryrun(self, log: bool = True, exist_ok: bool = False, delete_exp_dir: bool if delete_exp_dir: shutil.rmtree(self._exp_dir) + def export(self, output_dir: str, exist_ok: bool = False) -> None: + """ + Export runnable scripts for all tasks to output_dir without submitting. + + Each task produces a script file in output_dir: + - SlurmExecutor → _sbatch.sh (sbatch ) + - DGXCloudExecutor → _torchrun_job.sh + - LocalExecutor → .sh (bash ) + - DockerExecutor → .yaml + - SkypilotExecutor → .yaml (sky launch ) + - SkypilotJobsExecutor → .yaml + - LeptonExecutor → .sh + + Also generates submit_all.sh. + + Args: + output_dir: Directory to write into. Created if it doesn't exist. + exist_ok: Passed to mkdir. + """ + out = Path(output_dir) + out.mkdir(parents=True, exist_ok=exist_ok) + + self._prepare(exist_ok=True) + + # Redirect all executors to write scripts into output_dir + for job in self.jobs: + if isinstance(job, JobGroup): + executors = ( + [job.executors] if isinstance(job.executors, Executor) else job.executors + ) + for executor in executors: + executor.experiment_dir = str(out) + else: + job.executor.experiment_dir = str(out) + + # Run dryrun — each scheduler writes its script file to output_dir + for job in self.jobs: + job.launch(wait=False, runner=self._runner, dryrun=True, direct=False, log_dryrun=False) + + # Generate submit_all.sh + _write_submit_script(out, self._title, self.jobs) + + self.console.log(f"[bold green]Exported scripts to {out}") + shutil.rmtree(self._exp_dir) + def run( self, sequential: bool = False, @@ -1332,6 +1377,39 @@ def _get_sorted_dirs(path: str) -> list[str]: _LOADED_MAINS = set() +def _write_submit_script(out: Path, title: str, jobs: list) -> None: + _SUBMIT_CMDS = { + "SlurmExecutor": "sbatch", + "SkypilotExecutor": "sky launch", + "SkypilotJobsExecutor": "sky jobs launch", + "DockerExecutor": "docker compose -f", + } + + lines = [ + "#!/bin/bash", + f"# Submit all jobs for experiment: {title}", + "# Generated by NeMo Run", + "", + 'SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"', + "", + ] + + for job in jobs: + job_list = job.jobs if isinstance(job, JobGroup) else [job] + for j in job_list: + executor_type = type(j.executor).__name__ + cmd = _SUBMIT_CMDS.get(executor_type, "bash") + scripts = sorted(out.glob(f"{j.id}*")) + for s in scripts: + if s.name == "submit_all.sh": + continue + lines.append(f'{cmd} "$SCRIPT_DIR/{s.name}"') + + submit = out / "submit_all.sh" + submit.write_text("\n".join(lines) + "\n") + submit.chmod(0o755) + + def maybe_load_external_main(exp_dir: str): main_file = Path(exp_dir) / "__main__.py" if main_file.exists() and main_file not in _LOADED_MAINS: diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index 9bc2c969..aa8b97de 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -120,7 +120,7 @@ def _submit_dryrun( # type: ignore ) # Write and copy sbatch script - path = os.path.join(executor.experiment_dir, "torchrun_job.sh") + path = os.path.join(executor.experiment_dir, f"{executor.job_name}_torchrun_job.sh") script = req.materialize() with open(path, "w") as f: @@ -145,7 +145,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str: # The DGXExecutor's launch call typically returns (job_id, handle). # We'll call it without additional parameters here. - cmd = os.path.join(executor.experiment_dir, "torchrun_job.sh") + cmd = os.path.join(executor.experiment_dir, f"{executor.job_name}_torchrun_job.sh") req.launch_cmd = ["bash", cmd] job_id, status = executor.launch(name=req.name, cmd=req.launch_cmd) if not job_id: diff --git a/nemo_run/run/torchx_backend/schedulers/docker.py b/nemo_run/run/torchx_backend/schedulers/docker.py index 4f68920c..f7733cef 100644 --- a/nemo_run/run/torchx_backend/schedulers/docker.py +++ b/nemo_run/run/torchx_backend/schedulers/docker.py @@ -99,6 +99,12 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[DockerJobR basename = Path(executor.job_dir).name app_id = make_unique(basename) req = DockerJobRequest(id=app_id, executor=executor, containers=containers) + + if executor.experiment_dir: + path = os.path.join(executor.experiment_dir, f"{executor.job_name}.yaml") + with open(path, "w") as f: + f.write(str(req)) + return AppDryRunInfo(req, repr) def schedule(self, dryrun_info: AppDryRunInfo[DockerJobRequest]) -> str: # type: ignore diff --git a/nemo_run/run/torchx_backend/schedulers/lepton.py b/nemo_run/run/torchx_backend/schedulers/lepton.py index 0b012c19..6d34f1c9 100644 --- a/nemo_run/run/torchx_backend/schedulers/lepton.py +++ b/nemo_run/run/torchx_backend/schedulers/lepton.py @@ -16,6 +16,7 @@ import json import logging import os +import shlex import shutil import tempfile from dataclasses import dataclass @@ -98,6 +99,19 @@ def _submit_dryrun( # type: ignore role = values.apply(role) cmd = [role.entrypoint] + role.args + + if executor.experiment_dir: + path = os.path.join(executor.experiment_dir, f"{executor.job_name}.sh") + lines = ["#!/bin/bash", "# Generated by NeMo Run", "# Submit via Lepton AI API", ""] + for key, val in executor.env_vars.items(): + lines.append(f"export {key}={shlex.quote(str(val))}") + for key, val in role.env.items(): + lines.append(f"export {key}={shlex.quote(str(val))}") + lines.append(" ".join(shlex.quote(p) for p in cmd)) + with open(path, "w") as f: + f.write("\n".join(lines) + "\n") + os.chmod(path, 0o755) + return AppDryRunInfo( LeptonRequest(app=app, executor=executor, cmd=cmd, name=role.name), # Minimal function to show the config, if any diff --git a/nemo_run/run/torchx_backend/schedulers/local.py b/nemo_run/run/torchx_backend/schedulers/local.py index 5e95c7dd..7c8913b2 100644 --- a/nemo_run/run/torchx_backend/schedulers/local.py +++ b/nemo_run/run/torchx_backend/schedulers/local.py @@ -16,6 +16,7 @@ import json import os import pprint +import shlex import shutil import tempfile import warnings @@ -100,6 +101,21 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[PopenReque cfg_dict = asdict(cfg) cfg_dict["log_dir"] = cfg_dict.pop("job_dir") request = self._to_popen_request(app, cfg_dict) # type: ignore + + if cfg.experiment_dir: + path = os.path.join(cfg.experiment_dir, f"{app.name}.sh") + lines = ["#!/bin/bash", "# Generated by NeMo Run", ""] + for key, val in cfg.env_vars.items(): + lines.append(f"export {key}={shlex.quote(str(val))}") + for role in app.roles: + for key, val in role.env.items(): + lines.append(f"export {key}={shlex.quote(str(val))}") + cmd_parts = [role.entrypoint] + role.args + lines.append(" ".join(shlex.quote(p) for p in cmd_parts)) + with open(path, "w") as f: + f.write("\n".join(lines) + "\n") + os.chmod(path, 0o755) + return AppDryRunInfo(request, lambda p: pprint.pformat(asdict(p), indent=2, width=80)) def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str: diff --git a/nemo_run/run/torchx_backend/schedulers/skypilot.py b/nemo_run/run/torchx_backend/schedulers/skypilot.py index 2a3f4bb6..86f08156 100644 --- a/nemo_run/run/torchx_backend/schedulers/skypilot.py +++ b/nemo_run/run/torchx_backend/schedulers/skypilot.py @@ -143,6 +143,12 @@ def _submit_dryrun( # type: ignore task = cfg.to_task(name=role.name, cmd=cmd, env_vars=role.env) req = SkypilotRequest(task=task, executor=cfg) + + if cfg.experiment_dir: + path = os.path.join(cfg.experiment_dir, f"{cfg.job_name}.yaml") + with open(path, "w") as f: + f.write(common_utils.dump_yaml_str(req.task.to_yaml_config())) + return AppDryRunInfo(req, lambda req: common_utils.dump_yaml_str(req.task.to_yaml_config())) def _validate(self, app: AppDef, scheduler: str) -> None: diff --git a/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py b/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py index d364c359..23b1440c 100644 --- a/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py +++ b/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py @@ -141,6 +141,12 @@ def _submit_dryrun( # type: ignore task = cfg.to_task(name=role.name, cmd=cmd, env_vars=role.env) req = SkypilotJobsRequest(task=task, executor=cfg) + + if cfg.experiment_dir: + path = os.path.join(cfg.experiment_dir, f"{cfg.job_name}.yaml") + with open(path, "w") as f: + f.write(common_utils.dump_yaml_str(req.task.to_yaml_config())) + return AppDryRunInfo(req, lambda req: common_utils.dump_yaml_str(req.task.to_yaml_config())) def _validate(self, app: AppDef, scheduler: str) -> None: diff --git a/test/run/test_experiment.py b/test/run/test_experiment.py index 4f160237..4e48fb6e 100644 --- a/test/run/test_experiment.py +++ b/test/run/test_experiment.py @@ -1511,3 +1511,72 @@ def to_config(self): # Should pull tunnel and connect exp._initialize_tunnels(extract_from_executors=True) assert "t1" in exp.tunnels + + +def test_experiment_export_local(temp_dir): + """export() with LocalExecutor writes a .sh script and submit_all.sh.""" + output_dir = os.path.join(temp_dir, "exported") + with Experiment("test-exp") as exp: + task = run.Partial(dummy_function, x=1, y=2) + exp.add(task, name="hello-job") + exp.export(output_dir) + + files = os.listdir(output_dir) + # At least one .sh script for the job and submit_all.sh + sh_scripts = [f for f in files if f.endswith(".sh") and f != "submit_all.sh"] + assert len(sh_scripts) >= 1, f"Expected a .sh script, got: {files}" + assert "submit_all.sh" in files + + # submit_all.sh must be executable and contain "bash" + submit_path = os.path.join(output_dir, "submit_all.sh") + assert os.access(submit_path, os.X_OK) + content = Path(submit_path).read_text() + assert "bash" in content + + # The job script must be executable + job_script_path = os.path.join(output_dir, sh_scripts[0]) + assert os.access(job_script_path, os.X_OK) + + # The exp_dir should have been cleaned up + assert not os.path.exists(exp._exp_dir) + + +def test_experiment_export_creates_output_dir(temp_dir): + """export() creates the output directory if it does not exist.""" + output_dir = os.path.join(temp_dir, "new_dir", "nested") + with Experiment("test-exp") as exp: + task = run.Partial(dummy_function, x=1, y=2) + exp.add(task, name="nested-job") + exp.export(output_dir) + + assert os.path.isdir(output_dir) + assert "submit_all.sh" in os.listdir(output_dir) + + +def test_experiment_export_exist_ok(temp_dir): + """export() with exist_ok=True does not raise if output_dir already exists.""" + output_dir = os.path.join(temp_dir, "exists") + os.makedirs(output_dir) + with Experiment("test-exp") as exp: + task = run.Partial(dummy_function, x=1, y=2) + exp.add(task, name="job") + exp.export(output_dir, exist_ok=True) + + assert "submit_all.sh" in os.listdir(output_dir) + + +def test_experiment_export_multiple_jobs(temp_dir): + """export() produces one script per job and all are referenced in submit_all.sh.""" + output_dir = os.path.join(temp_dir, "multi") + with Experiment("test-exp") as exp: + exp.add(run.Partial(dummy_function, x=1, y=2), name="job-a") + exp.add(run.Partial(dummy_function, x=3, y=4), name="job-b") + exp.export(output_dir) + + files = os.listdir(output_dir) + sh_scripts = [f for f in files if f.endswith(".sh") and f != "submit_all.sh"] + assert len(sh_scripts) == 2, f"Expected 2 scripts, got: {sh_scripts}" + + submit_content = Path(os.path.join(output_dir, "submit_all.sh")).read_text() + for script in sh_scripts: + assert script in submit_content, f"{script} not referenced in submit_all.sh" From 9a6c46c37373b9cd2c71e0a45024ee91b67129c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Mon, 16 Mar 2026 10:35:08 +0000 Subject: [PATCH 2/7] docs: add export() e2e examples for Local, SLURM, DGXCloud, and Script tasks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Demonstrates Experiment.export() across all common executor types: - export_local.py — single LocalExecutor job → .sh + submit_all.sh - export_multi_job.py — three-job pipeline (preprocess/train/evaluate) - export_script.py — run.Script (inline bash) tasks - export_slurm.py — two SlurmExecutor jobs → *_sbatch.sh; no cluster needed - export_dgxcloud.py — DGXCloudExecutor job → *_torchrun_job.sh; no API calls needed Co-Authored-By: Claude Sonnet 4.6 Signed-off-by: oliver könig --- local/export_dgxcloud.py | 69 ++++++++++++++++++++++++++++++++ local/export_local.py | 47 ++++++++++++++++++++++ local/export_multi_job.py | 61 ++++++++++++++++++++++++++++ local/export_script.py | 42 ++++++++++++++++++++ local/export_slurm.py | 84 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 303 insertions(+) create mode 100644 local/export_dgxcloud.py create mode 100644 local/export_local.py create mode 100644 local/export_multi_job.py create mode 100644 local/export_script.py create mode 100644 local/export_slurm.py diff --git a/local/export_dgxcloud.py b/local/export_dgxcloud.py new file mode 100644 index 00000000..d1114a2d --- /dev/null +++ b/local/export_dgxcloud.py @@ -0,0 +1,69 @@ +""" +E2E example: Experiment.export() with DGXCloudExecutor + +Demonstrates exporting DGX Cloud jobs to a self-contained directory without +any API calls or authentication. The generated script can be inspected and +submitted manually via the DGX Cloud CLI or API. + +The output directory contains: + - _torchrun_job.sh (the torchrun launch script uploaded to the PVC) + - submit_all.sh (calls: bash _torchrun_job.sh for each job) + +Run: + python local/export_dgxcloud.py + cat /tmp/nemo_export_dgxcloud/train_torchrun_job.sh +""" + +import os +import shutil + +import nemo_run as run +from nemo_run.core.execution.dgxcloud import DGXCloudExecutor + +OUTPUT_DIR = "/tmp/nemo_export_dgxcloud" +shutil.rmtree(OUTPUT_DIR, ignore_errors=True) + + +def train(model: str, steps: int = 10_000): + import torch + + print(f"Training {model} on {torch.cuda.device_count()} GPUs for {steps} steps") + + +# Configure the DGX Cloud executor (credentials are placeholders — not contacted during export) +executor = DGXCloudExecutor( + base_url="https://api.ngc.nvidia.com/v2/org/my-org/dgxcloud", + kube_apiserver_url="https://my-cluster.k8s.example.com", + app_id="my-app-id", + app_secret="my-app-secret", + project_name="my-project", + container_image="nvcr.io/nvidia/nemo:latest", + pvc_nemo_run_dir="/mnt/pvc/nemo_run", + pvcs=[{"claimName": "nemo-pvc", "path": "/mnt/pvc"}], + nodes=2, + gpus_per_node=8, + packager=run.GitArchivePackager(), +) + +with run.Experiment("export-dgxcloud-demo") as exp: + exp.add( + run.Partial(train, model="mistral-7b", steps=100_000), + executor=executor, + name="train", + ) + exp.export(OUTPUT_DIR) + +files = sorted(os.listdir(OUTPUT_DIR)) +print(f"\nExported files: {files}") + +torchrun_scripts = [f for f in files if f.endswith("_torchrun_job.sh")] +assert len(torchrun_scripts) == 1, f"Expected 1 torchrun script, got: {torchrun_scripts}" + +print("\n--- train_torchrun_job.sh (first 40 lines) ---") +with open(f"{OUTPUT_DIR}/{torchrun_scripts[0]}") as f: + lines = f.readlines() + print("".join(lines[:40])) + +print("--- submit_all.sh ---") +with open(f"{OUTPUT_DIR}/submit_all.sh") as f: + print(f.read()) diff --git a/local/export_local.py b/local/export_local.py new file mode 100644 index 00000000..943c89fc --- /dev/null +++ b/local/export_local.py @@ -0,0 +1,47 @@ +""" +E2E example: Experiment.export() with LocalExecutor + +Demonstrates exporting a single job to a self-contained script directory +without submitting anything. The output directory contains: + - hello-job.sh (executable bash script) + - submit_all.sh (launcher that calls: bash hello-job.sh) + +Run: + python local/export_local.py + ls /tmp/nemo_export_local/ + cat /tmp/nemo_export_local/hello-job.sh + bash /tmp/nemo_export_local/submit_all.sh +""" + +import os +import shutil + +import nemo_run as run +from nemo_run.core.execution.local import LocalExecutor + +OUTPUT_DIR = "/tmp/nemo_export_local" +shutil.rmtree(OUTPUT_DIR, ignore_errors=True) + + +def greet(name: str, times: int = 1): + for _ in range(times): + print(f"Hello, {name}!") + + +with run.Experiment("export-local-demo") as exp: + task = run.Partial(greet, name="NeMo", times=3) + exp.add(task, executor=LocalExecutor(), name="hello-job") + exp.export(OUTPUT_DIR) + +files = sorted(os.listdir(OUTPUT_DIR)) +print(f"\nExported files: {files}") +assert "hello-job.sh" in files, "Expected hello-job.sh" +assert "submit_all.sh" in files, "Expected submit_all.sh" + +print("\n--- hello-job.sh ---") +with open(f"{OUTPUT_DIR}/hello-job.sh") as f: + print(f.read()) + +print("--- submit_all.sh ---") +with open(f"{OUTPUT_DIR}/submit_all.sh") as f: + print(f.read()) diff --git a/local/export_multi_job.py b/local/export_multi_job.py new file mode 100644 index 00000000..112911fe --- /dev/null +++ b/local/export_multi_job.py @@ -0,0 +1,61 @@ +""" +E2E example: Experiment.export() with multiple LocalExecutor jobs + +Demonstrates exporting multiple jobs to a shared output directory. +Each job produces its own .sh script; submit_all.sh chains them all. + +Run: + python local/export_multi_job.py + ls /tmp/nemo_export_multi/ + bash /tmp/nemo_export_multi/submit_all.sh +""" + +import os +import shutil + +import nemo_run as run +from nemo_run.core.execution.local import LocalExecutor + +OUTPUT_DIR = "/tmp/nemo_export_multi" +shutil.rmtree(OUTPUT_DIR, ignore_errors=True) + + +def preprocess(dataset: str, workers: int = 4): + print(f"Preprocessing {dataset} with {workers} workers") + + +def train(model: str, epochs: int = 10, lr: float = 1e-3): + print(f"Training {model} for {epochs} epochs at lr={lr}") + + +def evaluate(model: str, split: str = "test"): + print(f"Evaluating {model} on {split} split") + + +with run.Experiment("export-multi-demo") as exp: + exp.add( + run.Partial(preprocess, dataset="imagenet", workers=8), + executor=LocalExecutor(), + name="preprocess", + ) + exp.add( + run.Partial(train, model="resnet50", epochs=50, lr=5e-4), + executor=LocalExecutor(), + name="train", + ) + exp.add( + run.Partial(evaluate, model="resnet50", split="val"), + executor=LocalExecutor(), + name="evaluate", + ) + exp.export(OUTPUT_DIR) + +files = sorted(os.listdir(OUTPUT_DIR)) +print(f"\nExported files: {files}") + +sh_scripts = [f for f in files if f.endswith(".sh") and f != "submit_all.sh"] +assert len(sh_scripts) == 3, f"Expected 3 job scripts, got: {sh_scripts}" + +print("\n--- submit_all.sh ---") +with open(f"{OUTPUT_DIR}/submit_all.sh") as f: + print(f.read()) diff --git a/local/export_script.py b/local/export_script.py new file mode 100644 index 00000000..1ceaac9f --- /dev/null +++ b/local/export_script.py @@ -0,0 +1,42 @@ +""" +E2E example: Experiment.export() with run.Script tasks + +Shows that export() works with shell Script tasks (not just Partial), +which is a common pattern for SLURM-style jobs where the user provides +a raw bash script. + +The exported .sh file wraps the inline command; submit_all.sh calls +`bash