diff --git a/src/cloudai/systems/kubernetes/kubernetes_system.py b/src/cloudai/systems/kubernetes/kubernetes_system.py index 5cd1569b7..ceb0b9e28 100644 --- a/src/cloudai/systems/kubernetes/kubernetes_system.py +++ b/src/cloudai/systems/kubernetes/kubernetes_system.py @@ -16,6 +16,7 @@ from __future__ import annotations +import json import logging import subprocess import time @@ -39,6 +40,7 @@ class KubernetesSystem(System): scheduler: str = "kubernetes" monitor_interval: int = 1 gpus_per_node: int = 1 + use_host_network: bool | None = None _core_v1: Optional[k8s.client.CoreV1Api] = None _batch_v1: Optional[k8s.client.BatchV1Api] = None _custom_objects_api: Optional[k8s.client.CustomObjectsApi] = None @@ -131,6 +133,45 @@ def update(self) -> None: """ pass + def get_network_attachment_definitions(self) -> list[str]: + """Return all NetworkAttachmentDefinitions in the cluster as 'namespace/name' strings.""" + cmd = ["kubectl", "get", "network-attachment-definitions", "--all-namespaces", "-o", "json"] + try: + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + except subprocess.CalledProcessError as e: + logging.debug("Failed to list NetworkAttachmentDefinitions: %s", e.stderr) + return [] + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError as e: + logging.debug("Failed to parse NetworkAttachmentDefinitions output: %s", e) + return [] + + return [f"{item['metadata']['namespace']}/{item['metadata']['name']}" for item in data.get("items", [])] + + def resolve_cni_networks(self) -> list[str] | None: + """ + Determine which CNI networks to use, or None if host networking should be used. + + - use_host_network=True: always use hostNetwork, skip CNI discovery. + - use_host_network=False: require CNI; raise if none are found. + - use_host_network=None: try CNI, fall back to hostNetwork if none found. + """ + if self.use_host_network is True: + return None + + if networks := self.get_network_attachment_definitions(): + return networks + + if self.use_host_network is False: + raise RuntimeError( + "use_host_network=False but no NetworkAttachmentDefinitions were found in the cluster. " + "Ensure the CNI operator is installed and net-attach-defs are configured." + ) + + return None # auto mode: fall back to hostNetwork + def is_job_running(self, job: BaseJob) -> bool: k_job: KubernetesJob = cast(KubernetesJob, job) return self._is_job_running(k_job) diff --git a/src/cloudai/workloads/nccl_test/kubernetes_json_gen_strategy.py b/src/cloudai/workloads/nccl_test/kubernetes_json_gen_strategy.py index 5d051cd62..b829f199d 100644 --- a/src/cloudai/workloads/nccl_test/kubernetes_json_gen_strategy.py +++ b/src/cloudai/workloads/nccl_test/kubernetes_json_gen_strategy.py @@ -38,6 +38,7 @@ def ssh_port(self) -> int: def gen_json(self) -> dict[Any, Any]: k8s_system = cast(KubernetesSystem, self.system) job_name = self.sanitize_k8s_job_name(self.test_run.name) + cni_networks = k8s_system.resolve_cni_networks() deployment = { "apiVersion": "kubeflow.org/v2beta1", @@ -50,8 +51,8 @@ def gen_json(self) -> dict[Any, Any]: "slotsPerWorker": k8s_system.gpus_per_node, "runPolicy": {"cleanPodPolicy": "Running"}, "mpiReplicaSpecs": { - "Launcher": self._create_launcher_spec(), - "Worker": self._create_worker_spec(), + "Launcher": self._create_launcher_spec(cni_networks), + "Worker": self._create_worker_spec(cni_networks), }, }, } @@ -66,56 +67,53 @@ def container_url(self) -> str: tdef: NCCLTestDefinition = cast(NCCLTestDefinition, self.test_run.test) return tdef.cmd_args.docker_image_url.replace("#", "/") - def _create_launcher_spec(self) -> dict[str, Any]: + def _create_launcher_spec(self, cni_networks: list[str] | None) -> dict[str, Any]: env_vars = self._get_merged_env_vars() - return { - "replicas": 1, - "template": { - "spec": { - "hostNetwork": True, - "containers": [ - { - "image": self.container_url, - "name": "nccl-test-launcher", - "imagePullPolicy": "IfNotPresent", - "securityContext": {"privileged": True}, - "env": self._generate_env_list(env_vars), - "command": ["/bin/bash", "-c"], - "args": [self._generate_launcher_command()], - "resources": self._prepare_launcher_resources(), - } - ], - }, - }, + spec: dict[str, Any] = { + "containers": [ + { + "image": self.container_url, + "name": "nccl-test-launcher", + "imagePullPolicy": "IfNotPresent", + "securityContext": {"privileged": True}, + "env": self._generate_env_list(env_vars), + "command": ["/bin/bash", "-c"], + "args": [self._generate_launcher_command()], + "resources": self._prepare_launcher_resources(), + } + ], } + if not cni_networks: + spec["hostNetwork"] = True + return {"replicas": 1, "template": {"spec": spec}} - def _create_worker_spec(self) -> dict[str, Any]: + def _create_worker_spec(self, cni_networks: list[str] | None) -> dict[str, Any]: env_vars = self._get_merged_env_vars() - return { - "replicas": self.test_run.nnodes, - "template": { - "spec": { - "hostNetwork": True, - "containers": [ - { - "image": self.container_url, - "name": "nccl-test-worker", - "ports": [{"containerPort": self.ssh_port, "name": "ssh"}], - "imagePullPolicy": "IfNotPresent", - "securityContext": {"privileged": True}, - "env": self._generate_env_list(env_vars), - "command": ["/bin/bash", "-c"], - "args": [self._generate_worker_command()], - "resources": self._prepare_worker_resources(), - "volumeMounts": [ - {"mountPath": "/dev/shm", "name": "dev-shm"}, - ], - } + spec: dict[str, Any] = { + "containers": [ + { + "image": self.container_url, + "name": "nccl-test-worker", + "ports": [{"containerPort": self.ssh_port, "name": "ssh"}], + "imagePullPolicy": "IfNotPresent", + "securityContext": {"privileged": True}, + "env": self._generate_env_list(env_vars), + "command": ["/bin/bash", "-c"], + "args": [self._generate_worker_command()], + "resources": self._prepare_worker_resources(cni_networks), + "volumeMounts": [ + {"mountPath": "/dev/shm", "name": "dev-shm"}, ], - "volumes": [{"name": "dev-shm", "emptyDir": {"medium": "Memory", "sizeLimit": "1Gi"}}], - }, - }, + } + ], + "volumes": [{"name": "dev-shm", "emptyDir": {"medium": "Memory", "sizeLimit": "1Gi"}}], } + if not cni_networks: + spec["hostNetwork"] = True + template: dict[str, Any] = {"spec": spec} + if cni_networks: + template["metadata"] = {"annotations": {"k8s.v1.cni.cncf.io/networks": ",".join(cni_networks)}} + return {"replicas": self.test_run.nnodes, "template": template} def _generate_worker_command(self) -> str: """ @@ -209,7 +207,16 @@ def _prepare_launcher_resources(self) -> Dict[str, Dict[str, str]]: "limits": {"cpu": "2", "memory": "8Gi"}, } - def _prepare_worker_resources(self) -> Dict[str, Dict[str, str]]: + def _prepare_worker_resources(self, cni_networks: list[str] | None) -> Dict[str, Dict[str, str]]: k8s_system = cast(KubernetesSystem, self.system) gpu_count = str(k8s_system.gpus_per_node) - return {"requests": {"nvidia.com/gpu": gpu_count}, "limits": {"nvidia.com/gpu": gpu_count}} + resources: Dict[str, Dict[str, str]] = { + "requests": {"nvidia.com/gpu": gpu_count}, + "limits": {"nvidia.com/gpu": gpu_count}, + } + if cni_networks: + for net in cni_networks: + nic_name = net.split("/", 1)[1] + resources["requests"][f"nvidia.com/{nic_name}"] = "1" + resources["limits"][f"nvidia.com/{nic_name}"] = "1" + return resources diff --git a/tests/conftest.py b/tests/conftest.py index 06a693366..01eb238f7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -92,6 +92,7 @@ def k8s_system(tmp_path: Path) -> KubernetesSystem: global_env_vars={}, monitor_interval=1, gpus_per_node=8, + use_host_network=True, default_namespace="test-namespace", kube_config_path=kube_config, ) diff --git a/tests/systems/kubernetes/test_system.py b/tests/systems/kubernetes/test_system.py index 37879604b..ca3111c22 100644 --- a/tests/systems/kubernetes/test_system.py +++ b/tests/systems/kubernetes/test_system.py @@ -14,8 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import json +import subprocess from pathlib import Path -from typing import Dict, List +from typing import ClassVar, Dict, List from unittest.mock import MagicMock, Mock, patch import pytest @@ -188,3 +190,74 @@ def test_delete_job(k8s_system: KubernetesSystem, job_kind: str): assert k8s_system._delete_dynamo_graph_deployment.called else: raise AssertionError(f"Unhandled job kind in test: {job_kind}") + + +def _net_attach_def_json(*pairs: tuple[str, str]) -> str: + items = [{"metadata": {"namespace": ns, "name": n}} for ns, n in pairs] + return json.dumps({"items": items}) + + +class TestGetNetworkAttachmentDefinitions: + @pytest.mark.parametrize( + "pairs, expected", + [ + ([], []), + ([("default", "rail-0"), ("default", "rail-1")], ["default/rail-0", "default/rail-1"]), + ( + [("default", "nic0-rail0-plane0"), ("kube-system", "nic1-rail1-plane0")], + ["default/nic0-rail0-plane0", "kube-system/nic1-rail1-plane0"], + ), + ], + ids=["empty", "same-namespace", "multiple-namespaces"], + ) + def test_success(self, k8s_system: KubernetesSystem, pairs: list, expected: list) -> None: + with patch("subprocess.run") as mock_run: + mock_run.return_value = MagicMock(stdout=_net_attach_def_json(*pairs), returncode=0) + assert k8s_system.get_network_attachment_definitions() == expected + + @pytest.mark.parametrize( + "side_effect, stdout", + [ + (subprocess.CalledProcessError(1, "kubectl", stderr="No CRD found"), None), + (None, "not valid json"), + ], + ids=["kubectl-failure", "invalid-json"], + ) + def test_returns_empty_on_error( + self, k8s_system: KubernetesSystem, side_effect: Exception | None, stdout: str | None + ) -> None: + with patch("subprocess.run") as mock_run: + if side_effect: + mock_run.side_effect = side_effect + else: + mock_run.return_value = MagicMock(stdout=stdout, returncode=0) + assert k8s_system.get_network_attachment_definitions() == [] + + +class TestResolveCniNetworks: + NETS: ClassVar[list[str]] = ["default/rail-0", "default/rail-1"] + + @pytest.mark.parametrize( + "use_host_network, discovered, expected", + [ + (True, NETS, None), # forced host network — ignores discovered nets + (None, NETS, NETS), # auto — uses CNI when available + (None, [], None), # auto — falls back to host network when none found + (False, NETS, NETS), # require CNI — uses discovered nets + ], + ids=["forced-host", "auto-cni", "auto-fallback", "require-cni"], + ) + def test_resolve( + self, k8s_system: KubernetesSystem, use_host_network: bool | None, discovered: list, expected: list | None + ) -> None: + k8s_system.use_host_network = use_host_network + with patch.object(KubernetesSystem, "get_network_attachment_definitions", return_value=discovered): + assert k8s_system.resolve_cni_networks() == expected + + def test_require_cni_raises_when_none_found(self, k8s_system: KubernetesSystem) -> None: + k8s_system.use_host_network = False + with ( + patch.object(KubernetesSystem, "get_network_attachment_definitions", return_value=[]), + pytest.raises(RuntimeError, match="NetworkAttachmentDefinitions"), + ): + k8s_system.resolve_cni_networks() diff --git a/tests/workloads/nccl_test/test_json_gen_strategy_kubernetes.py b/tests/workloads/nccl_test/test_json_gen_strategy_kubernetes.py index e61d54320..705e63250 100644 --- a/tests/workloads/nccl_test/test_json_gen_strategy_kubernetes.py +++ b/tests/workloads/nccl_test/test_json_gen_strategy_kubernetes.py @@ -15,6 +15,9 @@ # limitations under the License. +from typing import ClassVar +from unittest.mock import patch + import pytest from cloudai.core import TestRun @@ -130,3 +133,53 @@ def test_launcher_command_generation(self, test_run: TestRun, k8s_system: Kubern assert cmd_args.subtest_name in launcher_args assert f"--nthreads {cmd_args.nthreads}" in launcher_args assert f"--ngpus {cmd_args.ngpus}" in launcher_args + + +class TestNcclCniNetworking: + CNI_NETS: ClassVar[list[str]] = ["default/nic0-rail0-plane0", "default/nic0-rail0-plane1"] + + @pytest.fixture + def test_run(self, k8s_system: KubernetesSystem) -> TestRun: + cmd_args = NCCLCmdArgs.model_validate({"subtest_name": "all_reduce_perf", "docker_image_url": "img"}) + nccl = NCCLTestDefinition(name="n", description="d", test_template_name="NcclTest", cmd_args=cmd_args) + tr = TestRun(name="t", test=nccl, nodes=["n1", "n2"], num_nodes=2, output_path=k8s_system.output_path / "t") + tr.output_path.mkdir(parents=True, exist_ok=True) + return tr + + def gen_json(self, k8s_system: KubernetesSystem, test_run: TestRun, cni_nets: list[str] | None) -> dict: + with patch.object(KubernetesSystem, "resolve_cni_networks", return_value=cni_nets): + return NcclTestKubernetesJsonGenStrategy(k8s_system, test_run).gen_json() + + def test_cni_mode_worker_has_annotations(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None: + payload = self.gen_json(k8s_system, test_run, self.CNI_NETS) + worker = payload["spec"]["mpiReplicaSpecs"]["Worker"] + annotations = worker["template"]["metadata"]["annotations"] + assert annotations["k8s.v1.cni.cncf.io/networks"] == ",".join(self.CNI_NETS) + + def test_cni_mode_worker_has_nic_resources(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None: + payload = self.gen_json(k8s_system, test_run, self.CNI_NETS) + resources = payload["spec"]["mpiReplicaSpecs"]["Worker"]["template"]["spec"]["containers"][0]["resources"] + for net in self.CNI_NETS: + nic = net.split("/", 1)[1] + assert resources["requests"][f"nvidia.com/{nic}"] == "1" + assert resources["limits"][f"nvidia.com/{nic}"] == "1" + + def test_cni_mode_no_host_network(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None: + payload = self.gen_json(k8s_system, test_run, self.CNI_NETS) + worker_spec = payload["spec"]["mpiReplicaSpecs"]["Worker"]["template"]["spec"] + launcher_spec = payload["spec"]["mpiReplicaSpecs"]["Launcher"]["template"]["spec"] + assert "hostNetwork" not in worker_spec + assert "hostNetwork" not in launcher_spec + + def test_host_network_mode_no_annotations(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None: + payload = self.gen_json(k8s_system, test_run, None) # resolve_cni_networks returns None → host network + worker = payload["spec"]["mpiReplicaSpecs"]["Worker"] + assert "metadata" not in worker["template"] + assert worker["template"]["spec"]["hostNetwork"] is True + + def test_launcher_never_gets_nic_resources(self, k8s_system: KubernetesSystem, test_run: TestRun) -> None: + payload = self.gen_json(k8s_system, test_run, self.CNI_NETS) + launcher_resources = payload["spec"]["mpiReplicaSpecs"]["Launcher"]["template"]["spec"]["containers"][0][ + "resources" + ] + assert not any(k.startswith("nvidia.com/nic") for k in launcher_resources["requests"])