diff --git a/sigma/cli/check.py b/sigma/cli/check.py index 33cc5fb..c72c8b3 100644 --- a/sigma/cli/check.py +++ b/sigma/cli/check.py @@ -2,6 +2,8 @@ from collections import Counter from sys import stderr from textwrap import fill +import xml.etree.ElementTree as ET +import sys from dataclasses import fields @@ -19,6 +21,61 @@ severity_color = {"low": "green", "medium": "yellow", "high": "red"} +# JUnit-specific icons covering PySigma validation and parsing possibilities +SEVERITY_ICONS = { + "critical": "đŸ’Ĩ", + "high": "🔴", + "medium": "🟠", + "low": "🟡", + "informational": "â„šī¸", + "parsing_error": "đŸšĢ", + "condition_error": "❌", + "error": "❌", # Fallback for other SigmaError subclasses + "ok": "✅" +} + +def generate_junit_report(results, output_file): + """Generates JUnit XML grouped by PySigma error/issue types.""" + root = ET.Element("testsuites", name="Sigma Rule Validation") + + suites = {} + for res in results: + suite_name = res.get('issue_type', 'Validation Success') + if suite_name not in suites: + suites[suite_name] = [] + suites[suite_name].append(res) + + for suite_name, tests in suites.items(): + failures = len([t for t in tests if t['status'] == 'failed']) + test_suite = ET.SubElement( + root, "testsuite", + name=suite_name, + tests=str(len(tests)), + failures=str(failures) + ) + + for res in tests: + # Map the severity string to the icon mapping, fallback to error + severity = res.get('severity', 'ok').lower() + icon = SEVERITY_ICONS.get(severity, SEVERITY_ICONS['error']) + + test_case = ET.SubElement( + test_suite, "testcase", + name=f"{icon} {res['rule_name']}", + classname=suite_name, + file=res.get('file_path', 'unknown') + ) + + if res['status'] == 'failed': + failure = ET.SubElement( + test_case, "failure", + message=f"{res.get('issue_type')}: {res.get('severity', '').upper()}" + ) + failure.text = res.get('description', '') + + tree = ET.ElementTree(root) + with open(output_file, "wb") as f: + tree.write(f, encoding="utf-8", xml_declaration=True) @click.command() @click.option( @@ -48,6 +105,11 @@ show_default=True, help="Fail on Sigma rule validation issues.", ) +@click.option( + "--junitxml", + type=click.Path(path_type=pathlib.Path), + help="Output results in JUnit XML format to the specified file." +) @click.option( "--exclude", "-x", @@ -63,7 +125,7 @@ type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path), ) def check( - input, validation_config, file_pattern, fail_on_error, fail_on_issues, exclude + input, validation_config, file_pattern, fail_on_error, fail_on_issues, junitxml, exclude ): """Check Sigma rules for validity and best practices (not yet implemented).""" if ( @@ -103,6 +165,8 @@ def check( cond_errors = Counter() check_rules = list() first_error = True + junit_results = [] + for rule in rule_collection.rules: if ( len(rule.errors) > 0 @@ -114,6 +178,16 @@ def check( for error in rule.errors: click.echo(error) rule_errors.update((error.__class__.__name__,)) + if junitxml: + # Extract error type dynamically (SigmaParsingError vs SigmaValueError etc.) + error_type = error.__class__.__name__ + rule_name = rule.title or str(rule.path) + file_path = str(rule.source) if rule.source else "unknown" + sev = "parsing_error" if "Parse" in error_type else "error" + junit_results.append({ + "rule_name": rule_name, "file_path": file_path, "status": "failed", + "issue_type": error_type, "severity": sev, "description": str(error) + }) elif isinstance(rule, SigmaRule): # rule has no errors, parse condition try: for condition in rule.detection.parsed_condition: @@ -125,6 +199,11 @@ def check( f"Condition error in { str(condition.source) }:{ error }" ) cond_errors.update((error,)) + if junitxml: + junit_results.append({ + "rule_name": rule_name, "file_path": file_path, "status": "failed", + "issue_type": e.__class__.__name__, "severity": "condition_error", "description": error + }) else: check_rules.append(rule) @@ -151,6 +230,17 @@ def check( for rule in issue.rules ] ) + if junitxml: + for rule in issue.rules: + junit_results.append({ + "rule_name": rule.title or str(rule.path), + "file_path": str(rule.source) if rule.source else "unknown", + "status": "failed", + "issue_type": type(issue).__name__, + "severity": issue.severity.name.lower(), + "description": str(issue.description) + }) + additional_fields = " ".join( [ f"{field.name}={click.style(issue.__getattribute__(field.name) or '-', bold=True, fg='blue')}" @@ -244,6 +334,10 @@ def check( else: click.echo("No validation issues found.") + if junitxml: + generate_junit_report(junit_results, junitxml) + click.echo(f"\nJUnit report saved to: {junitxml}") + if ( fail_on_error and (rule_error_count > 0 or cond_error_count > 0) @@ -253,4 +347,14 @@ def check( click.echo("Check failure") click.get_current_context().exit(1) except SigmaError as e: + # Handles total collection parsing crashes (e.g., SigmaCollectionError) + if junitxml: + generate_junit_report([{ + "rule_name": "Global/Collection Loading Error", + "file_path": str(input), + "status": "failed", + "issue_type": e.__class__.__name__, + "severity": "error", + "description": str(e) + }], junitxml) raise click.ClickException("Check error: " + str(e))