diff --git a/README.md b/README.md index 696476457..e7144e4a9 100644 --- a/README.md +++ b/README.md @@ -84,6 +84,13 @@ cloudai generate-report\ --result-dir /path/to/result_directory ``` +Generated artifacts depend on the scenario contents: + +- A plain scenario status report is written as `<scenario_name>.html`. +- If the scenario contains DSE test cases, an additional DSE-specific report is written as `<scenario_name>-dse-report.html`. +- For DSE runs, the best discovered test configuration is also written as `<test_name>/<iteration>/<test_name>.toml`. +- Custom reporters may generate additional artifacts. + ### install This mode installs test prerequisites. For more details, please refer to the [installation guide](https://nvidia.github.io/cloudai/workloads_requirements_installation.html). It automatically runs as part of the `run` mode if prerequisites are not met. diff --git a/pyproject.toml b/pyproject.toml index fa80670bb..4e14aa151 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -145,8 +145,8 @@ root_package = "cloudai" [[tool.importlinter.contracts]] name = "Report generator is leaf dependency" type = "forbidden" - forbidden_modules = ["cloudai.systems", "cloudai.workloads", "cloudai.cli"] - allow_indirect_imports = true # allow "from cloudai.core import ..." + forbidden_modules = ["cloudai.workloads", "cloudai.cli"] + allow_indirect_imports = true # allow "from cloudai.core import ..." source_modules = ["cloudai.report_generator"] [[tool.importlinter.contracts]] diff --git a/src/cloudai/_core/registry.py b/src/cloudai/_core/registry.py index b180b9cee..2e2adf6b7 100644 --- a/src/cloudai/_core/registry.py +++ b/src/cloudai/_core/registry.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -228,7 +228,8 @@ def report_order(k: str) -> int: return { "per_test": 0, # first "status": 2, - "tarball": 3, # last + "dse": 3, + "tarball": 4, # last }.get(k, 1) return sorted(self.scenario_reports.items(), key=lambda kv: report_order(kv[0])) diff --git a/src/cloudai/registration.py b/src/cloudai/registration.py index 866baa945..c49fd0b56 100644 --- a/src/cloudai/registration.py +++ b/src/cloudai/registration.py @@ -46,7 +46,7 @@ def register_all(): ) from cloudai.core import Registry from cloudai.models.scenario import ReportConfig - from cloudai.reporter import PerTestReporter, StatusReporter, TarballReporter + from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, TarballReporter # Import systems from cloudai.systems.kubernetes import KubernetesInstaller, KubernetesRunner, KubernetesSystem @@ -295,6 +295,7 @@ def register_all(): Registry().add_scenario_report("per_test", PerTestReporter, ReportConfig(enable=True)) Registry().add_scenario_report("status", StatusReporter, ReportConfig(enable=True)) + Registry().add_scenario_report("dse", DSEReporter, ReportConfig(enable=True)) Registry().add_scenario_report("tarball", TarballReporter, ReportConfig(enable=True)) Registry().add_scenario_report( "nixl_bench_summary", diff --git a/src/cloudai/report_generator/dse_report.py b/src/cloudai/report_generator/dse_report.py new file mode 100644 index 000000000..efb85956e --- /dev/null +++ b/src/cloudai/report_generator/dse_report.py @@ -0,0 +1,378 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import ast +import contextlib +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import Any + +import toml + +from cloudai.core import CommandGenStrategy, System, TestRun +from cloudai.models.scenario import TestRunDetails +from cloudai.systems.slurm import SlurmJobMetadata +from cloudai.util.lazy_imports import lazy + +from .util import load_system_metadata + +# https://gpus.io/en/gpus +# https://getdeploying.com/gpus +# https://docs.coreweave.com/platform/instances/gpu/ +GPU_HOURLY_COST_USD = { + "H100": 3.0, + "B200": 5.5, + "GB200": 11.00, + "GB300": 8.0, +} + + +@dataclass(frozen=True) +class DSEParameterValue: + """Represents DSE dimension value.""" + + text: str + is_best: bool + + +@dataclass(frozen=True) +class DSEParameterRow: + """Represents a dimension in DSE.""" + + name: str + values: list[DSEParameterValue] + + +@dataclass(frozen=True) +class DSECaseIterationSummary: + """Summary for DSE case iteration.""" + + name: str + saved_time: str + saved_gpu_hours: str + saved_usd: str + gpu_label: str + avg_step_runtime: str + observed_runtime: str + efficiency_ratio: str + efficiency_steps: str + best_config_toml: str + parameter_rows: list[DSEParameterRow] + reward_chart_data: dict[str, Any] | None + + +@dataclass(frozen=True) +class TrajectoryStep: + """Enriched trajectory step for DSE.""" + + step: int + reward: float + observation_text: str + action: dict[str, Any] + elapsed_time_sec: int | None + is_successful: bool + + +def format_duration(seconds: float | None) -> 
str: + if seconds is None: + return "n/a" + + seconds = max(float(seconds), 0.0) + if seconds < 60: + return f"{seconds:.1f}s" + + minutes, sec = divmod(round(seconds), 60) + hours, minutes = divmod(minutes, 60) + parts = [] + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + if sec or not parts: + parts.append(f"{sec}s") + return " ".join(parts) + + +def format_float(value: float | None, precision: int = 2) -> str: + if value is None: + return "n/a" + return f"{value:.{precision}f}" + + +def format_money(value: float | None) -> str: + if value is None: + return "n/a" + return f"${value:,.2f}" + + +def _safe_literal_eval(raw: Any, default: Any) -> Any: + if isinstance(raw, str): + with contextlib.suppress(SyntaxError, ValueError): + return ast.literal_eval(raw) + return default + + +def _format_scalar(value: Any) -> str: + if isinstance(value, float): + return f"{value:.4f}".rstrip("0").rstrip(".") + return str(value) + + +def _normalize_gpu_family(gpu_name: str | None) -> str | None: + if not gpu_name: + return None + upper = gpu_name.upper() + + # sorted because of `B200 in GB200 is True` + for family in sorted(GPU_HOURLY_COST_USD, key=len, reverse=True): + if family in upper: + return family + return None + + +def _step_elapsed_time(step_dir: Path) -> int | None: + slurm_job_path = step_dir / "slurm-job.toml" + if not slurm_job_path.exists(): + return None + + with slurm_job_path.open() as f: + try: + metadata = SlurmJobMetadata.model_validate(toml.load(f)) + except Exception as exc: + logging.debug(f"Error validating slurm job metadata for {slurm_job_path}: {exc}") + return None + + return metadata.elapsed_time_sec + + +def calculate_saved_gpu_hours( + system: System, + total_runtime_sec: float, + projected_runtime_sec: float, + test_run_details: TestRunDetails, +) -> float | None: + gpus_per_node = getattr(system, "gpus_per_node", None) + total_gpu_hours = ( + (total_runtime_sec / 3600.0) * test_run_details.nnodes * 
gpus_per_node if gpus_per_node is not None else None + ) + projected_gpu_hours = ( + (projected_runtime_sec / 3600.0) * test_run_details.nnodes * gpus_per_node + if projected_runtime_sec is not None and gpus_per_node is not None + else None + ) + return ( + max(projected_gpu_hours - total_gpu_hours, 0.0) + if projected_gpu_hours is not None and total_gpu_hours is not None + else None + ) + + +def calculate_savings(saved_gpu_hours: float | None, gpu_arch_label: str | None) -> float | None: + gpu_arch_family = _normalize_gpu_family(gpu_arch_label) + return ( + saved_gpu_hours * GPU_HOURLY_COST_USD[gpu_arch_family] + if saved_gpu_hours is not None and gpu_arch_family in GPU_HOURLY_COST_USD + else None + ) + + +def get_best_step(steps: list[TrajectoryStep]) -> TrajectoryStep | None: + successful_steps = [step for step in steps if step.is_successful] + if not successful_steps: + return None + return max(successful_steps, key=lambda step: step.reward) + + +def _build_reward_chart_data(steps: list[TrajectoryStep]) -> dict[str, Any] | None: + if not steps: + return None + + best_step = get_best_step(steps) + if best_step is None: + return None + + return { + "labels": [step.step for step in steps], + "rewards": [step.reward for step in steps], + "observations": [step.observation_text for step in steps], + "best_index": best_step.step - 1, + } + + +def _build_parameter_rows(param_space: dict[str, list[Any]], best_action: dict[str, Any]) -> list[DSEParameterRow]: + rows: list[DSEParameterRow] = [] + for name, values in param_space.items(): + best_value = _format_scalar(best_action.get(name, "n/a")) + rows.append( + DSEParameterRow( + name=name, + values=[ + DSEParameterValue( + text=_format_scalar(value), + is_best=_format_scalar(value) == best_value, + ) + for value in values + ], + ) + ) + return rows + + +def _build_trajectory_steps( + iteration_dir: Path, + test_case: TestRun, + test_runs: list[TestRun], +) -> list[TrajectoryStep] | None: + trajectory_file = 
iteration_dir / "trajectory.csv" + if not trajectory_file.is_file(): + logging.warning(f"No trajectory file found for {test_case.name} at {trajectory_file}") + return None + + df = lazy.pd.read_csv(trajectory_file) + if df.empty: + logging.warning(f"No trajectory data found for {test_case.name} at {trajectory_file}") + return None + + runs_by_step = {test_run.step: test_run for test_run in test_runs} + steps: list[TrajectoryStep] = [] + for row in df.to_dict(orient="records"): + step_no = int(row["step"]) + action = _safe_literal_eval(row.get("action"), {}) + if not isinstance(action, dict): + action = {} + observation = _safe_literal_eval(row.get("observation"), []) + if not isinstance(observation, list): + observation = [observation] + step_run = runs_by_step.get(step_no) + steps.append( + TrajectoryStep( + step=step_no, + reward=float(row["reward"]), + observation_text=", ".join(_format_scalar(value) for value in observation) if observation else "n/a", + action=action, + elapsed_time_sec=_step_elapsed_time(iteration_dir / str(step_no)), + is_successful=step_run.test.was_run_successful(step_run).is_successful if step_run else False, + ) + ) + + if not steps: + return None + + steps.sort(key=lambda step: step.step) + return steps + + +def _build_iteration_summary( + system: System, + results_root: Path, + test_case: TestRun, + iteration: int, + iteration_dir: Path, + test_runs: list[TestRun], +) -> DSECaseIterationSummary | None: + trajectory_steps = _build_trajectory_steps(iteration_dir, test_case, test_runs) + if not trajectory_steps: + return None + + best_step = get_best_step(trajectory_steps) + if best_step is None: + return None + + best_step_dump = iteration_dir / str(best_step.step) / CommandGenStrategy.TEST_RUN_DUMP_FILE_NAME + if not best_step_dump.exists(): + logging.warning(f"No test run dump found for best DSE step at {best_step_dump}") + return None + + with best_step_dump.open() as f: + test_run_details = TestRunDetails.model_validate(toml.load(f)) 
+ + elapsed_times = [step.elapsed_time_sec for step in trajectory_steps if step.elapsed_time_sec is not None] + if not elapsed_times: + return None + + total_observed_runtime_sec = sum(elapsed_times) + avg_step_duration_sec = total_observed_runtime_sec / len(elapsed_times) + total_space = len(test_case.all_combinations) + projected_runtime_sec = avg_step_duration_sec * total_space + saved_runtime_sec = max(projected_runtime_sec - total_observed_runtime_sec, 0.0) + + metadata = load_system_metadata(iteration_dir / str(best_step.step), results_root) + gpu_arch_label = metadata.system.gpu_arch_type if metadata else None + saved_gpu_hours = calculate_saved_gpu_hours( + system=system, + total_runtime_sec=total_observed_runtime_sec, + projected_runtime_sec=projected_runtime_sec, + test_run_details=test_run_details, + ) + estimated_saved_cost_usd = calculate_savings(saved_gpu_hours, gpu_arch_label) + reduction_factor = total_space / len(trajectory_steps) + + return DSECaseIterationSummary( + name=f"{test_case.name}-{iteration}", + saved_time=format_duration(saved_runtime_sec), + saved_gpu_hours=format_float(saved_gpu_hours, 2), + saved_usd=format_money(estimated_saved_cost_usd), + gpu_label=gpu_arch_label or "unknown", + avg_step_runtime=format_duration(avg_step_duration_sec), + observed_runtime=format_duration(total_observed_runtime_sec), + efficiency_ratio=f"~{format_float(reduction_factor, 1)}x", + efficiency_steps=f"{len(trajectory_steps):,} / {total_space:,} steps", + best_config_toml=toml.dumps(test_run_details.test_definition.model_dump()), + parameter_rows=_build_parameter_rows(test_case.param_space, best_step.action), + reward_chart_data=_build_reward_chart_data(trajectory_steps), + ) + + +def build_dse_summaries( + system: System, + results_root: Path, + loaded_test_runs: list[TestRun], + test_cases: list[TestRun], +) -> list[DSECaseIterationSummary]: + result: list[DSECaseIterationSummary] = [] + + for test_case in test_cases: + if not test_case.is_dse_job: + 
continue + + case_root = results_root / test_case.name + if not case_root.is_dir(): + continue + + for iteration in range(test_case.iterations): + dse_iteration_runs = [ + tr for tr in loaded_test_runs if tr.name == test_case.name and tr.current_iteration == iteration + ] + + iteration_dir = case_root / str(iteration) + if not iteration_dir.is_dir(): + continue + + summary = _build_iteration_summary( + system=system, + results_root=results_root, + test_case=test_case, + iteration=iteration, + iteration_dir=case_root / str(iteration), + test_runs=dse_iteration_runs, + ) + if summary is not None: + result.append(summary) + + return result diff --git a/src/cloudai/report_generator/util.py b/src/cloudai/report_generator/util.py index ccb1af7d6..5a3254ca0 100644 --- a/src/cloudai/report_generator/util.py +++ b/src/cloudai/report_generator/util.py @@ -1,5 +1,5 @@ # SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES -# Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,9 +15,14 @@ # limitations under the License. 
from __future__ import annotations +import logging +from pathlib import Path from typing import TYPE_CHECKING, List, Tuple +import toml + from cloudai.core import TestRun +from cloudai.systems.slurm import SlurmSystemMetadata from cloudai.util.lazy_imports import lazy if TYPE_CHECKING: @@ -178,3 +183,26 @@ def diff_test_runs(trs: list[TestRun]) -> dict[str, list[str]]: diff[key] = all_values return diff + + +def load_system_metadata(run_dir: Path, results_root: Path) -> SlurmSystemMetadata | None: + metadata_path = run_dir / "metadata" + if not metadata_path.exists(): + logging.debug(f"No metadata folder found in {run_dir=}") + fallback_metadata_path = results_root / "metadata" + if not fallback_metadata_path.exists(): + logging.debug(f"No metadata folder found in {results_root=}") + return None + metadata_path = fallback_metadata_path + + node_files = list(metadata_path.glob("node-*.toml")) + if not node_files: + logging.debug(f"No node files found in {metadata_path}") + return None + + with node_files[0].open() as f: + try: + return SlurmSystemMetadata.model_validate(toml.load(f)) + except Exception as exc: + logging.debug(f"Error validating metadata for {node_files[0]}: {exc}") + return None diff --git a/src/cloudai/reporter.py b/src/cloudai/reporter.py index 9f5b44110..a897015c3 100644 --- a/src/cloudai/reporter.py +++ b/src/cloudai/reporter.py @@ -27,33 +27,16 @@ from rich.console import Console from rich.table import Table +from cloudai.report_generator.dse_report import build_dse_summaries +from cloudai.report_generator.util import load_system_metadata from cloudai.util.lazy_imports import lazy from .core import CommandGenStrategy, Reporter, TestRun, case_name from .models.scenario import TestRunDetails -from .systems.slurm import SlurmSystem, SlurmSystemMetadata @dataclass class ReportItem: - """Basic report item for general systems.""" - - name: str - description: str - logs_path: Optional[str] = None - - @classmethod - def from_test_runs(cls, test_runs: 
list[TestRun], results_root: Path) -> list["ReportItem"]: - report_items: list[ReportItem] = [] - for tr in test_runs: - report_items.append(ReportItem(case_name(tr), tr.test.description)) - if tr.output_path.exists(): - report_items[-1].logs_path = f"./{tr.output_path.relative_to(results_root)}" - return report_items - - -@dataclass -class SlurmReportItem: """Enhanced report item for Slurm systems with node information.""" name: str @@ -62,38 +45,13 @@ class SlurmReportItem: nodes: Optional[str] = None @classmethod - def get_metadata(cls, run_dir: Path, results_root: Path) -> Optional[SlurmSystemMetadata]: - metadata_path = run_dir / "metadata" - if not metadata_path.exists(): - logging.debug(f"No metadata folder found in {run_dir=}") - if not (results_root / "metadata").exists(): - logging.debug(f"No metadata folder found in {results_root=}") - return None - else: # single-sbatch case - metadata_path = results_root / "metadata" - - node_files = list(metadata_path.glob("node-*.toml")) - if not node_files: - logging.debug(f"No node files found in {metadata_path}") - return None - - node_file = node_files[0] - with node_file.open() as f: - try: - return SlurmSystemMetadata.model_validate(toml.load(f)) - except Exception as e: - logging.debug(f"Error validating metadata for {node_file}: {e}") - - return None - - @classmethod - def from_test_runs(cls, test_runs: list[TestRun], results_root: Path) -> list["SlurmReportItem"]: - report_items: list[SlurmReportItem] = [] + def from_test_runs(cls, test_runs: list[TestRun], results_root: Path) -> list["ReportItem"]: + report_items: list[ReportItem] = [] for tr in test_runs: - ri = SlurmReportItem(case_name(tr), tr.test.description) + ri = ReportItem(case_name(tr), tr.test.description) if tr.output_path.exists(): ri.logs_path = f"./{tr.output_path.relative_to(results_root)}" - if metadata := cls.get_metadata(tr.output_path, results_root): + if metadata := load_system_metadata(tr.output_path, results_root): ri.nodes = 
metadata.slurm.node_list report_items.append(ri) @@ -131,17 +89,11 @@ def template_file_path(self) -> Path: @property def template_file(self) -> str: - if isinstance(self.system, SlurmSystem): - return "general-slurm-report.jinja2" return "general-report.jinja2" - def best_dse_config_file_name(self, tr: TestRun) -> str: - return f"{tr.name}.toml" - def generate(self) -> None: self.load_test_runs() self.generate_scenario_report() - self.report_best_dse_config() self.print_summary() def generate_scenario_report(self) -> None: @@ -149,39 +101,13 @@ def generate_scenario_report(self) -> None: self.template_file ) - report_items = ( - SlurmReportItem.from_test_runs(self.trs, self.results_root) - if isinstance(self.system, SlurmSystem) - else ReportItem.from_test_runs(self.trs, self.results_root) - ) + report_items = ReportItem.from_test_runs(self.trs, self.results_root) report = template.render(name=self.test_scenario.name, report_items=report_items) report_path = self.results_root / f"{self.test_scenario.name}.html" with report_path.open("w") as f: f.write(report) - logging.info(f"Generated scenario report at {report_path}") - - def report_best_dse_config(self): - for tr in self.test_scenario.test_runs: - if not tr.test.is_dse_job: - continue - - tr_root = self.results_root / tr.name / f"{tr.current_iteration}" - trajectory_file = tr_root / "trajectory.csv" - if not trajectory_file.exists(): - logging.warning(f"No trajectory file found for {tr.name} at {trajectory_file}") - continue - - df = lazy.pd.read_csv(trajectory_file) - best_step = df.loc[df["reward"].idxmax()]["step"] - best_step_details = tr_root / f"{best_step}" / CommandGenStrategy.TEST_RUN_DUMP_FILE_NAME - with best_step_details.open() as f: - trd = TestRunDetails.model_validate(toml.load(f)) - - best_config_path = tr_root / self.best_dse_config_file_name(tr) - logging.info(f"Writing best config for {tr.name} to {best_config_path}") - with best_config_path.open("w") as f: - 
toml.dump(trd.test_definition.model_dump(), f) + logging.info("Generated scenario report at %s", report_path) def print_summary(self) -> None: if not self.trs: @@ -209,6 +135,78 @@ def print_summary(self) -> None: logging.info(capture.get()) +class DSEReporter(Reporter): + """ + Generate DSE-specific scenario artifacts. + + For scenarios containing DSE test cases, this reporter produces: + + - a dedicated HTML report at `/-dse-report.html` + - one best-config TOML per DSE test case iteration at + `///.toml` + """ + + @property + def templates_dir(self) -> Path: + return Path(__file__).parent / "util" + + def generate(self) -> None: + self.load_test_runs() + + dse_cases = build_dse_summaries( + system=self.system, + results_root=self.results_root, + loaded_test_runs=self.trs, + test_cases=self.test_scenario.test_runs, + ) + + if not dse_cases: + return + + self.report_best_dse_config() + + jinja_env = jinja2.Environment(loader=jinja2.FileSystemLoader(self.templates_dir)) + template = jinja_env.get_template("dse-report.jinja2") + + report = template.render(name=self.test_scenario.name, dse_cases=dse_cases) + report_path = self.results_root / f"{self.test_scenario.name}-dse-report.html" + with report_path.open("w") as f: + f.write(report) + + logging.info(f"Generated scenario report at {report_path}") + + def report_best_dse_config(self): + """Persist the highest-reward configuration for each DSE test case iteration.""" + for tr in self.test_scenario.test_runs: + if not tr.test.is_dse_job: + continue + + tr_root = self.results_root / tr.name / f"{tr.current_iteration}" + trajectory_file = tr_root / "trajectory.csv" + if not trajectory_file.is_file(): + logging.warning("No trajectory file found for %s at %s", tr.name, trajectory_file) + continue + + df = lazy.pd.read_csv(trajectory_file) + best_step = df.loc[df["reward"].idxmax()]["step"] + best_step_details = tr_root / f"{best_step}" / CommandGenStrategy.TEST_RUN_DUMP_FILE_NAME + if not best_step_details.is_file(): + 
logging.warning("No best step found for %s at %s", tr.name, best_step_details) + continue + + with best_step_details.open() as f: + try: + trd = TestRunDetails.model_validate(toml.load(f)) + except Exception as exc: + logging.warning("Failed to validate test run for %s: %s", tr.name, exc, exc_info=True) + continue + + best_config_path = tr_root / f"{tr.name}.toml" + logging.info("Writing best config for %s to %s", tr.name, best_config_path) + with best_config_path.open("w") as f: + toml.dump(trd.test_definition.model_dump(), f) + + class TarballReporter(Reporter): """Creates tarballs of results for failed test runs.""" diff --git a/src/cloudai/util/base-report.jinja2 b/src/cloudai/util/base-report.jinja2 index 80cff35e2..87b28de82 100644 --- a/src/cloudai/util/base-report.jinja2 +++ b/src/cloudai/util/base-report.jinja2 @@ -3,50 +3,95 @@ {{ name }} {% block extra_head %}{% endblock %} diff --git a/src/cloudai/util/dse-report.jinja2 b/src/cloudai/util/dse-report.jinja2 new file mode 100644 index 000000000..4852dfd32 --- /dev/null +++ b/src/cloudai/util/dse-report.jinja2 @@ -0,0 +1,700 @@ +{% extends "base-report.jinja2" %} + +{% block extra_head %} +{% if dse_cases %} + +{% endif %} + +{% if dse_cases %} + +{% endif %} +{% endblock %} + +{% block content %} +{% if dse_cases %} +
+
+

