diff --git a/nemo_run/run/experiment.py b/nemo_run/run/experiment.py
index 460f04f6..ac7ebb7c 100644
--- a/nemo_run/run/experiment.py
+++ b/nemo_run/run/experiment.py
@@ -624,6 +624,51 @@ def dryrun(self, log: bool = True, exist_ok: bool = False, delete_exp_dir: bool
         if delete_exp_dir:
             shutil.rmtree(self._exp_dir)
 
+    def export(self, output_dir: str, exist_ok: bool = False) -> None:
+        """
+        Export runnable scripts for all tasks to output_dir without submitting.
+
+        Each task produces a script file in output_dir:
+        - SlurmExecutor → <job>_sbatch.sh (sbatch <script>)
+        - DGXCloudExecutor → <job>_torchrun_job.sh
+        - LocalExecutor → <job>.sh (bash <script>)
+        - DockerExecutor → <job>.yaml
+        - SkypilotExecutor → <job>.yaml (sky launch <script>)
+        - SkypilotJobsExecutor → <job>.yaml
+        - LeptonExecutor → <job>.sh
+
+        Also generates submit_all.sh.
+
+        Args:
+            output_dir: Directory to write into. Created if it doesn't exist.
+            exist_ok: Passed to mkdir.
+        """
+        out = Path(output_dir)
+        out.mkdir(parents=True, exist_ok=exist_ok)
+
+        self._prepare(exist_ok=True)
+
+        # Redirect all executors to write scripts into output_dir
+        for job in self.jobs:
+            if isinstance(job, JobGroup):
+                executors = (
+                    [job.executors] if isinstance(job.executors, Executor) else job.executors
+                )
+                for executor in executors:
+                    executor.experiment_dir = str(out)
+            else:
+                job.executor.experiment_dir = str(out)
+
+        # Run dryrun — each scheduler writes its script file to output_dir
+        for job in self.jobs:
+            job.launch(wait=False, runner=self._runner, dryrun=True, direct=False, log_dryrun=False)
+
+        # Generate submit_all.sh
+        _write_submit_script(out, self._title, self.jobs)
+
+        self.console.log(f"[bold green]Exported scripts to {out}")
+        shutil.rmtree(self._exp_dir)
+
     def run(
         self,
         sequential: bool = False,
@@ -1332,6 +1377,47 @@ def _get_sorted_dirs(path: str) -> list[str]:
 _LOADED_MAINS = set()
 
 
+def _write_submit_script(out: Path, title: str, jobs: list) -> None:
+    _SUBMIT_CMDS = {
+        "SlurmExecutor": "sbatch",
+        "SkypilotExecutor": "sky launch",
+        "SkypilotJobsExecutor": "sky jobs launch",
+        "DockerExecutor": "docker compose -f",
+    }
+
+    lines = [
+        "#!/bin/bash",
+        f"# Submit all jobs for experiment: {title}",
+        "# Generated by NeMo Run",
+        "",
+        'SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"',
+        "",
+    ]
+
+    for job in jobs:
+        if isinstance(job, JobGroup):
+            executors = [job.executors] if isinstance(job.executors, Executor) else job.executors
+            executor_type = type(executors[0]).__name__
+            cmd = _SUBMIT_CMDS.get(executor_type, "bash")
+            scripts = sorted(out.glob(f"{job.id}*"))
+            for s in scripts:
+                if s.name == "submit_all.sh":
+                    continue
+                lines.append(f'{cmd} "$SCRIPT_DIR/{s.name}"')
+        else:
+            executor_type = type(job.executor).__name__
+            cmd = _SUBMIT_CMDS.get(executor_type, "bash")
+            scripts = sorted(out.glob(f"{job.id}*"))
+            for s in scripts:
+                if s.name == "submit_all.sh":
+                    continue
+                lines.append(f'{cmd} "$SCRIPT_DIR/{s.name}"')
+
+    submit = out / "submit_all.sh"
+    submit.write_text("\n".join(lines) + "\n")
+    submit.chmod(0o700)
+
+
 def maybe_load_external_main(exp_dir: str):
     main_file = Path(exp_dir) / "__main__.py"
     if main_file.exists() and main_file not in _LOADED_MAINS:
diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py
index 9bc2c969..5563f1dc 100644
--- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py
+++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py
@@ -120,11 +120,11 @@ def _submit_dryrun(  # type: ignore
         )
 
         # Write and copy sbatch script
-        path = os.path.join(executor.experiment_dir, "torchrun_job.sh")
         script = req.materialize()
-
-        with open(path, "w") as f:
-            f.write(script)
+        if executor.experiment_dir:
+            path = os.path.join(executor.experiment_dir, f"{executor.job_name}_torchrun_job.sh")
+            with open(path, "w") as f:
+                f.write(script)
 
         return AppDryRunInfo(
             DGXRequest(app=app, executor=executor, cmd=cmd, name=role.name),
@@ -145,7 +145,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str:
         # The DGXExecutor's launch call typically returns (job_id, handle).
         # We'll call it without additional parameters here.
-        cmd = os.path.join(executor.experiment_dir, "torchrun_job.sh")
+        cmd = os.path.join(executor.experiment_dir, f"{executor.job_name}_torchrun_job.sh")
         req.launch_cmd = ["bash", cmd]
         job_id, status = executor.launch(name=req.name, cmd=req.launch_cmd)
         if not job_id:
diff --git a/nemo_run/run/torchx_backend/schedulers/docker.py b/nemo_run/run/torchx_backend/schedulers/docker.py
index 4f68920c..f7733cef 100644
--- a/nemo_run/run/torchx_backend/schedulers/docker.py
+++ b/nemo_run/run/torchx_backend/schedulers/docker.py
@@ -99,6 +99,12 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[DockerJobR
         basename = Path(executor.job_dir).name
         app_id = make_unique(basename)
         req = DockerJobRequest(id=app_id, executor=executor, containers=containers)
+
+        if executor.experiment_dir:
+            path = os.path.join(executor.experiment_dir, f"{executor.job_name}.yaml")
+            with open(path, "w") as f:
+                f.write(str(req))
+
         return AppDryRunInfo(req, repr)
 
     def schedule(self, dryrun_info: AppDryRunInfo[DockerJobRequest]) -> str:  # type: ignore
diff --git a/nemo_run/run/torchx_backend/schedulers/lepton.py b/nemo_run/run/torchx_backend/schedulers/lepton.py
index 0b012c19..1efb0529 100644
--- a/nemo_run/run/torchx_backend/schedulers/lepton.py
+++ b/nemo_run/run/torchx_backend/schedulers/lepton.py
@@ -16,6 +16,7 @@
 import json
 import logging
 import os
+import shlex
 import shutil
 import tempfile
 from dataclasses import dataclass
@@ -98,6 +99,19 @@ def _submit_dryrun(  # type: ignore
         role = values.apply(role)
         cmd = [role.entrypoint] + role.args
+
+        if executor.experiment_dir:
+            path = os.path.join(executor.experiment_dir, f"{executor.job_name}.sh")
+            lines = ["#!/bin/bash", "# Generated by NeMo Run", "# Submit via Lepton AI API", ""]
+            for key, val in executor.env_vars.items():
+                lines.append(f"export {key}={shlex.quote(str(val))}")
+            for key, val in role.env.items():
+                lines.append(f"export {key}={shlex.quote(str(val))}")
+            lines.append(" ".join(shlex.quote(p) for p in cmd))
+            with open(path, "w") as f:
+                f.write("\n".join(lines) + "\n")
+            os.chmod(path, 0o700)
+
         return AppDryRunInfo(
             LeptonRequest(app=app, executor=executor, cmd=cmd, name=role.name),
             # Minimal function to show the config, if any
diff --git a/nemo_run/run/torchx_backend/schedulers/local.py b/nemo_run/run/torchx_backend/schedulers/local.py
index 5e95c7dd..444a7bcf 100644
--- a/nemo_run/run/torchx_backend/schedulers/local.py
+++ b/nemo_run/run/torchx_backend/schedulers/local.py
@@ -16,6 +16,7 @@
 import json
 import os
 import pprint
+import shlex
 import shutil
 import tempfile
 import warnings
@@ -100,6 +101,21 @@ def _submit_dryrun(self, app: AppDef, cfg: Executor) -> AppDryRunInfo[PopenReque
         cfg_dict = asdict(cfg)
         cfg_dict["log_dir"] = cfg_dict.pop("job_dir")
         request = self._to_popen_request(app, cfg_dict)  # type: ignore
+
+        if cfg.experiment_dir:
+            path = os.path.join(cfg.experiment_dir, f"{app.name}.sh")
+            lines = ["#!/bin/bash", "# Generated by NeMo Run", ""]
+            for key, val in cfg.env_vars.items():
+                lines.append(f"export {key}={shlex.quote(str(val))}")
+            for role in app.roles:
+                for key, val in role.env.items():
+                    lines.append(f"export {key}={shlex.quote(str(val))}")
+                cmd_parts = [role.entrypoint] + role.args
+                lines.append(" ".join(shlex.quote(p) for p in cmd_parts))
+            with open(path, "w") as f:
+                f.write("\n".join(lines) + "\n")
+            os.chmod(path, 0o700)
+
         return AppDryRunInfo(request, lambda p: pprint.pformat(asdict(p), indent=2, width=80))
 
     def schedule(self, dryrun_info: AppDryRunInfo[PopenRequest]) -> str:
diff --git a/nemo_run/run/torchx_backend/schedulers/skypilot.py b/nemo_run/run/torchx_backend/schedulers/skypilot.py
index 2a3f4bb6..86f08156 100644
--- a/nemo_run/run/torchx_backend/schedulers/skypilot.py
+++ b/nemo_run/run/torchx_backend/schedulers/skypilot.py
@@ -143,6 +143,12 @@ def _submit_dryrun(  # type: ignore
         task = cfg.to_task(name=role.name, cmd=cmd, env_vars=role.env)
 
         req = SkypilotRequest(task=task, executor=cfg)
+
+        if cfg.experiment_dir:
+            path = os.path.join(cfg.experiment_dir, f"{cfg.job_name}.yaml")
+            with open(path, "w") as f:
+                f.write(common_utils.dump_yaml_str(req.task.to_yaml_config()))
+
         return AppDryRunInfo(req, lambda req: common_utils.dump_yaml_str(req.task.to_yaml_config()))
 
     def _validate(self, app: AppDef, scheduler: str) -> None:
diff --git a/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py b/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py
index d364c359..23b1440c 100644
--- a/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py
+++ b/nemo_run/run/torchx_backend/schedulers/skypilot_jobs.py
@@ -141,6 +141,12 @@ def _submit_dryrun(  # type: ignore
         task = cfg.to_task(name=role.name, cmd=cmd, env_vars=role.env)
 
         req = SkypilotJobsRequest(task=task, executor=cfg)
+
+        if cfg.experiment_dir:
+            path = os.path.join(cfg.experiment_dir, f"{cfg.job_name}.yaml")
+            with open(path, "w") as f:
+                f.write(common_utils.dump_yaml_str(req.task.to_yaml_config()))
+
         return AppDryRunInfo(req, lambda req: common_utils.dump_yaml_str(req.task.to_yaml_config()))
 
     def _validate(self, app: AppDef, scheduler: str) -> None:
diff --git a/test/run/test_experiment.py b/test/run/test_experiment.py
index 4f160237..a9207b8b 100644
--- a/test/run/test_experiment.py
+++ b/test/run/test_experiment.py
@@ -1511,3 +1511,95 @@ def to_config(self):
     # Should pull tunnel and connect
     exp._initialize_tunnels(extract_from_executors=True)
     assert "t1" in exp.tunnels
+
+
+def test_experiment_export_local(temp_dir):
+    """export() with LocalExecutor writes a .sh script and submit_all.sh."""
+    output_dir = os.path.join(temp_dir, "exported")
+    with Experiment("test-exp") as exp:
+        task = run.Partial(dummy_function, x=1, y=2)
+        exp.add(task, name="hello-job")
+        exp.export(output_dir)
+
+    files = os.listdir(output_dir)
+    # At least one .sh script for the job and submit_all.sh
+    sh_scripts = [f for f in files if f.endswith(".sh") and f != "submit_all.sh"]
+    assert len(sh_scripts) >= 1, f"Expected a .sh script, got: {files}"
+    assert "submit_all.sh" in files
+
+    # submit_all.sh must be executable and contain "bash"
+    submit_path = os.path.join(output_dir, "submit_all.sh")
+    assert os.access(submit_path, os.X_OK)
+    content = Path(submit_path).read_text()
+    assert "bash" in content
+
+    # The job script must be executable
+    job_script_path = os.path.join(output_dir, sh_scripts[0])
+    assert os.access(job_script_path, os.X_OK)
+
+    # The exp_dir should have been cleaned up
+    assert not os.path.exists(exp._exp_dir)
+
+
+def test_experiment_export_creates_output_dir(temp_dir):
+    """export() creates the output directory if it does not exist."""
+    output_dir = os.path.join(temp_dir, "new_dir", "nested")
+    with Experiment("test-exp") as exp:
+        task = run.Partial(dummy_function, x=1, y=2)
+        exp.add(task, name="nested-job")
+        exp.export(output_dir)
+
+    assert os.path.isdir(output_dir)
+    assert "submit_all.sh" in os.listdir(output_dir)
+
+
+def test_experiment_export_exist_ok(temp_dir):
+    """export() with exist_ok=True does not raise if output_dir already exists."""
+    output_dir = os.path.join(temp_dir, "exists")
+    os.makedirs(output_dir)
+    with Experiment("test-exp") as exp:
+        task = run.Partial(dummy_function, x=1, y=2)
+        exp.add(task, name="job")
+        exp.export(output_dir, exist_ok=True)
+
+    assert "submit_all.sh" in os.listdir(output_dir)
+
+
+def test_experiment_export_multiple_jobs(temp_dir):
+    """export() produces one script per job and all are referenced in submit_all.sh."""
+    output_dir = os.path.join(temp_dir, "multi")
+    with Experiment("test-exp") as exp:
+        exp.add(run.Partial(dummy_function, x=1, y=2), name="job-a")
+        exp.add(run.Partial(dummy_function, x=3, y=4), name="job-b")
+        exp.export(output_dir)
+
+    files = os.listdir(output_dir)
+    sh_scripts = [f for f in files if f.endswith(".sh") and f != "submit_all.sh"]
+    assert len(sh_scripts) == 2, f"Expected 2 scripts, got: {sh_scripts}"
+
+    submit_content = Path(os.path.join(output_dir, "submit_all.sh")).read_text()
+    for script in sh_scripts:
+        assert script in submit_content, f"{script} not referenced in submit_all.sh"
+
+
+def test_experiment_export_job_group(temp_dir):
+    """export() with a JobGroup redirects all executors and writes scripts."""
+    output_dir = os.path.join(temp_dir, "group_export")
+
+    with patch(
+        "nemo_run.run.job.JobGroup.SUPPORTED_EXECUTORS", new_callable=PropertyMock
+    ) as mock_supported:
+        mock_supported.return_value = {LocalExecutor}
+
+        with Experiment("test-exp") as exp:
+            from typing import Sequence
+
+            tasks: Sequence[run.Partial] = [
+                run.Partial(dummy_function, x=1, y=2),
+                run.Partial(dummy_function, x=3, y=4),
+            ]
+            exp.add(tasks, name="group-job")  # type: ignore
+            exp.export(output_dir)
+
+        files = os.listdir(output_dir)
+        assert "submit_all.sh" in files
diff --git a/test/run/torchx_backend/schedulers/test_dgxcloud.py b/test/run/torchx_backend/schedulers/test_dgxcloud.py
index 767c2106..d7427f61 100644
--- a/test/run/torchx_backend/schedulers/test_dgxcloud.py
+++ b/test/run/torchx_backend/schedulers/test_dgxcloud.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import tempfile
 from unittest import mock
 from unittest.mock import MagicMock
@@ -71,6 +72,16 @@ def test_submit_dryrun(dgx_cloud_scheduler, mock_app_def, dgx_cloud_executor):
     assert dryrun_info.request is not None
 
 
+def test_submit_dryrun_writes_script(dgx_cloud_scheduler, mock_app_def, dgx_cloud_executor):
+    with tempfile.TemporaryDirectory() as exp_dir:
+        dgx_cloud_executor.job_name = "test-job"
+        dgx_cloud_executor.experiment_dir = exp_dir
+        with mock.patch.object(DGXCloudExecutor, "package"):
+            dgx_cloud_scheduler._submit_dryrun(mock_app_def, dgx_cloud_executor)
+        script = os.path.join(exp_dir, "test-job_torchrun_job.sh")
+        assert os.path.isfile(script)
+
+
 def test_dgx_cloud_scheduler_methods(dgx_cloud_scheduler):
     # Test that basic methods exist
     assert hasattr(dgx_cloud_scheduler, "_submit_dryrun")
diff --git a/test/run/torchx_backend/schedulers/test_docker.py b/test/run/torchx_backend/schedulers/test_docker.py
index 551d8a60..10c63a4c 100644
--- a/test/run/torchx_backend/schedulers/test_docker.py
+++ b/test/run/torchx_backend/schedulers/test_docker.py
@@ -67,6 +67,16 @@ def test_submit_dryrun(docker_scheduler, mock_app_def, docker_executor):
     assert dryrun_info.request is not None
 
 
+def test_submit_dryrun_writes_yaml(docker_scheduler, mock_app_def, docker_executor):
+    with tempfile.TemporaryDirectory() as exp_dir:
+        docker_executor.job_name = "test-job"
+        docker_executor.experiment_dir = exp_dir
+        with mock.patch.object(DockerExecutor, "package"):
+            docker_scheduler._submit_dryrun(mock_app_def, docker_executor)
+        yaml_file = os.path.join(exp_dir, "test-job.yaml")
+        assert os.path.isfile(yaml_file)
+
+
 def test_check_docker_version_success():
     with mock.patch("subprocess.check_output") as mock_check_output:
         mock_check_output.return_value = b"Docker version 20.10.0, build abcdef\n"
diff --git a/test/run/torchx_backend/schedulers/test_lepton.py b/test/run/torchx_backend/schedulers/test_lepton.py
new file mode 100644
index 00000000..bdd826f7
--- /dev/null
+++ b/test/run/torchx_backend/schedulers/test_lepton.py
@@ -0,0 +1,94 @@
+# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import tempfile
+from unittest import mock
+
+import pytest
+from torchx.schedulers.api import AppDryRunInfo
+from torchx.specs import AppDef, Role
+
+from nemo_run.core.execution.lepton import LeptonExecutor
+from nemo_run.run.torchx_backend.schedulers.lepton import (
+    LeptonScheduler,
+    create_scheduler,
+)
+
+
+@pytest.fixture
+def mock_app_def():
+    return AppDef(name="test_app", roles=[Role(name="test_role", image="")])
+
+
+@pytest.fixture
+def lepton_executor():
+    return LeptonExecutor(
+        container_image="nvcr.io/nvidia/test:latest",
+        nemo_run_dir="/workspace/nemo_run",
+        job_dir=tempfile.mkdtemp(),
+    )
+
+
+@pytest.fixture
+def lepton_scheduler():
+    return create_scheduler(session_name="test_session")
+
+
+def test_create_scheduler():
+    scheduler = create_scheduler(session_name="test_session")
+    assert isinstance(scheduler, LeptonScheduler)
+    assert scheduler.session_name == "test_session"
+
+
+def test_lepton_scheduler_methods(lepton_scheduler):
+    assert hasattr(lepton_scheduler, "_submit_dryrun")
+    assert hasattr(lepton_scheduler, "schedule")
+    assert hasattr(lepton_scheduler, "describe")
+    assert hasattr(lepton_scheduler, "_cancel_existing")
+    assert hasattr(lepton_scheduler, "_validate")
+
+
+def test_submit_dryrun(lepton_scheduler, mock_app_def, lepton_executor):
+    with mock.patch.object(LeptonExecutor, "package") as mock_package:
+        mock_package.return_value = None
+
+        dryrun_info = lepton_scheduler._submit_dryrun(mock_app_def, lepton_executor)
+        assert isinstance(dryrun_info, AppDryRunInfo)
+        assert dryrun_info.request is not None
+
+
+def test_submit_dryrun_writes_script(lepton_scheduler, mock_app_def, lepton_executor):
+    with tempfile.TemporaryDirectory() as exp_dir:
+        lepton_executor.job_name = "test-job"
+        lepton_executor.experiment_dir = exp_dir
+        with mock.patch.object(LeptonExecutor, "package"):
+            lepton_scheduler._submit_dryrun(mock_app_def, lepton_executor)
+        script = os.path.join(exp_dir, "test-job.sh")
+        assert os.path.isfile(script)
+        with open(script) as f:
+            content = f.read()
+        assert "#!/bin/bash" in content
+
+
+def test_submit_dryrun_no_file_without_experiment_dir(
+    lepton_scheduler, mock_app_def, lepton_executor
+):
+    with tempfile.TemporaryDirectory() as exp_dir:
+        # experiment_dir is NOT set
+        with mock.patch.object(LeptonExecutor, "package"):
+            lepton_scheduler._submit_dryrun(mock_app_def, lepton_executor)
+        # No script should have been written
+        assert len(os.listdir(exp_dir)) == 0
diff --git a/test/run/torchx_backend/schedulers/test_local.py b/test/run/torchx_backend/schedulers/test_local.py
index 5220d9aa..91d55346 100644
--- a/test/run/torchx_backend/schedulers/test_local.py
+++ b/test/run/torchx_backend/schedulers/test_local.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import tempfile
 from unittest import mock
 
@@ -59,6 +60,18 @@ def test_submit_dryrun(local_scheduler, mock_app_def, local_executor):
     # assert callable(dryrun_info.fmt)
 
 
+def test_submit_dryrun_writes_script(local_scheduler, mock_app_def, local_executor):
+    with tempfile.TemporaryDirectory() as exp_dir:
+        local_executor.experiment_dir = exp_dir
+        local_scheduler._submit_dryrun(mock_app_def, local_executor)
+        script = os.path.join(exp_dir, f"{mock_app_def.name}.sh")
+        assert os.path.isfile(script)
+        with open(script) as f:
+            content = f.read()
+        assert "#!/bin/bash" in content
+        assert oct(os.stat(script).st_mode)[-3:] == "700"
+
+
 @mock.patch("nemo_run.run.torchx_backend.schedulers.local._save_job_dir")
 def test_schedule(mock_save, local_scheduler, mock_app_def, local_executor):
     dryrun_info = local_scheduler._submit_dryrun(mock_app_def, local_executor)
diff --git a/test/run/torchx_backend/schedulers/test_skypilot.py b/test/run/torchx_backend/schedulers/test_skypilot.py
index d5fc751e..7f24caea 100644
--- a/test/run/torchx_backend/schedulers/test_skypilot.py
+++ b/test/run/torchx_backend/schedulers/test_skypilot.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import tempfile
 from unittest import mock
 
@@ -70,6 +71,16 @@ def test_submit_dryrun(skypilot_scheduler, mock_app_def, skypilot_executor):
     assert dryrun_info.request is not None
 
 
+def test_submit_dryrun_writes_yaml(skypilot_scheduler, mock_app_def, skypilot_executor):
+    with tempfile.TemporaryDirectory() as exp_dir:
+        skypilot_executor.job_name = "test-job"
+        skypilot_executor.experiment_dir = exp_dir
+        with mock.patch.object(SkypilotExecutor, "package"):
+            skypilot_scheduler._submit_dryrun(mock_app_def, skypilot_executor)
+        yaml_file = os.path.join(exp_dir, "test-job.yaml")
+        assert os.path.isfile(yaml_file)
+
+
 def test_schedule(skypilot_scheduler, mock_app_def, skypilot_executor):
     class MockHandle:
         def get_cluster_name(self):
diff --git a/test/run/torchx_backend/schedulers/test_skypilot_jobs.py b/test/run/torchx_backend/schedulers/test_skypilot_jobs.py
index 46c6d75e..70ba27f9 100644
--- a/test/run/torchx_backend/schedulers/test_skypilot_jobs.py
+++ b/test/run/torchx_backend/schedulers/test_skypilot_jobs.py
@@ -73,6 +73,16 @@ def test_submit_dryrun(skypilot_jobs_scheduler, mock_app_def, skypilot_jobs_exec
     assert dryrun_info.request is not None
 
 
+def test_submit_dryrun_writes_yaml(skypilot_jobs_scheduler, mock_app_def, skypilot_jobs_executor):
+    with tempfile.TemporaryDirectory() as exp_dir:
+        skypilot_jobs_executor.job_name = "test-job"
+        skypilot_jobs_executor.experiment_dir = exp_dir
+        with mock.patch.object(SkypilotJobsExecutor, "package"):
+            skypilot_jobs_scheduler._submit_dryrun(mock_app_def, skypilot_jobs_executor)
+        yaml_file = os.path.join(exp_dir, "test-job.yaml")
+        assert os.path.isfile(yaml_file)
+
+
 def test_schedule(skypilot_jobs_scheduler, mock_app_def, skypilot_jobs_executor):
     class MockHandle:
         def get_cluster_name(self):