Skip to content

Commit fa955c8

Browse files
authored
Implement enhanced stress-ng jobfile support with robust handling (#3922)
* Enhance stressng to use jobfile. * Remove debug logs * fix black error * Addressing comments * nit fix
1 parent ba882ef commit fa955c8

File tree

2 files changed

+233
-24
lines changed

2 files changed

+233
-24
lines changed

lisa/tools/stress_ng.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT license.
33

4+
from pathlib import Path
45
from typing import cast
56

67
from lisa.executable import Tool
@@ -58,7 +59,15 @@ def launch_cpu(self, num_cores: int = 0, timeout_in_seconds: int = 3600) -> None
5859
self.run(cmd, force_run=True, timeout=timeout_in_seconds)
5960

6061
def launch_job_async(self, job_file: str, sudo: bool = False) -> Process:
    """
    Start stress-ng asynchronously with the given job file.

    Besides ``--job``, the command passes ``--yaml`` pointing at a file in
    the node's working path whose name is the job file's stem plus
    ``.yaml``.

    Args:
        job_file: Path of the job file handed to ``--job``.
        sudo: Forwarded to ``run_async``.

    Returns:
        The Process object returned by ``run_async``.
    """
    # Report file shares the job file's base name, extension swapped for
    # ".yaml", and sits directly in the node working path.
    report_path = self.node.working_path / f"{Path(job_file).stem}.yaml"
    arguments = f"--job {job_file} --yaml {report_path}"

    return self.run_async(arguments, force_run=True, sudo=sudo)
6271

6372
def launch_class_async(
6473
self,

microsoft/testsuites/stress/stress_ng_suite.py

Lines changed: 223 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,19 @@
11
# Copyright (c) Microsoft Corporation.
22
# Licensed under the MIT license.
3+
import logging
34
from pathlib import Path, PurePath
4-
from typing import Any, Dict, List, cast
5+
from typing import Any, Dict, List, Tuple, cast
6+
7+
import yaml
58

69
from lisa import Environment, RemoteNode, TestCaseMetadata, TestSuite, TestSuiteMetadata
10+
from lisa.base_tools import Cat
711
from lisa.features import SerialConsole
812
from lisa.messages import TestStatus, send_sub_test_result_message
913
from lisa.testsuite import TestResult
1014
from lisa.tools import StressNg
1115
from lisa.util import SkippedException
16+
from lisa.util.logger import Logger
1217
from lisa.util.process import Process
1318

1419

@@ -34,14 +39,24 @@ class StressNgTestSuite(TestSuite):
3439
)
3540
def stress_ng_jobfile(
    self,
    log: Logger,
    variables: Dict[str, Any],
    environment: Environment,
    result: TestResult,
) -> None:
    """
    Run every stress-ng job file configured through ``CONFIG_VARIABLE``.

    The variable may hold a list of job files or a single comma-separated
    string. Blank entries (for example from a trailing comma) are ignored.

    Args:
        log: Logger used to report a failing job file.
        variables: Runbook variables; read at key ``self.CONFIG_VARIABLE``.
        environment: Environment passed on to ``_run_stress_ng_job``.
        result: Test result passed on to ``_run_stress_ng_job``.

    Raises:
        SkippedException: If the variable is absent or yields no job file.
        Exception: Whatever ``_run_stress_ng_job`` raises, after logging it.
    """
    if self.CONFIG_VARIABLE not in variables:
        raise SkippedException("No jobfile provided for stress-ng")

    jobs = variables[self.CONFIG_VARIABLE]

    # Convert job file configuration to a list if needed
    if not isinstance(jobs, list):
        jobs = [job.strip() for job in str(jobs).split(",")]

    # Drop blank entries so "a.job," or an empty string does not turn into
    # an attempt to deploy a job file with an empty name.
    jobs = [job for job in jobs if job]
    if not jobs:
        raise SkippedException("No jobfile provided for stress-ng")

    for job_file in jobs:
        try:
            self._run_stress_ng_job(job_file, environment, result, log)
        except Exception as e:
            # Log which job file failed, then let the failure propagate so
            # the test case itself is marked as failed.
            log.error(f"Failed to run job file '{job_file}': {e}")
            raise
4762

@@ -112,34 +127,219 @@ def _run_stress_ng_job(
112127
job_file: str,
113128
environment: Environment,
114129
test_result: TestResult,
130+
log: Logger,
115131
) -> None:
132+
"""
133+
Execute a stress-ng job file on all nodes in the environment.
134+
135+
Args:
136+
job_file: Path to the stress-ng job file
137+
environment: Test environment containing target nodes
138+
test_result: Test result object for reporting
139+
log: Logger instance for detailed logging
140+
"""
141+
116142
nodes = [cast(RemoteNode, node) for node in environment.nodes.list()]
117-
procs: List[Process] = []
143+
stress_processes: List[Process] = []
118144
job_file_name = Path(job_file).name
119-
test_status = TestStatus.QUEUED
120-
test_msg = ""
145+
146+
execution_status = TestStatus.QUEUED
147+
execution_summary = ""
148+
121149
try:
122-
for node in nodes:
123-
remote_working_dir = node.working_path / "stress_ng_jobs"
124-
node.shell.mkdir(remote_working_dir, exist_ok=True)
125-
job_file_dest = remote_working_dir / job_file_name
126-
node.shell.copy(PurePath(job_file), job_file_dest)
127-
procs.append(node.tools[StressNg].launch_job_async(str(job_file_dest)))
128-
for proc in procs:
129-
proc.wait_result(expected_exit_code=0)
130-
test_status = TestStatus.PASSED
131-
except Exception as e:
132-
test_status = TestStatus.FAILED
133-
test_msg = repr(e)
134-
finally:
135-
send_sub_test_result_message(
136-
test_result=test_result,
137-
test_case_name=job_file_name,
138-
test_status=test_status,
139-
test_message=test_msg,
150+
self._deploy_and_launch_stress_jobs(
151+
nodes, job_file, job_file_name, stress_processes, log
152+
)
153+
154+
execution_status, execution_summary = self._monitor_stress_execution(
155+
stress_processes, nodes, log, job_file_name
156+
)
157+
158+
except Exception as execution_error:
159+
execution_status = TestStatus.FAILED
160+
execution_summary = (
161+
f"Error: {type(execution_error).__name__}: {str(execution_error)}"
140162
)
141163
self._check_panic(nodes)
164+
raise execution_error
165+
166+
finally:
167+
self._report_test_results(
168+
test_result, job_file_name, execution_status, execution_summary
169+
)
170+
171+
def _deploy_and_launch_stress_jobs(
    self,
    nodes: List[RemoteNode],
    job_file: str,
    job_file_name: str,
    stress_processes: List[Process],
    log: Logger,
) -> None:
    """
    Copy the job file to each node and start stress-ng on it.

    For every node, a ``stress_ng_jobs`` directory is created under the
    node's working path, the job file is copied into it, and stress-ng is
    launched asynchronously against the copied file. Each launched process
    is appended to ``stress_processes``.

    Args:
        nodes: Nodes to deploy to, processed in order.
        job_file: Local path of the job file to copy.
        job_file_name: File name used for the copy on the node.
        stress_processes: Output list; receives one Process per node.
        log: Logger for progress and error messages.

    Raises:
        Exception: The first deployment or launch error, re-raised after
            it has been logged. Nodes after the failing one are not touched.
    """
    total_nodes = len(nodes)
    for position, node in enumerate(nodes, start=1):
        try:
            log.debug(f"Processing node {position}/{total_nodes}: {node.name}")

            # Dedicated directory on the node for stress-ng job files.
            jobs_dir = node.working_path / "stress_ng_jobs"
            node.shell.mkdir(jobs_dir, exist_ok=True)

            # Place the job file on the node under its original file name.
            deployed_job = jobs_dir / job_file_name
            node.shell.copy(PurePath(job_file), deployed_job)

            # Start stress-ng against the deployed copy and keep the handle.
            stress_ng = node.tools[StressNg]
            stress_processes.append(stress_ng.launch_job_async(str(deployed_job)))

        except Exception as err:
            log.error(f"Failed to start stress job on node {position}: " f"{err}")
            # Also record the failure in the node's own log when it has one.
            if getattr(node, "log", None):
                node.log.error(f"Failed to start stress job: {err}")
            raise err
215+
216+
def _monitor_stress_execution(
    self,
    stress_processes: List[Process],
    nodes: List[RemoteNode],
    log: Logger,
    job_file_name: str,
) -> Tuple[TestStatus, str]:
    """
    Wait for the launched stress-ng processes and summarize their output.

    Every process is waited on, even after an earlier one has failed, so
    that the output of all nodes is collected before the result is decided.

    Args:
        stress_processes: Launched processes; entry ``i`` belongs to
            ``nodes[i]``.
        nodes: Nodes the processes were started on.
        log: Logger for per-node results and the failure summary.
        job_file_name: Name of the job file, used in the summary and to
            locate the YAML output.

    Returns:
        ``(TestStatus.PASSED, summary)`` where the summary starts with the
        job name and contains one ``=== <node name> ===`` section per node.
        A non-passing result is never returned; failures raise instead.

    Raises:
        Exception: The first exception hit while waiting on a process or
            processing its YAML output, re-raised after all processes have
            been waited on.
    """
    node_sections: List[str] = []
    failures: List[Exception] = []

    # Wait for all processes and capture their output. zip pairs each
    # process with the node it was launched on.
    for node, process in zip(nodes, stress_processes):
        node_name = node.name
        try:
            process.wait_result(timeout=self.TIME_OUT, expected_exit_code=0)
            log.info(f"{node_name} completed successfully")

            # Process YAML output if applicable
            node_output = self._process_yaml_output(node, job_file_name, log)
        except Exception as e:
            node_output = f"ERROR: {str(e)}"
            log.error(f"{node_name} failed: {e}")
            # Keep the exception; it is re-raised once every node is done.
            failures.append(e)

        # The node header is added exactly once here, for success and
        # failure alike.
        node_sections.append(f"=== {node_name} ===\n{node_output}\n\n")

    execution_summary = f"Job: {job_file_name}\n\n" + "".join(node_sections)

    # If any processes failed, re-raise the first exception to fail the test
    if failures:
        log.error(
            f"Stress-ng job failed on {len(failures)} node(s). "
            f"Re-raising first exception to fail the test case."
        )
        # The summary cannot be returned on this path, so log it to keep
        # the per-node results available for diagnosis.
        log.info(execution_summary)
        raise failures[0]

    return TestStatus.PASSED, execution_summary
271+
272+
def _report_test_results(
    self,
    test_result: TestResult,
    job_file_name: str,
    execution_status: TestStatus,
    execution_summary: str,
) -> None:
    """
    Send a sub-test result message for one job file.

    Args:
        test_result: Test result the sub-test message is attached to.
        job_file_name: Reported as the sub-test case name.
        execution_status: Reported as the sub-test status.
        execution_summary: Reported as the sub-test message text.
    """
    report_fields = {
        "test_result": test_result,
        "test_case_name": job_file_name,
        "test_status": execution_status,
        "test_message": execution_summary,
    }
    send_sub_test_result_message(**report_fields)
142294

143295
def _check_panic(self, nodes: List[RemoteNode]) -> None:
    """Run the serial console panic check on each of the given nodes."""
    for target in nodes:
        serial_console = target.features[SerialConsole]
        serial_console.check_panic(saved_path=None, force_run=True)
298+
299+
def _process_yaml_output(
    self,
    node: RemoteNode,
    job_file_name: str,
    log: Logger,
) -> str:
    """
    Read the job's YAML output from the node and return a short summary.

    The file is looked up in the node's working path under the job file's
    stem plus ``.yaml``. Only the 'system-info' and 'times' sections are
    included in the summary.

    Args:
        node: Node to read the YAML file from.
        job_file_name: Job file name; its stem names the YAML file.
        log: Logger used to report a YAML parse failure.

    Returns:
        The formatted sections, or a short placeholder message when the
        file is missing, empty, unparsable, not a mapping, or has neither
        section.
    """
    # Sets the "YamlManager" logger threshold to WARNING.
    # NOTE(review): presumably to silence verbose YAML logging - confirm.
    logging.getLogger("YamlManager").setLevel(logging.WARNING)

    yaml_file_path = node.working_path / f"{Path(job_file_name).stem}.yaml"

    if not node.shell.exists(yaml_file_path):
        return "No YAML output file found"

    # Force a fresh read: without force_run an identical earlier command
    # may be answered from the tool's result cache, which would report the
    # previous run's content when the same job file is executed again.
    # NOTE(review): relies on Cat.read accepting force_run - confirm.
    yaml_content = node.tools[Cat].read(str(yaml_file_path), force_run=True).strip()
    if not yaml_content:
        return "YAML file is empty"

    try:
        parsed_yaml = yaml.safe_load(yaml_content)
    except Exception as e:
        # Best effort: an unreadable report must not fail the stress run.
        log.warning(f"Failed to parse YAML content: {e}")
        return "YAML parse error"

    if not isinstance(parsed_yaml, dict):
        return str(parsed_yaml) if parsed_yaml else "YAML file is empty or invalid"

    # Only extract 'system-info' and 'times' if present
    output_lines: List[str] = []
    for key in ("system-info", "times"):
        if key not in parsed_yaml:
            continue
        output_lines.append(f"{key}:")
        value = parsed_yaml[key]
        if isinstance(value, dict):
            output_lines.extend(f" {sub_k}: {sub_v}" for sub_k, sub_v in value.items())
        else:
            output_lines.append(f" {value}")

    if not output_lines:
        return "No system-info or times in YAML"
    return "\n".join(output_lines)

0 commit comments

Comments
 (0)