DSE Cases

+

Switch between DSE test case reports in this scenario.

+
+
+
+ {% for summary in dse_cases %} + + {% endfor %} +
+ + {% for summary in dse_cases %} +
+
+
+
+

Overview

+
+ +
+
+
+
+
+
Saved Time
+
{{ summary.saved_time }}
+
+
+
Saved GPU-Hours
+
{{ summary.saved_gpu_hours }}
+
+
+
Estimated Savings
+
{{ summary.saved_usd }}
+
+
+
+
+ GPU Label + {{ summary.gpu_label }} +
+
+ Avg Step Runtime + {{ summary.avg_step_runtime }} +
+
+ Observed Runtime + {{ summary.observed_runtime }} +
+
+
+
+
+
+

Exploration Efficiency

+
+
+
+
{{ summary.efficiency_ratio }}
+
reduction in search space
+
+
{{ summary.efficiency_steps }}
+
+
+
+ +
+ +
+

Exploration Space

+ + + + + + + + + {% for row in summary.parameter_rows %} + + + + + {% endfor %} + +
ParameterValues
{{ row.name }} +
+ {% for value in row.values %} + {{ value.text }} + {% endfor %} +
+
+ {% if summary.best_config_toml %} +
+ + Best Config TOML + + + + +
{{ summary.best_config_toml }}
+
+ {% endif %} +
+ +
+

