Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion sigma/cli/check.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from collections import Counter
from sys import stderr
from textwrap import fill
import xml.etree.ElementTree as ET
import sys

from dataclasses import fields

Expand All @@ -19,6 +21,61 @@

severity_color = {"low": "green", "medium": "yellow", "high": "red"}

# JUnit-specific icons covering PySigma validation and parsing possibilities
SEVERITY_ICONS = {
"critical": "💥",
"high": "🔴",
"medium": "🟠",
"low": "🟡",
"informational": "ℹ️",
"parsing_error": "🚫",
"condition_error": "❌",
"error": "❌", # Fallback for other SigmaError subclasses
"ok": "✅"
}

def generate_junit_report(results, output_file):
"""Generates JUnit XML grouped by PySigma error/issue types."""
root = ET.Element("testsuites", name="Sigma Rule Validation")

suites = {}
for res in results:
suite_name = res.get('issue_type', 'Validation Success')
if suite_name not in suites:
suites[suite_name] = []
suites[suite_name].append(res)

for suite_name, tests in suites.items():
failures = len([t for t in tests if t['status'] == 'failed'])
test_suite = ET.SubElement(
root, "testsuite",
name=suite_name,
tests=str(len(tests)),
failures=str(failures)
)

for res in tests:
# Map the severity string to the icon mapping, fallback to error
severity = res.get('severity', 'ok').lower()
icon = SEVERITY_ICONS.get(severity, SEVERITY_ICONS['error'])

test_case = ET.SubElement(
test_suite, "testcase",
name=f"{icon} {res['rule_name']}",
classname=suite_name,
file=res.get('file_path', 'unknown')
)

if res['status'] == 'failed':
failure = ET.SubElement(
test_case, "failure",
message=f"{res.get('issue_type')}: {res.get('severity', '').upper()}"
)
failure.text = res.get('description', '')

tree = ET.ElementTree(root)
with open(output_file, "wb") as f:
tree.write(f, encoding="utf-8", xml_declaration=True)

@click.command()
@click.option(
Expand Down Expand Up @@ -48,6 +105,11 @@
show_default=True,
help="Fail on Sigma rule validation issues.",
)
@click.option(
"--junitxml",
type=click.Path(path_type=pathlib.Path),
help="Output results in JUnit XML format to the specified file."
)
@click.option(
"--exclude",
"-x",
Expand All @@ -63,7 +125,7 @@
type=click.Path(exists=True, allow_dash=True, path_type=pathlib.Path),
)
def check(
input, validation_config, file_pattern, fail_on_error, fail_on_issues, exclude
input, validation_config, file_pattern, fail_on_error, fail_on_issues, junitxml, exclude
):
"""Check Sigma rules for validity and best practices (not yet implemented)."""
if (
Expand Down Expand Up @@ -103,6 +165,8 @@ def check(
cond_errors = Counter()
check_rules = list()
first_error = True
junit_results = []

for rule in rule_collection.rules:
if (
len(rule.errors) > 0
Expand All @@ -114,6 +178,16 @@ def check(
for error in rule.errors:
click.echo(error)
rule_errors.update((error.__class__.__name__,))
if junitxml:
# Extract error type dynamically (SigmaParsingError vs SigmaValueError etc.)
error_type = error.__class__.__name__
rule_name = rule.title or str(rule.path)
file_path = str(rule.source) if rule.source else "unknown"
sev = "parsing_error" if "Parse" in error_type else "error"
junit_results.append({
"rule_name": rule_name, "file_path": file_path, "status": "failed",
"issue_type": error_type, "severity": sev, "description": str(error)
})
elif isinstance(rule, SigmaRule): # rule has no errors, parse condition
try:
for condition in rule.detection.parsed_condition:
Expand All @@ -125,6 +199,11 @@ def check(
f"Condition error in { str(condition.source) }:{ error }"
)
cond_errors.update((error,))
if junitxml:
junit_results.append({
"rule_name": rule_name, "file_path": file_path, "status": "failed",
"issue_type": e.__class__.__name__, "severity": "condition_error", "description": error
})
else:
check_rules.append(rule)

Expand All @@ -151,6 +230,17 @@ def check(
for rule in issue.rules
]
)
if junitxml:
for rule in issue.rules:
junit_results.append({
"rule_name": rule.title or str(rule.path),
"file_path": str(rule.source) if rule.source else "unknown",
"status": "failed",
"issue_type": type(issue).__name__,
"severity": issue.severity.name.lower(),
"description": str(issue.description)
})

additional_fields = " ".join(
[
f"{field.name}={click.style(issue.__getattribute__(field.name) or '-', bold=True, fg='blue')}"
Expand Down Expand Up @@ -244,6 +334,10 @@ def check(
else:
click.echo("No validation issues found.")

if junitxml:
generate_junit_report(junit_results, junitxml)
click.echo(f"\nJUnit report saved to: {junitxml}")

if (
fail_on_error
and (rule_error_count > 0 or cond_error_count > 0)
Expand All @@ -253,4 +347,14 @@ def check(
click.echo("Check failure")
click.get_current_context().exit(1)
except SigmaError as e:
# Handles total collection parsing crashes (e.g., SigmaCollectionError)
if junitxml:
generate_junit_report([{
"rule_name": "Global/Collection Loading Error",
"file_path": str(input),
"status": "failed",
"issue_type": e.__class__.__name__,
"severity": "error",
"description": str(e)
}], junitxml)
raise click.ClickException("Check error: " + str(e))