From 957594a6de61955c4079c55e39e9fe490491a922 Mon Sep 17 00:00:00 2001 From: Dehan Meng Date: Tue, 9 Dec 2025 15:05:01 +0800 Subject: [PATCH 1/2] win_guest_debugging_tool: Optimize and simplify script The test script for the Windows guest debugging tool was significantly refactored to improve its structure, eliminate redundancy, and increase maintainability. Key logical improvements include: - The complex three-class inheritance structure was consolidated into a single, self-contained WinDebugToolTest class, simplifying the overall architecture. - A centralized helper method, _run_script_and_get_paths, was created to handle all PowerShell script executions and output parsing. This removes significant code duplication that was present in each test variant. - Individual test methods were streamlined to focus purely on their specific validation logic, delegating common tasks like script execution and session management to helper functions. - VM session management was made more robust to ensure sessions are properly created, reused, and cleaned up. Signed-off-by: Dehan Meng --- qemu/tests/cfg/win_guest_debugging_tool.cfg | 10 +- qemu/tests/win_guest_debugging_tool.py | 1558 ++++++++----------- 2 files changed, 665 insertions(+), 903 deletions(-) diff --git a/qemu/tests/cfg/win_guest_debugging_tool.cfg b/qemu/tests/cfg/win_guest_debugging_tool.cfg index 9c51de7c03..8100676e09 100644 --- a/qemu/tests/cfg/win_guest_debugging_tool.cfg +++ b/qemu/tests/cfg/win_guest_debugging_tool.cfg @@ -13,10 +13,15 @@ cdrom_virtio = isos/windows/virtio-win.iso cmd_findstr_in_file = type %s | findstr "%s" include_sensitive_data = False + script_execution_timeout = 720 target_files = "msinfo32.txt,system.evtx,security.evtx,application.evtx,drv_list.csv,virtio_disk.txt,WindowsUpdate.log,Services.csv,WindowsUptime.txt,RunningProcesses.csv,InstalledApplications.csv,InstalledKBs.csv,NetworkInterfaces.txt,IPConfiguration.txt,setupapi.dev.log,setupapi.setup.log,setupapi.offline.log,ErrorWindowsUpdate.log,OutputWindowsUpdate.log,LocaleMetaData" target_dump_files = "MEMORY.DMP,Minidump" script_name = "CollectSystemInfo.ps1" cmd_search_file_global = powershell.exe -Command "Get-PSDrive -PSProvider FileSystem | ForEach-Object { Get-ChildItem -Path $_.Root -Recurse -Filter '%s' -ErrorAction SilentlyContinue } | ForEach-Object { Join-Path -Path $_.Directory.FullName -ChildPath $_.Name }" + Win2016: + script_execution_timeout = 1200 + cmd_disable_ie_esc_admin = powershell.exe -Command "Set-ItemProperty -Path 'HKLM:\SOFTWARE\Microsoft\Active Setup\Installed Components\{A509B1A7-37EF-4b3f-8CFC-4F3A74704073}' -Name 'IsInstalled' -Value 0" + cmd_disable_ie_esc_user = powershell.exe -Command "Set-ItemProperty -Path 'HKLM:\SOFTWARE\Microsoft\Active Setup\Installed Components\{A509B1A8-37EF-4b3f-8CFC-4F3A74704073}' -Name 'IsInstalled' -Value 0" variants: - check_script_execution: windegtool_check_type = script_execution @@ -29,7 +34,7 @@ - check_user_friendliness: windegtool_check_type = user_friendliness cmd_kill_powershell_process = taskkill /IM powershell.exe /F - cmd_kill_powershell_process1 = powershell.exe -Command "Stop-Process -Name msinfo32 -Force" + cmd_kill_powershell_process1 = powershell.exe -Command "Stop-Process -Name msinfo32 -Force -ErrorAction SilentlyContinue" invalid_params = "-invalidparam,IncludeSensitiveData,0000,hell,-H,-IncludeSensitiveData -h" expect_output_prompt = "Usage: .\CollectSystemInfo.ps1 [-IncludeSensitiveData] [-Help]" script_interrupt_signal_file = 'Collecting_Status.txt' @@ -67,6 +72,7 @@ 
setupapi_dev_file_path = "%s\setupapi.dev.log" target_driver = pvpanic cmd_query_oem_inf = powershell.exe -Command "pnputil.exe /enum-drivers | Select-String -Pattern '%s.inf' -Context 1,1 | ForEach-Object { if ($_ -match 'Published Name:\s+(oem\d+\.inf)') { $matches[1] } }" + cmd_disable_device = powershell.exe -Command "$d=Get-PnpDevice -ErrorAction SilentlyContinue | Where-Object {$_.FriendlyName -like '*%s*' -or $_.InstanceId -like '*%s*'}; if ($d) { $d | Disable-PnpDevice -Confirm:$false -ErrorAction SilentlyContinue }" cmd_install_driver = pnputil.exe /add-driver %s /install cmd_uninstall_driver = pnputil.exe /delete-driver %s /uninstall /force cmd_scan_device = pnputil.exe /scan-devices @@ -96,7 +102,7 @@ standard_docs = "README.md", "LICENSE", "CollectSystemInfo.ps1" target_doc = "README.md" query_cmd_from_file = powershell.exe -Command "Get-Content %s | Select-String '```powershell' -Context 0,1" - cmd_cp_file = powershell.exe -Command "cp %s %s" + cmd_cp_file = powershell.exe -Command "cp %s %s -Force" - check_IO_limits: windegtool_check_type = IO_limits cmd_get_io_folder = powershell.exe -Command "Get-ChildItem -Path %s -Recurse -Filter '*IOLimits_C*' | Select-Object -ExpandProperty FullName" diff --git a/qemu/tests/win_guest_debugging_tool.py b/qemu/tests/win_guest_debugging_tool.py index e502c0e406..12b06711da 100644 --- a/qemu/tests/win_guest_debugging_tool.py +++ b/qemu/tests/win_guest_debugging_tool.py @@ -2,1046 +2,802 @@ import re import threading import time -from distutils.util import strtobool from aexpect.exceptions import ShellTimeoutError -from virttest import ( - error_context, -) +from virttest import error_context LOG_JOB = logging.getLogger("avocado.test") -class BaseVirtTest(object): +class WinDebugToolTest: + """ + Test class for the Windows Guest Debugging Tool. + This class encapsulates the logic for testing the CollectSystemInfo.ps1 script. 
+ """ + def __init__(self, test, params, env): self.test = test self.params = params self.env = env - - def initialize(self, test, params, env): - if test: - self.test = test - if params: - self.params = params - if env: - self.env = env - start_vm = self.params["start_vm"] - self.start_vm = start_vm - if self.start_vm == "yes": - vm = self.env.get_vm(params["main_vm"]) - vm.verify_alive() - self.vm = vm - - def setup(self, test, params, env): - if test: - self.test = test - if params: - self.params = params - if env: - self.env = env - - def run_once(self, test, params, env): - if test: - self.test = test - if params: - self.params = params - if env: - self.env = env - - def before_run_once(self, test, params, env): - pass - - def after_run_once(self, test, params, env): - pass - - def cleanup(self, test, params, env): - pass - - def execute(self, test, params, env): - self.initialize(test, params, env) - self.setup(test, params, env) - try: - self.before_run_once(test, params, env) - self.run_once(test, params, env) - self.after_run_once(test, params, env) - finally: - self.cleanup(test, params, env) - - -class WinDebugToolTest(BaseVirtTest): - def __init__(self, test, params, env): - super().__init__(test, params, env) - self._open_session_list = [] self.vm = None - self.script_dir = None - self.script_name = params.get("script_name", "") - self.script_path = params.get("script_path", "") - self.tmp_dir = params.get("test_tmp_dir", "") - - def _get_session(self, params, vm): - if not vm: - vm = self.vm - vm.verify_alive() - timeout = int(params.get("login_timeout", 360)) - session = vm.wait_for_login(timeout=timeout) - return session - - def _cleanup_open_session(self): - try: - for s in self._open_session_list: + self.session = None + self.script_name = self.params.get("script_name", "CollectSystemInfo.ps1") + self.script_path = "" + self.script_dir = "" + self.tmp_dir = self.params.get("test_tmp_dir", "${tmp_dir}\\testtmpdir") + self._open_sessions = [] + + def _get_session(self, new=False): + """ + Gets a new or existing session to the VM. + Manages sessions to avoid leaving them open. 
+ """ + if self.session and not new: + # Ensure the existing session is tracked + if self.session not in self._open_sessions: + self._open_sessions.append(self.session) + return self.session + self.vm.verify_alive() + timeout = int(self.params.get("login_timeout", 360)) + session = self.vm.wait_for_login(timeout=timeout) + self._open_sessions.append(session) + if new: + return session + self.session = session + return self.session + + def _cleanup_sessions(self): + """Closes all tracked sessions.""" + for s in self._open_sessions: + try: if s: s.close() - except Exception: - pass - - def run_once(self, test, params, env): - BaseVirtTest.run_once(self, test, params, env) - if self.start_vm == "yes": - pass - - def cleanup(self, test, params, env): - self._cleanup_open_session() - - @error_context.context_aware - def setup(self, test, params, env): - BaseVirtTest.setup(self, test, params, env) - if self.start_vm == "yes": - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - - error_context.context("Check whether debug tool exists.", LOG_JOB.info) - cmd_get_debug_tool_script = ( - params["cmd_search_file_global"] % self.script_name - ) - script_path = str( - session.cmd_output(cmd_get_debug_tool_script, timeout=360) - ).strip() - if script_path: - self.script_path = script_path - self.script_dir = self.script_path.replace(self.script_name, "").rstrip( - "\\" - ) - else: - test.error( - "The tool script file CollectSystemInfo.ps1 was not " - "found. Please check." - ) + except Exception as e: + LOG_JOB.warning("Failed to close session: %s", e) + self._open_sessions.clear() + def setup(self): + """ + Initial setup for the test environment on the guest VM. + """ + self.vm = self.env.get_vm(self.params["main_vm"]) + self.vm.verify_alive() + self.session = self._get_session() + + error_context.context("Locating debug tool script.", LOG_JOB.info) + cmd = self.params["cmd_search_file_global"] % self.script_name + raw = str(self.session.cmd_output(cmd, timeout=360)) + matches = [l.strip() for l in raw.splitlines() if l.strip()] + if not matches: + self.test.error(f"'{self.script_name}' not found on the guest.") + if len(matches) > 1: + self.test.log.info( + "Multiple '%s' found; using first: %r", + self.script_name, + matches[:5], + ) + self.script_path = matches[0] + self.script_dir = self.script_path.replace(self.script_name, "").rstrip("\\") + + error_context.context("Creating temporary work directory.", LOG_JOB.info) + self.session.cmd_output(self.params["cmd_create_dir"] % self.tmp_dir) + self.session.cmd(f'cd "{self.tmp_dir}"') + + # Disable IE Enhanced Security Configuration for Win2016 if configured + if self.params.get("cmd_disable_ie_esc_admin"): error_context.context( - "Create tmp work dir since testing would create lots of dir and files.", - LOG_JOB.info, + "Disabling IE Enhanced Security Configuration.", LOG_JOB.info ) - session.cmd_output(params["cmd_create_dir"] % self.tmp_dir) - - def _check_generated_files(self, session, path, sensitive_data=False): - output = str(session.cmd_output(f"dir /b {path}")).strip() - - target_files = ( - self.params["target_dump_files"] - if sensitive_data - else self.params["target_files"] - ).split(",") - - for target_file in target_files: - if target_file not in output: - self.test.error(f"{target_file} is not included, please check it.") - - def _get_path(self, output, session, sensitive_data=False): - log_folder_path = re.search(r"Log folder path: (.+)", output).group(1) - self._check_generated_files(session, 
log_folder_path) - log_zip_path = f"{log_folder_path}.zip" - - if sensitive_data: - dump_folder_match = re.search(r"Dump folder path: (.+)", output) - if dump_folder_match: - dump_folder_path = dump_folder_match.group(1) - self._check_generated_files( - session, dump_folder_path, sensitive_data=True - ) - dump_zip_path = f"{dump_folder_path}.zip" - return log_folder_path, log_zip_path, dump_folder_path, dump_zip_path + self.session.cmd(self.params["cmd_disable_ie_esc_admin"]) + self.session.cmd(self.params["cmd_disable_ie_esc_user"]) + + def _run_script_and_get_paths(self, extra_args=""): + """ + Runs the CollectSystemInfo.ps1 script and parses its output for file paths. - return log_folder_path, log_zip_path + :param extra_args: String of extra arguments to pass to the script. + :return: A dictionary containing paths for logs and dumps. + """ + cmd = ( + "powershell.exe -ExecutionPolicy Bypass " + f'-File "{self.script_path}" {extra_args}' + ) + script_timeout = int(self.params.get("script_execution_timeout", 720)) + status, output = self.session.cmd_status_output(cmd, timeout=script_timeout) + if status != 0: + self.test.fail(f"Script execution failed with status {status}") -class WinDebugToolTestBasicCheck(WinDebugToolTest): - def __init__(self, test, params, env): - super().__init__(test, params, env) + paths = { + "log_folder": None, + "log_zip": None, + "dump_folder": None, + "dump_zip": None, + } - @error_context.context_aware - def run_tool_scripts(self, session, return_zip_path=False): - error_context.context( - "Run Debug tool script to Query original info or value.", LOG_JOB.info - ) - # Running the PowerShell script on the VM - include_sensitive_data = bool(strtobool(self.params["include_sensitive_data"])) - sensitive_data_flag = "-IncludeSensitiveData" if include_sensitive_data else "" - - # Execute the command on the VM - cmd_run_deg_tool = ( - f"powershell.exe -ExecutionPolicy Bypass -File {self.script_path} " - f"{sensitive_data_flag}" - ) - s, o = session.cmd_status_output(cmd_run_deg_tool, timeout=360) - - paths = self._get_path(o, session, sensitive_data=include_sensitive_data) - if include_sensitive_data: - log_folder_path, log_zip_path, dump_folder_path, dump_zip_path = paths - if not all(paths): - self.test.fail("Debug tool run failed, please check it.") - return paths if return_zip_path else (log_folder_path, dump_folder_path) - else: - log_folder_path, log_zip_path = paths - if not all(paths): - self.test.fail("Debug tool run failed, please check it.") - return paths if return_zip_path else log_folder_path + log_folder_match = re.search(r"Log folder path: (.+)", output) + if log_folder_match: + paths["log_folder"] = log_folder_match.group(1).strip() + paths["log_zip"] = f"{paths['log_folder']}.zip" + self._check_generated_files(paths["log_folder"], is_dump=False) - @error_context.context_aware - def windegtool_check_script_execution(self, test, params, env): - """ - Verify basic script execution functionality: - 1. Launch Windows guest and execute debug tool script - 2. Verify log folder and zip file are generated correctly - 3. 
Check for any execution errors - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment - """ - if not self.vm: - self.vm = env.get_vm(params["main_vm"]) - self.vm.verify_alive() + dump_folder_match = re.search(r"Dump folder path: (.+)", output) + if dump_folder_match: + paths["dump_folder"] = dump_folder_match.group(1).strip() + paths["dump_zip"] = f"{paths['dump_folder']}.zip" + self._check_generated_files(paths["dump_folder"], is_dump=True) - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) + if not paths["log_folder"]: + self.test.fail("Failed to get log folder path from script output.") - log_folder_path, log_zip_path = self.run_tool_scripts( - session, return_zip_path=True - ) - if not (log_zip_path and log_folder_path): - test.fail("debug tool run failed, please check it.") + return paths - @error_context.context_aware - def windegtool_check_zip_package(self, test, params, env): + def _check_generated_files(self, path, is_dump=False): """ - Verify zip package functionality: - 1. Run debug tool to generate logs - 2. Extract generated ZIP file - 3. Compare extracted folder with original log folder - 4. Verify folder sizes match - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Verifies that the expected files are generated in the given path. """ - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) + output = str(self.session.cmd_output(f'dir /b "{path}"')).strip() + present = {line.strip() for line in output.splitlines() if line.strip()} + target_files_key = "target_dump_files" if is_dump else "target_files" + target_files = self.params[target_files_key].split(",") - log_folder_path, log_zip_path = self.run_tool_scripts( - session, return_zip_path=True - ) - if not (log_zip_path and log_folder_path): - test.fail("debug tool run failed, please check it.") - - error_context.context("Extract ZIP and check the data files.", LOG_JOB.info) - session.cmd("cd %s" % self.tmp_dir) - extract_folder = log_zip_path + "_extract" - s, o = session.cmd_status_output( - params["cmd_extract_zip"] % (log_zip_path, extract_folder) - ) - if s: - test.error("Extract ZIP failed, please take a look and check.") - - error_context.context("Compare the folders", LOG_JOB.info) - extract_folder_size = session.cmd_output( - params["cmd_check_folder_size"] % extract_folder - ) - log_folder_size = session.cmd_output( - params["cmd_check_folder_size"] % log_folder_path - ) - if log_folder_size != extract_folder_size: - test.fail( - "ZIP package have problem, since the size of it " - "is not same with the original log folder." + missing_files = [f for f in target_files if f not in present] + if missing_files: + self.test.error( + f"Missing generated files in '{path}': {', '.join(missing_files)}" ) - @error_context.context_aware - def windegtool_check_run_tools_multi_times(self, test, params, env): + def _execute_test_variant(self): """ - Verify script stability with multiple executions: - 1. Run debug tool multiple times in sequence - 2. Verify each execution succeeds and generates logs - 3. Clean up logs between runs - 4. 
Check for any errors or inconsistencies - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Dynamically calls the test method based on the 'windegtool_check_type' + parameter from the configuration. """ - if not self.vm: - self.vm = env.get_vm(params["main_vm"]) - self.vm.verify_alive() + check_type = self.params.get("windegtool_check_type") + if not check_type: + self.test.error("'windegtool_check_type' not specified in config.") - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) + test_method_name = f"_check_{check_type}" + if not hasattr(self, test_method_name): + self.test.error(f"Unknown test type: '{check_type}'") + test_method = getattr(self, test_method_name) - error_context.context( - "Run scripts 100 times and check there is no problem", LOG_JOB.info - ) - for i in range(1, 5): - log_folder_path, log_zip_path = self.run_tool_scripts( - session, return_zip_path=True - ) - if not (log_zip_path and log_folder_path): - test.fail("debug tool run failed, please check it.") - cmd_remove_zip = params["cmd_remove_dir"] % log_zip_path - cmd_remove_folder = params["cmd_remove_dir"] % log_folder_path - cmd_clean_dir = "%s && %s" % (cmd_remove_folder, cmd_remove_zip) - session.cmd(cmd_clean_dir) + LOG_JOB.info("--- Running test variant: %s ---", check_type) + test_method() + LOG_JOB.info("--- Finished test variant: %s ---", check_type) @error_context.context_aware - def windegtool_check_user_friendliness(self, test, params, env): + def _check_script_execution(self): """ - Test script's user-friendly features: - 1. Test invalid parameter handling and error messages - 2. Test script interruption handling - 3. Verify interrupt signal file generation - 4. Test script recovery after interruption - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Verifies basic script execution and that log/zip files are created. """ - - def _run_script(session): - cmd_run_deg_tool = ( - f"powershell.exe -ExecutionPolicy Bypass -File {self.script_path}" + paths = self._run_script_and_get_paths() + if not (paths["log_folder"] and paths["log_zip"]): + self.test.fail( + "Basic script execution failed to produce log folder or zip." ) - try: - session.cmd_output(cmd_run_deg_tool, timeout=5) - except ShellTimeoutError as e: - LOG_JOB.info("Script execution timed out as expected: %s", str(e)) - pass - except Exception as e: - LOG_JOB.error("Unexpected error during script execution: %s", str(e)) - test.error(f"Script execution failed with unexpected error: {str(e)}") - - def _kill_powershell_process(session1): - session1.cmd(params["cmd_kill_powershell_process"]) - session1.cmd(params["cmd_kill_powershell_process1"]) - - if not self.vm: - self.vm = env.get_vm(params["main_vm"]) - self.vm.verify_alive() - session = self._get_session(params, self.vm) - session1 = self._get_session(params, self.vm) - self._open_session_list.extend([session, session1]) + @error_context.context_aware + def _check_zip_package(self): + """ + Verifies that the generated zip package is valid and matches the source folder. 
+ """ + paths = self._run_script_and_get_paths() + log_folder, log_zip = paths["log_folder"], paths["log_zip"] error_context.context( - "Run script with various of invalid parameters.", LOG_JOB.info + "Extracting ZIP and comparing folder sizes.", LOG_JOB.info ) - session.cmd("cd %s" % self.tmp_dir) - session.cmd("rd /S /Q %s" % self.tmp_dir) - session.cmd_output(params["cmd_create_dir"] % self.tmp_dir) - invalid_params = list(params["invalid_params"].split(",")) - expect_output_prompt = params["expect_output_prompt"] - for invalid_param in invalid_params: - cmd_run_deg_tool = ( - f"powershell.exe -ExecutionPolicy Bypass -File {self.script_path} " - f"{invalid_param}" + extract_folder = f"{log_folder}_extracted" + cmd_extract = self.params["cmd_extract_zip"] % (log_zip, extract_folder) + status, _ = self.session.cmd_status_output(cmd_extract) + if status != 0: + self.test.error("Failed to extract the generated ZIP file.") + + size_cmd = self.params["cmd_check_folder_size"] + original_size_str = self.session.cmd_output(size_cmd % log_folder).strip() + extracted_size_str = self.session.cmd_output(size_cmd % extract_folder).strip() + + try: + original_size = int(original_size_str.replace(",", "")) + extracted_size = int(extracted_size_str.replace(",", "")) + except ValueError: + self.test.error( + "Failed to parse folder sizes: " + f"original='{original_size_str}', " + f"extracted='{extracted_size_str}'" ) - o = session.cmd_output(cmd_run_deg_tool, timeout=360) - if expect_output_prompt not in o: - test.fail( - "There should be friendly reminder output %s, telling " - "users how to run the script with reasonable parameters" - "please check it." % expect_output_prompt - ) - error_context.context( - "Interrupt script when it's running to check the signal file.", - LOG_JOB.info, - ) - script_thread = threading.Thread(target=_run_script(session)) - kill_thread = threading.Thread(target=_kill_powershell_process(session1)) - script_thread.start() - time.sleep(5) - kill_thread.start() - script_thread.join() - kill_thread.join() - - script_interrupt_signal_file = params["script_interrupt_signal_file"] - log_path = session.cmd_output(params["cmd_query_path"]).split()[0] - session.cmd("cd %s" % log_path) - output = session.cmd_output("dir") - if script_interrupt_signal_file not in output: - test.fail( - f"There should be one {script_interrupt_signal_file} once the script" - f" is interrupted, but there isn't. Please check it." + if original_size != extracted_size: + self.test.fail( + "Size of original log folder and extracted folder do not " + f"match. Original: {original_size_str}, Extracted: {extracted_size_str}" ) + @error_context.context_aware + def _check_run_tools_multi_times(self): + """ + Verifies script stability by running it multiple times in a loop. 
+ """ error_context.context( - "Clean invalid files and re-run script again to check " - "whether it could be run well.", - LOG_JOB.info, + "Running script 5 times for stability check.", LOG_JOB.info ) - session.cmd("cd ../") - session.cmd(params["cmd_dir_del"] % log_path) - self.run_tool_scripts(session) + for i in range(5): + LOG_JOB.info("Running iteration %d...", i + 1) + paths = self._run_script_and_get_paths() + # Clean up for the next iteration + self.session.cmd(self.params["cmd_remove_dir"] % paths["log_folder"]) + self.session.cmd(self.params["cmd_remove_dir"] % paths["log_zip"]) @error_context.context_aware - def windegtool_check_disk_registry_collection(self, test, params, env): + def _check_user_friendliness(self): """ - Test disk and registry information collection: - 1. Collect initial registry values - 2. Compare values between registry and collected file - 3. Modify registry entries and verify changes are captured - 4. Test registry key creation and deletion - 5. Verify accurate collection of modified values - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Tests user-friendly features like invalid parameter handling and + interruption recovery. """ - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) + # 1. Test invalid parameter handling + error_context.context("Testing invalid parameter handling.", LOG_JOB.info) + invalid_params = self.params["invalid_params"].split(",") + expected_prompt = self.params["expect_output_prompt"] + for param in invalid_params: + cmd = ( + "powershell.exe -ExecutionPolicy Bypass " + f"-File {self.script_path} {param}" + ) + output = self.session.cmd_output(cmd, timeout=360) + if expected_prompt not in output: + self.test.fail( + "Script did not show expected friendly prompt for " + f"invalid param '{param}'." + ) - error_context.context( - "Run script first to collect 'virtio_disk.txt'.", LOG_JOB.info - ) - log_folder_path = self.run_tool_scripts(session) - virtio_disk_filepath = params["virtio_disk_filepath"] % log_folder_path - exist_reg_item = params["exist_reg_item"] - reg_subkey1, reg_subkey2 = params["reg_subkey1"], params["reg_subkey2"] - iotimeoutvalue_file = int( - session.cmd_output( - params["cmd_findstr_in_file"] - % (virtio_disk_filepath, (exist_reg_item + "\\" + reg_subkey1)) - ) - .split(":")[-1] - .strip() + # 2. Test interruption handling + error_context.context("Testing script interruption handling.", LOG_JOB.info) + session2 = self._get_session(new=True) + + def run_and_interrupt(): + try: + # This is expected to time out when the process is terminated + cmd = f"powershell.exe -ExecutionPolicy Bypass -File {self.script_path}" + self.session.cmd_output(cmd, timeout=10) + except ShellTimeoutError: + LOG_JOB.info( + "Script execution timed out as expected after interruption." + ) + except Exception as e: + self.test.error(f"Unexpected error during script execution: {e}") + + script_thread = threading.Thread(target=run_and_interrupt) + script_thread.start() + time.sleep(5) # Let the script run for a bit + + # Terminate the process from another session. Use cmd_status_output to + # ignore non-zero exit codes, as processes may not exist. 
+ LOG_JOB.debug("Attempting to terminate powershell.exe process.") + status1, output1 = session2.cmd_status_output( + self.params["cmd_kill_powershell_process"] ) - timeoutvalue_file = int( - session.cmd_output( - params["cmd_findstr_in_file"] - % (virtio_disk_filepath, (exist_reg_item + "\\" + reg_subkey2)) - ) - .split(":")[-1] - .strip() + LOG_JOB.debug( + "Terminate powershell result - status: %s, output: %r", + status1, + output1, ) - iotimeoutvalue_cmd = int( - session.cmd_output(params["cmd_reg_query"] % (exist_reg_item, reg_subkey1)) + + LOG_JOB.debug("Attempting to terminate msinfo32.exe process.") + status2, output2 = session2.cmd_status_output( + self.params["cmd_kill_powershell_process1"] ) - timeoutvalue_cmd = int( - session.cmd_output(params["cmd_reg_query"] % (exist_reg_item, reg_subkey2)) + LOG_JOB.debug( + "Terminate msinfo32 result - status: %s, output: %r", status2, output2 ) - if ( - iotimeoutvalue_cmd != iotimeoutvalue_file - or timeoutvalue_cmd != timeoutvalue_file - ): - test.error( - "The value of %s and %s is not same between %s and cmd, Please " - "have a check" % (reg_subkey1, reg_subkey2, virtio_disk_filepath) - ) + script_thread.join() - error_context.context( - "Edit exist value and create new sub-key for " - "vioscsi/viostor for non-exist value", - LOG_JOB.info, + # Verify signal file exists + # Find the latest SystemInfo directory + find_cmd = ( + 'powershell.exe -Command "' + "Get-ChildItem -Directory -Filter SystemInfo_* | " + "Sort-Object LastWriteTime -Descending | " + "Select-Object -First 1 -ExpandProperty Name" + '"' ) - new_reg_item = params["new_reg_item"] - cmd_reg_add_item = params["cmd_reg_add_item"] % (new_reg_item, new_reg_item) - cmd_reg_add_item_key = params["cmd_reg_add_item_key"] - cmd_reg_set_value = params["cmd_reg_set_value"] - try: - key_value1, key_value2 = ( - int(params["key_value1"]), - int(params["key_value2"]), - ) + query_output = self.session.cmd_output(find_cmd) + output_lines = query_output.splitlines() - s, o = session.cmd_status_output(cmd_reg_add_item) - if not s: - session.cmd_output( - cmd_reg_add_item_key % (new_reg_item, new_reg_item, reg_subkey1) - ) - session.cmd_output( - cmd_reg_add_item_key % (new_reg_item, new_reg_item, reg_subkey2) - ) - else: - test.error( - "Add register item for vioscsi/viostor failed, please help check." 
- ) + if not output_lines or not output_lines[0].strip(): + # Fallback to original command if the new one fails + query_cmd = self.params["cmd_query_path"] + query_output = self.session.cmd_output(query_cmd) + output_lines = query_output.splitlines() - session.cmd_output( - cmd_reg_set_value % (exist_reg_item, reg_subkey1, key_value1) - ) - session.cmd_output( - cmd_reg_set_value % (new_reg_item, reg_subkey1, key_value1) - ) - session.cmd_output( - cmd_reg_set_value % (new_reg_item, reg_subkey2, key_value2) - ) + if not output_lines or not output_lines[0].strip(): + self.test.error("Failed to find SystemInfo directory.") - error_context.context("Re-run guest debug tool script", LOG_JOB.info) - new_log_folder_path = self.run_tool_scripts(session) - new_virtio_disk_filepath = ( - params["virtio_disk_filepath"] % new_log_folder_path - ) - cmd_findstr_in_file = params["cmd_findstr_in_file"] - exist_iotimeoutvalue_file = int( - session.cmd_output( - cmd_findstr_in_file - % (new_virtio_disk_filepath, (exist_reg_item + "\\" + reg_subkey1)) - ) - .split(":")[-1] - .strip() - ) - new_iotimeoutvalue_file = int( - session.cmd_output( - cmd_findstr_in_file - % (new_virtio_disk_filepath, (new_reg_item + "\\" + reg_subkey1)) - ) - .split(":")[-1] - .strip() - ) - new_timeoutvalue_file = int( - session.cmd_output( - cmd_findstr_in_file - % (new_virtio_disk_filepath, (new_reg_item + "\\" + reg_subkey2)) - ) - .split(":")[-1] - .strip() - ) + # Construct the full path to the log directory + log_path = output_lines[0].strip() + if not log_path.startswith("C:\\") and not log_path.startswith(self.tmp_dir): + log_path = f"{self.tmp_dir}\\{log_path}" - cmd_reg_query = params["cmd_reg_query"] - exist_iotimeoutvalue_cmd = int( - session.cmd_output(cmd_reg_query % (exist_reg_item, reg_subkey1)) - ) - new_iotimeoutvalue_cmd = int( - session.cmd_output(cmd_reg_query % (new_reg_item, reg_subkey1)) - ) - new_timeoutvalue_cmd = int( - session.cmd_output(cmd_reg_query % (new_reg_item, reg_subkey2)) - ) - if ( - exist_iotimeoutvalue_cmd != exist_iotimeoutvalue_file - or new_iotimeoutvalue_cmd != new_iotimeoutvalue_file - or new_timeoutvalue_cmd != new_timeoutvalue_file - ): - test.error( - "The value of %s and %s is not same between %s and cmd, Please " - "have a check" % (reg_subkey1, reg_subkey2, new_log_folder_path) - ) - finally: - session.cmd_output(params["cmd_reg_del"] % new_reg_item) - session.cmd_output( - cmd_reg_set_value % (exist_reg_item, reg_subkey1, iotimeoutvalue_cmd) + signal_file = self.params["script_interrupt_signal_file"] + dir_output = self.session.cmd_output(f'dir "{log_path}"') + + if signal_file not in dir_output: + self.test.fail( + f"Interruption signal file '{signal_file}' was not created. " + f"Directory contents: {dir_output}" ) - @error_context.context_aware - def windegtool_check_includeSensitiveData_collection(self, test, params, env): - """ - Test sensitive data collection functionality: - 1. Trigger BSOD using specified method - 2. Verify memory dump files are generated - 3. Run debug tool with sensitive data collection - 4. 
Verify all dump files are properly collected - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment - """ - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - - error_context.context("Trigger BSOD situation.", LOG_JOB.info) - crash_method = params["crash_method"] - if crash_method == "nmi": - timeout = int(params.get("timeout", 360)) - self.vm.monitor.nmi() - time.sleep(timeout) - self.vm.reboot(session, method=params["reboot_method"], timeout=timeout) - - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) - - error_context.context("Check the dmp files are existed.", LOG_JOB.info) - cmd_check_dmp_files = params["cmd_check_files"] % params["memory_dmp_file"] - cmd_check_minidmp_dir = params["cmd_check_files"] % params["mini_dmp_folder"] - output = session.cmd_output( - "%s && %s" % (cmd_check_dmp_files, cmd_check_minidmp_dir) + # 3. Test recovery + error_context.context( + "Testing script recovery after interruption.", LOG_JOB.info ) - if "dmp" not in output: - test.error("Dump file should be existed, please have a check") - else: - self.run_tool_scripts(session) + self.session.cmd(self.params["cmd_dir_del"] % log_path) + self._check_script_execution() # A simple run to confirm it works again @error_context.context_aware - def windegtool_check_trigger_driver_msinfo_collection(self, test, params, env): + def _check_trigger_driver_msinfo_collection(self): """ - Test system and driver information collection: - 1. Collect initial system name and driver info - 2. Change system name and modify driver state - 3. Verify changes are captured in subsequent collection - 4. Check setupapi logs for driver operations - 5. Verify accurate reflection of system changes - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Tests collection of dynamically changing system and driver info. """ - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - - error_context.context( - "Run script firstly to check system name and driver info.", LOG_JOB.info - ) - session.cmd("cd %s" % self.tmp_dir) - log_path = self.run_tool_scripts(session) - msinfo_file_path = params["msinfo_file_path"] % log_path - drv_list_file_path = params["drv_list_file_path"] % log_path - target_driver = params["target_driver"] - - old_systemname = str( - session.cmd_output( - params["cmd_query_from_file"] % (msinfo_file_path, "System Name:") - ).split(":")[1] + # 1. 
Baseline + paths = self._run_script_and_get_paths() + guest_sysname = self.session.cmd_output( + self.params["cmd_check_systemname"] ).strip() - systemname_guest = str( - session.cmd_output(params["cmd_check_systemname"]) - ).strip() - if old_systemname != systemname_guest: - test.error( - "The system name are different between cmd and file that" - " collect by tool, Please have a check" - ) - drvinfo_output = session.cmd_output( - params["cmd_query_from_file"] % (drv_list_file_path, target_driver) + msinfo_file = f"{paths['log_folder']}\\msinfo32.txt" + + # Extract system name from the first line that contains a colon + raw_query_output = self.session.cmd_output( + self.params["cmd_query_from_file"] % (msinfo_file, "System Name") ) - if target_driver not in drvinfo_output: - test.error( - "%s doesn't installed, there is no info about it, " - "Please have a check and change another target driver to test" + file_sysname_raw = "" + for line in raw_query_output.splitlines(): + if ":" in line: + file_sysname_raw = line.split(":")[-1].strip() + break + + # Clean both values to compare only alphanumeric parts and hyphens + clean_guest_sysname = re.sub(r"[^A-Z0-9-]", "", guest_sysname.upper()) + clean_file_sysname = re.sub(r"[^A-Z0-9-]", "", file_sysname_raw.upper()) + + if clean_guest_sysname != clean_file_sysname: + self.test.error( + f"Initial system name mismatch between guest and collected file. " + f"Guest: '{clean_guest_sysname}', File: '{clean_file_sysname}'" ) + # 2. Modify system error_context.context( - "Change the system name and uninstall certain driver.", LOG_JOB.info + "Changing system name and uninstalling driver.", LOG_JOB.info ) - new_system_name = params["new_system_name"] - session.cmd(params["cmd_change_systemname"] % new_system_name) - win_ver = str( - session.cmd_output(params["cmd_query_ver_vm"], timeout=60) + new_name = self.params["new_system_name"] + self.session.cmd(self.params["cmd_change_systemname"] % new_name) + target_driver = self.params["target_driver"] + oem_inf = self.session.cmd_output( + self.params["cmd_query_oem_inf"] % target_driver ).strip() - if "2016" not in win_ver: - driver_oem_file = session.cmd_output( - params["cmd_query_oem_inf"] % target_driver - ).strip() - cmd_uninstall_driver = params["cmd_uninstall_driver"] % driver_oem_file - s, o = session.cmd_status_output(cmd_uninstall_driver) + driver_uninstalled = False + if oem_inf: + # Disable the device first before uninstalling driver + if self.params.get("cmd_disable_device"): + LOG_JOB.info( + "Disabling device '%s' before driver uninstall.", target_driver + ) + status, output = self.session.cmd_status_output( + self.params["cmd_disable_device"] % (target_driver, target_driver) + ) + if status != 0: + LOG_JOB.warning( + "Failed to disable device '%s': %s", target_driver, output + ) + + # Uninstall the driver + status, output = self.session.cmd_status_output( + self.params["cmd_uninstall_driver"] % oem_inf + ) + if status != 0: + LOG_JOB.warning( + "Failed to uninstall driver '%s' (oem_inf: %s): %s", + target_driver, + oem_inf, + output, + ) + else: + driver_uninstalled = True + LOG_JOB.info("Successfully uninstalled driver '%s'.", target_driver) else: - cmd_get_2k16inf = params["cmd_search_2k16_inf_file_global"] % ( - target_driver + ".inf" + LOG_JOB.warning( + "Could not find OEM INF for '%s', skipping uninstall.", + target_driver, ) - w2k16_inf = str(session.cmd_output(cmd_get_2k16inf, timeout=60)).strip() - cmd_install_driver = params["cmd_install_driver"] % w2k16_inf - s, o = 
session.cmd_status_output(cmd_install_driver) - if s or "Fail" in o: - test.error("Fail to execute %s driver, please check it." % target_driver) - error_context.context( - "Re-Run script after rebooting to check whether system" - " name and driver info changed.", - LOG_JOB.info, - ) - session.cmd_output(params["reboot_command"]) - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - - session.cmd("cd %s" % self.tmp_dir) - new_log_path = self.run_tool_scripts(session) - new_msinfo_file_path = params["msinfo_file_path"] % new_log_path - new_drv_list_file_path = params["drv_list_file_path"] % new_log_path - new_setupapi_dev_file_path = params["setupapi_dev_file_path"] % new_log_path - new_systemname_file = params["cmd_query_from_file"] % ( - new_msinfo_file_path, - "System Name:", + # 3. Reboot and verify + self.session.cmd_output(self.params["reboot_command"]) + self.session = self._get_session(new=True) + self.session.cmd(f'cd "{self.tmp_dir}"') + + new_paths = self._run_script_and_get_paths() + new_msinfo_file = f"{new_paths['log_folder']}\\msinfo32.txt" + new_query_output = self.session.cmd_output( + self.params["cmd_query_from_file"] % (new_msinfo_file, "System Name") ) - new_systemname = session.cmd_output(new_systemname_file).split(":")[1].strip() - if new_system_name.upper() not in str(new_systemname): - test.fail("New systemname wasn't captured by tool, Please have a check") - new_drvinfo_output = session.cmd_output( - params["cmd_query_from_file"] % (new_drv_list_file_path, target_driver) + + # Extract system name from the first line that contains a colon + new_file_sysname = "" + for line in new_query_output.splitlines(): + if ":" in line: + new_file_sysname = line.split(":")[-1].strip() + break + if new_name.upper() not in new_file_sysname.upper(): + self.test.fail("System name change was not captured by the script.") + + drv_list_file = f"{new_paths['log_folder']}\\drv_list.csv" + drv_list_output = self.session.cmd_output( + self.params["cmd_query_from_file"] % (drv_list_file, target_driver) ) - if target_driver in new_drvinfo_output: - test.fail( - "Driver should be uninstalled, but tool wasn't captured this " - "situation, Please have a check" - ) - if "2016" not in win_ver: - regex_cmd = cmd_uninstall_driver.replace(" ", r"\s+").replace(".", r"\.") - setupapi_output = session.cmd_output( - params["cmd_query_from_file"] % (new_setupapi_dev_file_path, regex_cmd) - ) - if "/delete-driver" not in setupapi_output: - test.fail( - "Driver execution operation was not captured, Please have a check." - ) - else: - setupapi_output = session.cmd_output( - params["cmd_query_from_file"] - % (new_setupapi_dev_file_path, "pnputil.exe") + if driver_uninstalled and target_driver in drv_list_output: + self.test.fail( + "Uninstalled driver is still present in the collected driver list." ) - if "/add-driver" not in setupapi_output: - test.fail( - "Driver execution operation was not captured, Please have a check." - ) @error_context.context_aware - def windegtool_check_networkadapter_collection(self, test, params, env): + def _check_networkadapter_collection(self): """ - Test network adapter information collection: - 1. Collect baseline network adapter settings - 2. Modify DNS and Jumbo Packet settings - 3. Verify changes are captured in collected data - 4. Test network setting restoration - 5. 
Verify accuracy of collected network information - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Tests collection of network adapter settings. """ - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) - - error_context.context( - "Run tool script firstly, and conducting raw data comparison tests", - LOG_JOB.info, + adapter_name = self.session.cmd_output( + self.params["check_adapter_name"] + ).strip() + original_jp_value = int( + self.session.cmd_output(self.params["check_adapter_jp_info"] % adapter_name) ) - adapter_name = session.cmd_output(params["check_adapter_name"]).strip() - original_log_path = self.run_tool_scripts(session) - original_networkfile_path = params["networkfile_path"] % original_log_path - raw_jp_value_from_file = session.cmd_output( - params["cmd_findstr_in_file"] % (original_networkfile_path, "Jumbo Packet") - ) - original_jp_value_from_file = int(raw_jp_value_from_file.split()[5]) - original_jp_value_from_cmd = int( - session.cmd_output(params["check_adapter_jp_info"] % adapter_name) - ) - if original_jp_value_from_cmd != original_jp_value_from_file: - test.error( - "Network info collection seems have problem, the value of" - "Jumbo Packet is not same between file and cmd." + # 1. Modify Jumbo Packet + error_context.context("Modifying 'Jumbo Packet' setting.", LOG_JOB.info) + new_jp_value = 9014 + try: + self.session.cmd_output( + self.params["cmd_set_adapter_jp_info"] % (adapter_name, new_jp_value), + timeout=360, + ) + except ShellTimeoutError: + LOG_JOB.warning( + "Timeout occurred while setting Jumbo Packet. Verifying change..." ) + # Re-establish session if it was dropped + self.session = self._get_session(new=True) + self.session.cmd(f'cd "{self.tmp_dir}"') - ipconfigfile_path = params["ipconfigfile_path"] % original_log_path - original_ipconfig = session.cmd_output( - params["cmd_findstr_in_file"] % (ipconfigfile_path, adapter_name) + current_jp_value = int( + self.session.cmd_output(self.params["check_adapter_jp_info"] % adapter_name) ) - dns_info = session.cmd_output(params["cmd_get_dns"]).split() - for dns_info_item in dns_info: - if dns_info_item not in original_ipconfig: - test.error( - "DNS %s wasn't captured, please have a check" % dns_info_item - ) + if current_jp_value != new_jp_value: + self.test.error(f"Failed to set Jumbo Packet value to {new_jp_value}.") - error_context.context( - "Change the dhcp and check whether the file changed", LOG_JOB.info - ) - static_dns = params["static_dns"] - session.cmd_output( - params["cmd_set_dns"] % (adapter_name, static_dns), timeout=360 + # 2. 
Run script and verify + paths = self._run_script_and_get_paths() + network_file = f"{paths['log_folder']}\\NetworkInterfaces.txt" + jp_from_file_raw = self.session.cmd_output( + self.params["cmd_findstr_in_file"] % (network_file, "Jumbo Packet") ) - log_folder_path_new = self.run_tool_scripts(session) - ipconfigfile_path = params["ipconfigfile_path"] % log_folder_path_new - new_ipconfig_output = session.cmd_output( - params["cmd_findstr_in_file"] % (ipconfigfile_path, static_dns) - ) - if static_dns not in new_ipconfig_output: - test.fail("DNS should be changed but it's not, please check it.") - else: - error_context.context( - "Checkpoint is pass, Re-enable adapter for next checkpoint.", - LOG_JOB.info, + # Split by multiple spaces to get columns from the first line + # Format: AdapterName DisplayName DisplayValue RegistryName RegistryValue + first_line = jp_from_file_raw.splitlines()[0] if jp_from_file_raw else "" + columns = re.split(r"\s{2,}", first_line.strip()) + + # The DisplayValue should be the 3rd column (index 2) + jp_from_file = 0 + if len(columns) >= 3: + jp_from_file_str = re.sub(r"\D", "", columns[2]) + jp_from_file = int(jp_from_file_str) if jp_from_file_str else 0 + + if jp_from_file != new_jp_value: + self.test.fail( + f"Jumbo Packet change not captured. Expected {new_jp_value}, " + f"got {jp_from_file}." ) - session.cmd(params["cmd_set_dns_dhcp"] % adapter_name) + # 3. Cleanup error_context.context( - "Changed the 'Jumbo Packet' value and compare.", LOG_JOB.info + "Restoring original 'Jumbo Packet' setting.", LOG_JOB.info ) try: - session.cmd_output( - params["cmd_set_adapter_jp_info"] % (adapter_name, 9014), timeout=360 + self.session.cmd_output( + self.params["cmd_set_adapter_jp_info"] + % (adapter_name, original_jp_value), + timeout=360, ) - except ShellTimeoutError as e: - LOG_JOB.warning("Network adapter setting timed out: %s", str(e)) - # Verify the setting was actually changed - cmd = params["check_adapter_jp_info"] % adapter_name - actual_value = int(session.cmd_output(cmd)) - if actual_value != 9014: - test.error("Failed to change Jumbo Packet value") - - # Create new session only if needed - try: - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) - except Exception as e: - test.error( - "Failed to create new session after network change: %s" % str(e) - ) + except ShellTimeoutError: + LOG_JOB.warning( + "Timeout occurred while restoring Jumbo Packet. Verifying change..." + ) + self.session = self._get_session(new=True) - log_folder_path_new = self.run_tool_scripts(session) - networkfile_path = params["networkfile_path"] % log_folder_path_new - raw_jp_value = session.cmd_output( - params["cmd_findstr_in_file"] % (networkfile_path, "Jumbo Packet") + final_jp_value = int( + self.session.cmd_output(self.params["check_adapter_jp_info"] % adapter_name) ) - new_jp_value = int(raw_jp_value.split()[5]) - if original_jp_value_from_file == new_jp_value: - test.fail( - "Jumbo Packet should not be same with the original one,Please check it." - ) - if new_jp_value != 9014: - test.error( - "Jumbo Packet should be the new value, but it's not somehow," - "Please check it." 
- ) + if final_jp_value != original_jp_value: + self.test.error(f"Failed to restore Jumbo Packet to {original_jp_value}.") - error_context.context("Recover the env.", LOG_JOB.info) - try: - session.cmd( - params["cmd_set_adapter_jp_info"] - % (adapter_name, original_jp_value_from_file) + @error_context.context_aware + def _check_documentation(self): + """ + Verifies the tool's documentation is complete and its examples are accurate. + """ + error_context.context( + "Checking for standard documentation files.", LOG_JOB.info + ) + dir_output = self.session.cmd_output(f'dir "{self.script_dir}"') + standard_docs = [ + doc.strip().strip('"') for doc in self.params["standard_docs"].split(",") + ] + for doc in standard_docs: + if doc not in dir_output: + self.test.error(f"Standard documentation file '{doc}' is missing.") + + error_context.context("Testing command examples from README.md.", LOG_JOB.info) + readme_path = f"{self.script_dir}\\{self.params['target_doc']}" + + # Extract PowerShell command from README + cmd_output = self.session.cmd_output( + self.params["query_cmd_from_file"] % readme_path + ) + + # Find the command that runs the .ps1 script + command_line = "" + lines = cmd_output.splitlines() + for i, line in enumerate(lines): + if "```powershell" in line and i + 1 < len(lines): + next_line = lines[i + 1].strip() + if ".ps1" in next_line: + command_line = next_line + break + if not command_line: + self.test.fail("Could not find a valid .ps1 command example in README.md.") + + # Copy script to tmp_dir to run the example command + self.session.cmd(self.params["cmd_cp_file"] % (self.script_path, ".")) + + # Convert PowerShell syntax to cmd.exe compatible format + if command_line.startswith(".\\"): + ps_script_and_args = command_line[2:] + full_command = ( + f"powershell.exe -ExecutionPolicy Bypass -File {ps_script_and_args}" ) - except ShellTimeoutError as e: - LOG_JOB.warning("Network adapter recovery setting timed out: %s", str(e)) - # Verify if the setting was actually changed despite timeout - try: - change_back_jp_value = int( - session.cmd_output(params["check_adapter_jp_info"] % adapter_name) - ) - if change_back_jp_value != original_jp_value_from_file: - err_msg = ( - f"expected {original_jp_value_from_file}, " - f"got {change_back_jp_value}" - ) - test.error(err_msg) + else: + full_command = command_line - # Only create new session if verification passed - try: - session = self._get_session(params, self.vm) - self._open_session_list.append(session) - session.cmd("cd %s" % self.tmp_dir) - except Exception as e: - test.error(f"Failed to create new session after recovery: {str(e)}") - except Exception as e: - test.error(f"Failed to verify network adapter recovery: {str(e)}") + # Execute the command from documentation + status, output = self.session.cmd_status_output(full_command, timeout=360) - # Final verification - change_back_jp_value = int( - session.cmd_output(params["check_adapter_jp_info"] % adapter_name) - ) - if change_back_jp_value != original_jp_value_from_file: - test.error( - "Please have a check, the value wasn't changed back to original." + # Verify it ran successfully by checking for output paths + log_folder_match = re.search(r"Log folder path: (.+)", output) + if not log_folder_match: + self.test.fail( + f"Command from documentation failed to execute successfully. 
" + f"Status: {status}, Output: {output}" ) @error_context.context_aware - def windegtool_check_documentation(self, test, params, env): + def _check_disk_registry_collection(self): """ - Verify documentation completeness and accuracy: - 1. Check presence of all required documentation files - 2. Verify command examples in documentation - 3. Test executable commands from documentation - 4. Verify command output matches documentation - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Tests the accuracy of disk and registry information collection. """ + # 1. Get baseline + paths = self._run_script_and_get_paths() + virtio_disk_file = self.params["virtio_disk_filepath"] % paths["log_folder"] + exist_reg = self.params["exist_reg_item"] + reg_subkey1, reg_subkey2 = ( + self.params["reg_subkey1"], + self.params["reg_subkey2"], + ) + + def get_reg_value_from_file(file, item, subkey): + # Use PowerShell Select-String to handle potential line wrapping and + # partial matches + # Context 0,1 gets the matching line and the following line + cmd = ( + f"powershell.exe -Command \"Select-String -Path '{file}' " + f"-Pattern '{subkey}' -Context 0,1 | " + f'ForEach-Object {{ $_.Line; $_.Context.PostContext }}"' + ) + output = self.session.cmd_output(cmd).strip() + + if not output: + self.test.error( + f"Registry key '{subkey}' not found in file '{file}'. " + f"Please check if the file contains registry information." + ) - def _clean_output(output): + # Parse the output lines = output.splitlines() - cleaned_lines = [] + for i, line in enumerate(lines): + if item in line and ":" in line: + # Ensure subkey is a distinct word to avoid partial matches + # e.g., 'TimeoutValue' matching 'IoTimeoutValue' + if not re.search(rf"\b{re.escape(subkey)}\b", line): + continue + + parts = line.rsplit(":", 1) + value_str = parts[1].strip() + + if value_str.isdigit(): + return int(value_str) + elif not value_str and i + 1 < len(lines): + # Check next line for wrapped value + next_line = lines[i + 1].strip() + if next_line.isdigit(): + return int(next_line) + + self.test.error( + f"Could not find registry value for '{item}\\{subkey}'. " + f"Search results: {output}" + ) + + def get_reg_value_from_guest(item, subkey): + cmd = self.params["cmd_reg_query"] % (item, subkey) + return int(self.session.cmd_output(cmd)) + + val1_file = get_reg_value_from_file(virtio_disk_file, exist_reg, reg_subkey1) + val1_guest = get_reg_value_from_guest(exist_reg, reg_subkey1) + if val1_file != val1_guest: + self.test.error(f"Initial registry value mismatch for {reg_subkey1}.") + + # 2. 
Modify registry + error_context.context("Modifying registry for verification.", LOG_JOB.info) + new_reg = self.params["new_reg_item"] + key_val1 = int(self.params["key_value1"]) + key_val2 = int(self.params["key_value2"]) - for line in lines: - line = line.strip() - if not line or "powershell" in line: - continue - cleaned_lines.append(line) - return "\n".join(cleaned_lines) + try: + # Create new keys and set values + self.session.cmd(self.params["cmd_reg_add_item"] % (new_reg, new_reg)) + self.session.cmd( + self.params["cmd_reg_add_item_key"] % (new_reg, new_reg, reg_subkey1) + ) + self.session.cmd( + self.params["cmd_reg_add_item_key"] % (new_reg, new_reg, reg_subkey2) + ) + self.session.cmd( + self.params["cmd_reg_set_value"] % (exist_reg, reg_subkey1, key_val1) + ) + self.session.cmd( + self.params["cmd_reg_set_value"] % (new_reg, reg_subkey1, key_val1) + ) + self.session.cmd( + self.params["cmd_reg_set_value"] % (new_reg, reg_subkey2, key_val2) + ) - session = self._get_session(params, self.vm) - self._open_session_list.append(session) + # 3. Re-run and verify changes + new_paths = self._run_script_and_get_paths() + new_file = self.params["virtio_disk_filepath"] % new_paths["log_folder"] - error_context.context( - "Check all relevant official documents to ensure they are complete", - LOG_JOB.info, - ) - output = str(session.cmd_output("dir %s\\" % self.script_dir)).strip() - standard_docs = (params["standard_docs"]).split(",") - for standard_doc in standard_docs: - if standard_doc not in output: - test.error( - f"There is no file {standard_doc}, please contact with vendor." - ) + if get_reg_value_from_file(new_file, exist_reg, reg_subkey1) != key_val1: + self.test.fail("Modification of existing registry key not captured.") + if get_reg_value_from_file(new_file, new_reg, reg_subkey1) != key_val1: + self.test.fail("Creation of new registry key (1) not captured.") + if get_reg_value_from_file(new_file, new_reg, reg_subkey2) != key_val2: + self.test.fail("Creation of new registry key (2) not captured.") + + finally: + # 4. Cleanup + error_context.context("Cleaning up registry changes.", LOG_JOB.info) + self.session.cmd(self.params["cmd_reg_del"] % new_reg) + self.session.cmd( + self.params["cmd_reg_set_value"] % (exist_reg, reg_subkey1, val1_guest) + ) + + @error_context.context_aware + def _check_includeSensitiveData_collection(self): + """ + Tests the collection of sensitive data (crash dumps). 
+ """ + error_context.context("Triggering BSOD via NMI.", LOG_JOB.info) + self.vm.monitor.nmi() + # Wait for the VM to crash and reboot + time.sleep(int(self.params.get("timeout", 360))) + self.vm.reboot(self.session, method=self.params["reboot_method"]) + self.session = self._get_session(new=True) + self.session.cmd(f'cd "{self.tmp_dir}"') error_context.context( - "Address usable commands and execute them to ensure " - "they are executable and accurate", - LOG_JOB.info, - ) - target_doc_path = "%s\\%s" % (self.script_dir, params["target_doc"]) - target_output = str( - session.cmd_output(params["query_cmd_from_file"] % target_doc_path) - ).strip() - executable_cmds = _clean_output(target_output).splitlines() - executable_cmd_final = 'powershell.exe -Command "' - for executable_cmd in executable_cmds: - executable_cmd_final += executable_cmd + "; " - executable_cmd_final = executable_cmd_final.rstrip("; ") + '"' - session.cmd(params["cmd_cp_file"] % (self.script_path, self.tmp_dir)) - session.cmd("cd %s" % self.tmp_dir) - s, o = session.cmd_status_output(executable_cmd_final, timeout=360) - - include_sensitive_data = ( - True if "-IncludeSensitiveData" in executable_cmd_final else False + "Verifying dump file existence post-reboot.", LOG_JOB.info ) - paths = self._get_path(o, session, sensitive_data=include_sensitive_data) - if not all(paths): - test.fail("Debug tool run failed, please check it.") + dmp_file = self.params["memory_dmp_file"] + minidmp_folder = self.params["mini_dmp_folder"] + output = self.session.cmd_output(f'dir "{dmp_file}" && dir "{minidmp_folder}"') + if "dmp" not in output.lower(): + self.test.error("Memory dump files were not created after BSOD.") + + error_context.context("Running script to collect sensitive data.", LOG_JOB.info) + paths = self._run_script_and_get_paths(extra_args="-IncludeSensitiveData") + if not (paths["dump_folder"] and paths["dump_zip"]): + self.test.fail( + "Script failed to collect dump files with -IncludeSensitiveData." + ) @error_context.context_aware - def windegtool_check_IO_limits(self, test, params, env): + def _check_IO_limits(self): """ - Test IO metrics collection functionality: - 1. Collect baseline IO metrics - 2. Generate IO load in background - 3. Collect metrics during IO load - 4. Compare metrics to verify increased IO activity - 5. Verify accuracy of IO metrics collection - - :param test: QEMU test object - :param params: Dictionary with test parameters - :param env: Dictionary with the test environment + Tests the collection of IO metrics under load. """ + # 1. Get baseline metrics + error_context.context("Collecting baseline IO metrics.", LOG_JOB.info) + paths = self._run_script_and_get_paths() + io_file_path_cmd = self.params["cmd_get_io_folder"] % paths["log_folder"] + baseline_io_file = self.session.cmd_output(io_file_path_cmd).strip() + if not baseline_io_file: + self.test.error("Could not find baseline IO metrics file.") + + baseline_output = self.session.cmd_output( + self.params["cmd_cat_io"] % baseline_io_file + ) + baseline_metrics = { + m[0]: float(m[1].split()[0].replace(",", "")) if len(m) >= 2 else 0.0 + for line in baseline_output.splitlines() + if "Disk" in line and len(m := line.split(":")[-2:]) >= 2 + } + + # 2. 
+        error_context.context(
+            "Generating IO load and collecting new metrics.", LOG_JOB.info
+        )
+        session2 = self._get_session(new=True)
+        stop_event = threading.Event()

-        def _run_continuous_io(session, stop_event):
-            """Helper function to continuously create and delete files"""
-            i = 0
+        def io_task():
+            """Continuously create and delete a large file."""
+            while not stop_event.is_set():
+                try:
-                    session.cmd(params["cmd_fsutil"], timeout=30)
-                    session.cmd(params["cmd_del_file"], timeout=30)
-                    i += 1
-                    if i >= 10:  # Limit iterations to prevent infinite loop
-                        break
+                    session2.cmd(self.params["cmd_fsutil"], timeout=60)
+                    session2.cmd(self.params["cmd_del_file"], timeout=60)
                 except Exception as e:
-                    LOG_JOB.error("Error in IO task: %s", str(e))
+                    LOG_JOB.error("Error in IO task: %s", e)
                     break

-        session = self._get_session(params, self.vm)
-        session1 = self._get_session(params, self.vm)
-        self._open_session_list.extend([session, session1])
-        session.cmd("cd %s" % self.tmp_dir)
-
-        error_context.context(
-            "Run script first time to collect baseline IO metrics", LOG_JOB.info
-        )
-        first_log_path = self.run_tool_scripts(session)
-
-        # Get first IO limits file
-        cmd_get_io = params["cmd_get_io_folder"] % first_log_path
-        first_io_file = session.cmd_output(cmd_get_io).strip()
-        if not first_io_file:
-            test.error("Cannot find IO limits file in %s" % first_log_path)
-
-        # Read first IO metrics
-        first_metrics = {}
-        first_output = session.cmd_output(params["cmd_cat_io"] % first_io_file)
-        for line in first_output.splitlines():
-            if "Disk" in line and ":" in line:
-                key = line.split(":")[1].strip()
-                value = float(line.split(":")[2].strip().split()[0].replace(",", ""))
-                first_metrics[key] = value
+        io_thread = threading.Thread(target=io_task)
+        io_thread.start()
+        time.sleep(5)  # Allow IO to start

-        error_context.context(
-            "Run script second time with continuous IO in background", LOG_JOB.info
-        )
+        try:
+            new_paths = self._run_script_and_get_paths()
+        finally:
+            stop_event.set()
+            io_thread.join(timeout=60)

-        # Start continuous IO in background
-        stop_event = threading.Event()
-        io_thread = threading.Thread(
-            target=_run_continuous_io, args=(session1, stop_event)
+        # 3. Compare metrics
+        new_io_file_path_cmd = (
+            self.params["cmd_get_io_folder"] % new_paths["log_folder"]
         )
-        io_thread.start()
+        new_io_file = self.session.cmd_output(new_io_file_path_cmd).strip()
+        new_output = self.session.cmd_output(self.params["cmd_cat_io"] % new_io_file)
+        # Parse the same "Disk" counter lines as for the baseline metrics.
+        new_metrics = {
+            fields[0].strip(): float(fields[1].split()[0].replace(",", ""))
+            for line in new_output.splitlines()
+            if "Disk" in line and len(fields := line.split(":")[-2:]) == 2
+        }

-        # Wait briefly for IO to start
-        time.sleep(3)
+        if not any(new_metrics.get(k, 0) > v for k, v in baseline_metrics.items()):
+            self.test.fail("IO metrics did not increase under load.")
+        LOG_JOB.info("IO metrics increased as expected.")

-        # Run debug tool while IO is running
+    @error_context.context_aware
+    def execute(self):
+        """
+        Main execution flow for the test.
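+        Runs setup, dispatches to the configured test variant, and always
+        cleans up the open guest sessions afterwards.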
+        """
         try:
-            second_log_path = self.run_tool_scripts(session)
+            self.setup()
+            self._execute_test_variant()
         finally:
-            # Stop IO thread
-            stop_event.set()
-            io_thread.join(timeout=30)
-
-        # Get second IO limits file
-        cmd_get_io = params["cmd_get_io_folder"] % second_log_path
-        second_io_file = session.cmd_output(cmd_get_io).strip()
-        if not second_io_file:
-            test.error("Cannot find IO limits file in %s" % second_log_path)
-
-        # Read second IO metrics and compare
-        second_output = session.cmd_output(params["cmd_cat_io"] % second_io_file)
-        any_increased = False
-        for line in second_output.splitlines():
-            if "Disk" in line and ":" in line:
-                key = line.split(":")[1].strip()
-                value = float(line.split(":")[2].strip().split()[0].replace(",", ""))
-                if key in first_metrics:
-                    if value > first_metrics[key]:
-                        any_increased = True
-                        LOG_JOB.info(
-                            "%s increased from %f to %f", key, first_metrics[key], value
-                        )
-
-        if not any_increased:
-            test.fail("No IO metrics increased during continuous IO test")
-
-    def run_once(self, test, params, env):
-        WinDebugToolTest.run_once(self, test, params, env)
-
-        windegtool_check_type = self.params["windegtool_check_type"]
-        chk_type = "windegtool_check_%s" % windegtool_check_type
-        if hasattr(self, chk_type):
-            func = getattr(self, chk_type)
-            func(test, params, env)
-        else:
-            test.error("Could not find matching test, check your config file")
+            self._cleanup_sessions()


 def run(test, params, env):
     """
-    Test CollectSystemInfo.ps1 tool, this case will:
-    1) Start VM with virtio-win rpm package.
-    2) Execute CollectSystemInfo.ps1 with&without param
-       "-IncludeSensitiveData".
-    3) Run some basic test for CollectSystemInfo.ps1.
-
-    :param test: kvm test object
-    :param params: Dictionary with the test parameters
-    :param env: Dictionary with test environmen.
+    Entry point for the test.
     """
-
-    collectinfotool_test = WinDebugToolTestBasicCheck(test, params, env)
-    collectinfotool_test.execute(test, params, env)
+    # All test logic lives in the self-contained WinDebugToolTest class;
+    # instantiate it and run the selected test variant.
+    win_debug_test = WinDebugToolTest(test, params, env)
+    win_debug_test.execute()

From b4e4f2c1899f3cb251dddf895a796fd6e6f23b07 Mon Sep 17 00:00:00 2001
From: Dehan Meng
Date: Tue, 16 Dec 2025 21:49:29 +0800
Subject: [PATCH 2/2] win_guest_debugging_tool.py: Add new case for MTV Firstboot log

A customer requested that the MTV Firstboot log.txt be collected
by our script 'CollectSystemInfo.ps1'.
Test steps:
1. Check whether 'C:\Program Files\Guestfs\Firstboot\log.txt' exists;
   if it does not, create it with known content.
2. Run the script.
3. Verify the collected file exists under the generated log directory.
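
For reference, the guest-side preparation driven by the new cfg commands is
essentially the following (illustrative values, '<known content>' stands for
the configured firstboot_content):

  powershell.exe -Command "New-Item -Path 'C:\Program Files\Guestfs\Firstboot' -ItemType Directory -Force"
  powershell.exe -Command "Set-Content -Path 'C:\Program Files\Guestfs\Firstboot\log.txt' -Value '<known content>'"
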
Signed-off-by: Dehan Meng --- qemu/tests/cfg/win_guest_debugging_tool.cfg | 9 +++ qemu/tests/win_guest_debugging_tool.py | 65 +++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/qemu/tests/cfg/win_guest_debugging_tool.cfg b/qemu/tests/cfg/win_guest_debugging_tool.cfg index 8100676e09..f6e02075e8 100644 --- a/qemu/tests/cfg/win_guest_debugging_tool.cfg +++ b/qemu/tests/cfg/win_guest_debugging_tool.cfg @@ -109,3 +109,12 @@ cmd_fsutil = fsutil file createnew testfile.tmp 1000000000 cmd_cat_io = type %s cmd_del_file = del testfile.tmp + - check_MTV_firstboot_log_collection: + windegtool_check_type = MTV_firstboot_log_collection + firstboot_dir = "C:\Program Files\Guestfs\Firstboot" + firstboot_file = "log.txt" + firstboot_content = "Firstboot log content: all drivers are installed successfully" + collected_file_name = "MTV_Firstboot_log.txt" + cmd_create_firstboot_dir = powershell.exe -Command "New-Item -Path '%s' -ItemType Directory -Force" + cmd_create_firstboot_file = powershell.exe -Command "Set-Content -Path '%s' -Value '%s'" + cmd_check_file_content = powershell.exe -Command "Get-Content -Path '%s'" diff --git a/qemu/tests/win_guest_debugging_tool.py b/qemu/tests/win_guest_debugging_tool.py index 12b06711da..fe1bd4ebc8 100644 --- a/qemu/tests/win_guest_debugging_tool.py +++ b/qemu/tests/win_guest_debugging_tool.py @@ -781,6 +781,71 @@ def io_task(): self.test.fail("IO metrics did not increase under load.") LOG_JOB.info("IO metrics increased as expected.") + @error_context.context_aware + def _check_MTV_firstboot_log_collection(self): + """ + Verifies that the Firstboot log is correctly collected and renamed. + """ + firstboot_dir = self.params.get("firstboot_dir") + firstboot_file = self.params.get("firstboot_file") + full_path = f"{firstboot_dir}\\{firstboot_file}" + content = self.params.get("firstboot_content") + + # 1. Check and prepare the environment + error_context.context( + "Checking and preparing Firstboot log file.", LOG_JOB.info + ) + # Check if file exists + check_cmd = f"powershell.exe -Command \"Test-Path '{full_path}'\"" + exists = self.session.cmd_output(check_cmd).strip().lower() == "true" + + if not exists: + LOG_JOB.info("Firstboot log not found. Creating it...") + self.session.cmd(self.params["cmd_create_firstboot_dir"] % firstboot_dir) + self.session.cmd( + self.params["cmd_create_firstboot_file"] % (full_path, content) + ) + else: + LOG_JOB.info("Firstboot log found. Updating content...") + self.session.cmd( + self.params["cmd_create_firstboot_file"] % (full_path, content) + ) + + # 2. Run script + paths = self._run_script_and_get_paths() + + # 3. Verify collection + error_context.context( + "Verifying Firstboot log collection and renaming.", LOG_JOB.info + ) + collected_name = self.params.get("collected_file_name") + collected_path = f"{paths['log_folder']}\\{collected_name}" + + # Check existence + check_collected_cmd = ( + f"powershell.exe -Command \"Test-Path '{collected_path}'\"" + ) + collected_exists = ( + self.session.cmd_output(check_collected_cmd).strip().lower() == "true" + ) + + if not collected_exists: + self.test.fail( + f"Collected file '{collected_name}' not found in log folder." 
+ ) + + # Check content + collected_content = self.session.cmd_output( + self.params["cmd_check_file_content"] % collected_path + ).strip() + + if content not in collected_content: + self.test.fail( + "Collected file content does not match expected content.\n" + f"Expected: {content}\n" + f"Actual: {collected_content}" + ) + @error_context.context_aware def execute(self): """