Reward Over Steps

+ {% if summary.reward_chart_data %} +
+ +
+ +

Interactive chart unavailable. Numeric report details remain available above.

+ + {% else %} +

No reward data available.

+ {% endif %} +
+
+
+ {% endfor %} +
+
+{% else %} +

No DSE results found.

+{% endif %} +{% endblock %} diff --git a/src/cloudai/util/general-report.jinja2 b/src/cloudai/util/general-report.jinja2 index 696e6dcda..7f3f88cc0 100644 --- a/src/cloudai/util/general-report.jinja2 +++ b/src/cloudai/util/general-report.jinja2 @@ -6,6 +6,7 @@ Test Description Results + Nodes {% for item in report_items %} @@ -16,6 +17,11 @@ {% else %} no logs {% endif %} + {% if item.nodes %} + {{ item.nodes }} + {% else %} + No nodes info + {% endif %} {% endfor %} diff --git a/src/cloudai/util/general-slurm-report.jinja2 b/src/cloudai/util/general-slurm-report.jinja2 deleted file mode 100644 index c37b0aa0f..000000000 --- a/src/cloudai/util/general-slurm-report.jinja2 +++ /dev/null @@ -1,24 +0,0 @@ -{% extends "base-report.jinja2" %} - -{% block content %} - - - - - - - - {% for item in report_items %} - - - - {% if item.logs_path %} - - {% else %} - - {% endif %} - - - {% endfor %} -
TestDescriptionResultsNodes
{{ item.name }}{{ item.description }}logsno logs{{ item.nodes }}
-{% endblock %} diff --git a/tests/test_init.py b/tests/test_init.py index db998c0e1..fa442e53d 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -16,7 +16,7 @@ from cloudai.core import Registry -from cloudai.reporter import PerTestReporter, StatusReporter, TarballReporter +from cloudai.reporter import DSEReporter, PerTestReporter, StatusReporter, TarballReporter from cloudai.systems.kubernetes import KubernetesSystem from cloudai.systems.lsf import LSFInstaller, LSFSystem from cloudai.systems.runai import RunAIInstaller, RunAISystem @@ -260,6 +260,7 @@ def test_scenario_reports(): assert list(scenario_reports.keys()) == [ "per_test", "status", + "dse", "tarball", "nixl_bench_summary", "nccl_comparison", @@ -268,6 +269,7 @@ def test_scenario_reports(): assert list(scenario_reports.values()) == [ PerTestReporter, StatusReporter, + DSEReporter, TarballReporter, NIXLBenchComparisonReport, NcclComparisonReport, @@ -280,6 +282,7 @@ def test_report_configs(): assert list(configs.keys()) == [ "per_test", "status", + "dse", "tarball", "nixl_bench_summary", "nccl_comparison", diff --git a/tests/test_reporter.py b/tests/test_reporter.py index 547c588c7..95acd8ac9 100644 --- a/tests/test_reporter.py +++ b/tests/test_reporter.py @@ -15,17 +15,21 @@ # limitations under the License. 
import copy +import csv import tarfile +from dataclasses import asdict from pathlib import Path +from typing import Any import pytest import toml from cloudai import TestRun, TestScenario from cloudai.cli.handlers import generate_reports -from cloudai.core import Registry, Reporter, System -from cloudai.models.scenario import ReportConfig -from cloudai.reporter import PerTestReporter, SlurmReportItem, StatusReporter, TarballReporter +from cloudai.core import CommandGenStrategy, Registry, Reporter, System +from cloudai.models.scenario import ReportConfig, TestRunDetails +from cloudai.report_generator.dse_report import build_dse_summaries +from cloudai.reporter import DSEReporter, PerTestReporter, ReportItem, StatusReporter, TarballReporter from cloudai.systems.slurm.slurm_metadata import ( MetadataCUDA, MetadataMPI, @@ -33,6 +37,8 @@ MetadataNetwork, MetadataSlurm, MetadataSystem, + SlurmJobMetadata, + SlurmStepMetadata, SlurmSystemMetadata, ) from cloudai.systems.slurm.slurm_system import SlurmSystem @@ -92,20 +98,6 @@ def test_create_tarball_preserves_full_name(tmp_path: Path, slurm_system: SlurmS assert f"{results_dir.name}/dummy.txt" in tar.getnames() -def test_best_dse_config(dse_tr: TestRun, slurm_system: SlurmSystem) -> None: - reporter = StatusReporter( - slurm_system, TestScenario(name="test_scenario", test_runs=[dse_tr]), slurm_system.output_path, ReportConfig() - ) - reporter.report_best_dse_config() - best_config_path = ( - reporter.results_root / dse_tr.name / f"{dse_tr.current_iteration}" / reporter.best_dse_config_file_name(dse_tr) - ) - assert best_config_path.exists() - nccl = NCCLTestDefinition.model_validate(toml.load(best_config_path)) - assert isinstance(nccl.cmd_args, NCCLCmdArgs) - assert nccl.agent_steps == 12 - - @pytest.mark.parametrize( "system", [ @@ -265,26 +257,61 @@ class TestSlurmReportItem: def test_no_metadata_folder(self, slurm_system: SlurmSystem) -> None: run_dir = slurm_system.output_path / "run_dir" run_dir.mkdir(parents=True, 
exist_ok=True) + tr = TestRun( + name="run_dir", + test=NCCLTestDefinition( + name="nccl", + description="NCCL test", + test_template_name="NcclTest", + cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl"), + ), + num_nodes=1, + nodes=["node1"], + output_path=run_dir, + ) - meta = SlurmReportItem.get_metadata(run_dir, slurm_system.output_path) - assert meta is None + [report_item] = ReportItem.from_test_runs([tr], slurm_system.output_path) + assert report_item.nodes is None def test_no_metadata_files(self, slurm_system: SlurmSystem) -> None: run_dir = slurm_system.output_path / "run_dir" (run_dir / "metadata").mkdir(parents=True, exist_ok=True) + tr = TestRun( + name="run_dir", + test=NCCLTestDefinition( + name="nccl", + description="NCCL test", + test_template_name="NcclTest", + cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl"), + ), + num_nodes=1, + nodes=["node1"], + output_path=run_dir, + ) - meta = SlurmReportItem.get_metadata(run_dir, slurm_system.output_path) - assert meta is None + [report_item] = ReportItem.from_test_runs([tr], slurm_system.output_path) + assert report_item.nodes is None def test_metadata_file_in_run_dir(self, slurm_system: SlurmSystem, slurm_metadata: SlurmSystemMetadata) -> None: run_dir = slurm_system.output_path / "run_dir" (run_dir / "metadata").mkdir(parents=True, exist_ok=True) with open(run_dir / "metadata" / "node-0.toml", "w") as f: toml.dump(slurm_metadata.model_dump(), f) + tr = TestRun( + name="run_dir", + test=NCCLTestDefinition( + name="nccl", + description="NCCL test", + test_template_name="NcclTest", + cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl"), + ), + num_nodes=1, + nodes=["node1"], + output_path=run_dir, + ) - meta = SlurmReportItem.get_metadata(run_dir, slurm_system.output_path) - assert meta is not None - assert meta.slurm.node_list == slurm_metadata.slurm.node_list + [report_item] = ReportItem.from_test_runs([tr], slurm_system.output_path) + assert report_item.nodes == 
slurm_metadata.slurm.node_list def test_metadata_for_single_sbatch(self, slurm_system: SlurmSystem, slurm_metadata: SlurmSystemMetadata) -> None: run_dir = slurm_system.output_path / "run_dir" @@ -292,14 +319,209 @@ def test_metadata_for_single_sbatch(self, slurm_system: SlurmSystem, slurm_metad (slurm_system.output_path / "metadata").mkdir(parents=True, exist_ok=True) with open(slurm_system.output_path / "metadata" / "node-0.toml", "w") as f: toml.dump(slurm_metadata.model_dump(), f) + tr = TestRun( + name="run_dir", + test=NCCLTestDefinition( + name="nccl", + description="NCCL test", + test_template_name="NcclTest", + cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl"), + ), + num_nodes=1, + nodes=["node1"], + output_path=run_dir, + ) - meta = SlurmReportItem.get_metadata(run_dir, slurm_system.output_path) - assert meta is not None - assert meta.slurm.node_list == slurm_metadata.slurm.node_list + [report_item] = ReportItem.from_test_runs([tr], slurm_system.output_path) + assert report_item.nodes == slurm_metadata.slurm.node_list def test_report_order() -> None: reports = Registry().ordered_scenario_reports() assert reports[0][0] == "per_test" - assert reports[-2][0] == "status" + assert reports[-3][0] == "status" + assert reports[-2][0] == "dse" assert reports[-1][0] == "tarball" + + +def _write_slurm_job(step_dir: Path, elapsed_time_sec: int) -> None: + metadata = SlurmJobMetadata( + job_id=12345, + name=step_dir.name, + state="COMPLETED", + start_time="2026-03-24T12:00:00", + end_time="2026-03-24T12:05:00", + elapsed_time_sec=elapsed_time_sec, + exit_code="0:0", + srun_cmd="srun echo test", + test_cmd="echo test", + is_single_sbatch=False, + job_root=step_dir, + job_steps=[ + SlurmStepMetadata( + job_id=12345, + step_id="0", + name=step_dir.name, + state="COMPLETED", + start_time="2026-03-24T12:00:00", + end_time="2026-03-24T12:05:00", + elapsed_time_sec=elapsed_time_sec, + exit_code="0:0", + submit_line="srun echo test", + ) + ], + ) + with (step_dir / 
"slurm-job.toml").open("w") as f: + toml.dump(metadata.model_dump(mode="json"), f) + + +def _write_slurm_system_metadata(step_dir: Path, slurm_metadata: SlurmSystemMetadata) -> None: + metadata_dir = step_dir / "metadata" + metadata_dir.mkdir(parents=True, exist_ok=True) + with (metadata_dir / "node-0.toml").open("w") as f: + toml.dump(slurm_metadata.model_dump(), f) + + +def _create_dse_iteration( + case: TestRun, + iteration: int, + results_root: Path, + slurm_metadata: SlurmSystemMetadata, + steps: list[dict[str, Any]], +) -> None: + iteration_dir = results_root / case.name / str(iteration) + iteration_dir.mkdir(parents=True, exist_ok=True) + + with (iteration_dir / "trajectory.csv").open("w", newline="") as f: + writer = csv.writer(f) + writer.writerow(["step", "action", "reward", "observation"]) + for step in steps: + step_no = step["step"] + writer.writerow([step_no, step["action"], step["reward"], step["observation"]]) + + step_dir = iteration_dir / str(step_no) + step_dir.mkdir(parents=True, exist_ok=True) + _write_slurm_job(step_dir, int(step["elapsed_time_sec"])) + _write_slurm_system_metadata(step_dir, slurm_metadata) + + # NCCLTestDefinition.was_run_successful + (step_dir / "stdout.txt").write_text("# Out of bounds values# Avg bus bandwidth") + + step_tr = case.apply_params_set(step["action"]) + step_tr.current_iteration = iteration + step_tr.step = step_no + step_tr.output_path = step_dir + with (step_dir / CommandGenStrategy.TEST_RUN_DUMP_FILE_NAME).open("w") as dump_file: + toml.dump(TestRunDetails.from_test_run(step_tr, "", "").model_dump(mode="json"), dump_file) + + +def test_dse_reporter( + slurm_system: SlurmSystem, + slurm_metadata: SlurmSystemMetadata, +) -> None: + slurm_metadata.system.gpu_arch_type = "NVIDIA H100 80GB HBM3" + + dse_case = TestRun( + name="dse-case", + test=NCCLTestDefinition( + name="nccl", + description="NCCL case", + test_template_name="NcclTest", + cmd_args=NCCLCmdArgs(docker_image_url="fake://url/nccl", ngpus=[1, 2]), + 
extra_env_vars={"VAR1": ["value1", "value2"]}, + agent_steps=3, + ), + num_nodes=1, + nodes=["node1"], + iterations=1, + ) + + steps = [ + { + "step": 0, + "action": {"ngpus": 1, "extra_env_vars.VAR1": "value1"}, + "reward": -10.0, + "observation": [10], + "elapsed_time_sec": 60, + }, + { + "step": 1, + "action": {"ngpus": 2, "extra_env_vars.VAR1": "value1"}, + "reward": -5.0, + "observation": [5], + "elapsed_time_sec": 120, + }, + { + "step": 2, + "action": {"ngpus": 2, "extra_env_vars.VAR1": "value2"}, + "reward": -7.0, + "observation": [7], + "elapsed_time_sec": 180, + }, + ] + _create_dse_iteration( + dse_case, + iteration=0, + results_root=slurm_system.output_path, + slurm_metadata=slurm_metadata, + steps=steps, + ) + + scenario = TestScenario( + name="single-dse-scenario", + test_runs=[dse_case], + ) + reporter = DSEReporter(slurm_system, scenario, slurm_system.output_path, ReportConfig()) + reporter.load_test_runs() + + summaries = build_dse_summaries( + system=slurm_system, + results_root=slurm_system.output_path, + loaded_test_runs=reporter.trs, + test_cases=scenario.test_runs, + ) + + best_tr = dse_case.apply_params_set({"ngpus": 2, "extra_env_vars.VAR1": "value1"}) + best_tr.current_iteration = 0 + best_tr.step = 1 + expected = { + "name": "dse-case-0", + "saved_time": "2m", + "saved_gpu_hours": "0.27", + "saved_usd": "$0.80", + "gpu_label": "NVIDIA H100 80GB HBM3", + "avg_step_runtime": "2m", + "observed_runtime": "6m", + "efficiency_ratio": "~1.3x", + "efficiency_steps": "3 / 4 steps", + "best_config_toml": toml.dumps(TestRunDetails.from_test_run(best_tr, "", "").test_definition.model_dump()), + "parameter_rows": [ + { + "name": "ngpus", + "values": [ + {"text": "1", "is_best": False}, + {"text": "2", "is_best": True}, + ], + }, + { + "name": "extra_env_vars.VAR1", + "values": [ + {"text": "value1", "is_best": True}, + {"text": "value2", "is_best": False}, + ], + }, + ], + "reward_chart_data": { + "labels": [0, 1, 2], + "rewards": [-10.0, -5.0, -7.0], 
+ "observations": ["10", "5", "7"], + "best_index": 0, + }, + } + assert len(summaries) == 1 + assert asdict(summaries[0]) == expected + + reporter.generate() + + assert (slurm_system.output_path / "single-dse-scenario-dse-report.html").exists() + assert (slurm_system.output_path / dse_case.name / "0" / f"{dse_case.name}.toml").exists()