From 268aae37adf893b21b579b347c208182b519b3fa Mon Sep 17 00:00:00 2001
From: tduhamel42
Date: Sun, 2 Nov 2025 14:28:32 +0100
Subject: [PATCH 01/16] feat: Add native findings format and fix critical ID bug

Priority 1 implementation:

- Created native FuzzForge findings format schema with full support for:
  - 5-level severity (critical/high/medium/low/info)
  - Confidence levels
  - CWE and OWASP categorization
  - found_by attribution (module, tool, type)
  - LLM context tracking (model, prompt, temperature)
- Updated ModuleFinding model with new fields:
  - Added rule_id for pattern identification
  - Added found_by for detection attribution
  - Added llm_context for LLM-detected findings
  - Added confidence, cwe, owasp, references
  - Added column_start/end for precise location
- Updated create_finding() helper with new required fields
- Enhanced _generate_summary() with confidence and source tracking
- Fixed critical ID bug in CLI:
  - Changed 'ff finding show' to use --id (unique) instead of --rule
  - Added new show_findings_by_rule() function to show ALL findings matching a rule
  - Updated display_finding_detail() to support both native and SARIF formats
  - Now properly handles multiple findings with same rule_id

Breaking changes:
- create_finding() now requires rule_id and found_by parameters
- show_finding() now uses --id instead of --rule flag
---
 backend/src/models/finding_schema.py       | 166 +++++++++++++
 backend/toolbox/modules/base.py            |  93 ++++++-
 cli/src/fuzzforge_cli/commands/findings.py | 267 ++++++++++++++++-----
 3 files changed, 461 insertions(+), 65 deletions(-)
 create mode 100644 backend/src/models/finding_schema.py

diff --git a/backend/src/models/finding_schema.py b/backend/src/models/finding_schema.py
new file mode 100644
index 00000000..a58b8299
--- /dev/null
+++ b/backend/src/models/finding_schema.py
@@ -0,0 +1,166 @@
+"""
+FuzzForge Native Finding Format Schema
+
+This module defines the native finding format used internally by FuzzForge.
+This format is more expressive than SARIF and optimized for security testing workflows.
+"""
+
+# Copyright (c) 2025 FuzzingLabs
+#
+# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file
+# at the root of this repository for details.
+#
+# After the Change Date (four years from publication), this version of the
+# Licensed Work will be made available under the Apache License, Version 2.0.
+# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0
+#
+# Additional attribution and requirements are provided in the NOTICE file.
+
+from typing import Dict, Any, List, Optional, Literal
+from pydantic import BaseModel, Field
+from datetime import datetime
+
+
+class FoundBy(BaseModel):
+    """Information about who/what found the vulnerability"""
+    module: str = Field(..., description="FuzzForge module that detected the finding (e.g., 'semgrep_scanner', 'llm_analysis')")
+    tool_name: str = Field(..., description="Name of the underlying tool (e.g., 'Semgrep', 'Claude-3.5-Sonnet', 'MobSF')")
+    tool_version: str = Field(..., description="Version of the tool")
+    type: Literal["llm", "tool", "fuzzer", "manual"] = Field(..., description="Type of detection method")
+
+
+class LLMContext(BaseModel):
+    """Context information for LLM-detected findings"""
+    model: str = Field(..., description="LLM model used (e.g., 'claude-3-5-sonnet-20250129')")
+    prompt: str = Field(..., description="Prompt or analysis instructions used")
+    temperature: Optional[float] = Field(None, description="Temperature parameter used for generation")
+
+
+class Location(BaseModel):
+    """Location information for a finding"""
+    file: str = Field(..., description="File path relative to workspace root")
+    line_start: Optional[int] = Field(None, description="Starting line number (1-indexed)")
+    line_end: Optional[int] = Field(None, description="Ending line number (1-indexed)")
+    column_start: Optional[int] = Field(None, description="Starting column number (1-indexed)")
+    column_end: Optional[int] = Field(None, description="Ending column number (1-indexed)")
+    snippet: Optional[str] = Field(None, description="Code snippet at the location")
+
+
+class Finding(BaseModel):
+    """Individual security finding"""
+    id: str = Field(..., description="Unique finding identifier (UUID)")
+    rule_id: str = Field(..., description="Rule/pattern identifier (e.g., 'sql_injection', 'hardcoded_secret')")
+    found_by: FoundBy = Field(..., description="Detection attribution")
+    llm_context: Optional[LLMContext] = Field(None, description="LLM-specific context (only if found_by.type == 'llm')")
+
+    title: str = Field(..., description="Short finding title")
+    description: str = Field(..., description="Detailed description of the finding")
+
+    severity: Literal["critical", "high", "medium", "low", "info"] = Field(..., description="Severity level")
+    confidence: Literal["high", "medium", "low"] = Field(..., description="Confidence level in the finding")
+
+    category: str = Field(..., description="Finding category (e.g., 'injection', 'authentication', 'cryptography')")
+    cwe: Optional[str] = Field(None, description="CWE identifier (e.g., 'CWE-89')")
+    owasp: Optional[str] = Field(None, description="OWASP category (e.g., 'A03:2021-Injection')")
+
+    location: Optional[Location] = Field(None, description="Location of the finding in source code")
+
+    recommendation: Optional[str] = Field(None, description="Remediation recommendation")
+    references: List[str] = Field(default_factory=list, description="External references (URLs, documentation)")
+
+    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
+
+
+class FindingsSummary(BaseModel):
+    """Summary statistics for findings"""
+    total_findings: int = Field(..., description="Total number of findings")
+    by_severity: Dict[str, int] = Field(default_factory=dict, description="Count by severity level")
+    by_confidence: Dict[str, int] = Field(default_factory=dict, description="Count by confidence level")
+    by_category: Dict[str, int] = Field(default_factory=dict, description="Count by category")
+    by_source: Dict[str, int] = Field(default_factory=dict, description="Count by detection source (module name)")
+    by_type: Dict[str, int] = Field(default_factory=dict, description="Count by detection type (llm/tool/fuzzer)")
+    affected_files: int = Field(0, description="Number of unique files with findings")
+
+
+class FuzzForgeFindingsReport(BaseModel):
+    """Native FuzzForge findings report format"""
+    version: str = Field(default="1.0.0", description="Format version")
+    run_id: str = Field(..., description="Workflow run identifier")
+    workflow: str = Field(..., description="Workflow name")
+    timestamp: datetime = Field(default_factory=datetime.utcnow, description="Report generation timestamp")
+
+    findings: List[Finding] = Field(default_factory=list, description="List of security findings")
+    summary: FindingsSummary = Field(..., description="Summary statistics")
+
+    metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional report metadata")
+
+
+# JSON Schema export for documentation
+FINDING_SCHEMA_VERSION = "1.0.0"
+
+def get_json_schema() -> Dict[str, Any]:
+    """Get JSON schema for the FuzzForge findings format"""
+    return FuzzForgeFindingsReport.model_json_schema()
+
+
+def validate_findings_report(data: Dict[str, Any]) -> FuzzForgeFindingsReport:
+    """
+    Validate a findings report against the schema
+
+    Args:
+        data: Dictionary containing findings report data
+
+    Returns:
+        Validated FuzzForgeFindingsReport object
+
+    Raises:
+        ValidationError: If data doesn't match schema
+    """
+    return FuzzForgeFindingsReport(**data)
+
+
+def create_summary(findings: List[Finding]) -> FindingsSummary:
+    """
+    Generate summary statistics from a list of findings
+
+    Args:
+        findings: List of Finding objects
+
+    Returns:
+        FindingsSummary with aggregated statistics
+    """
+    summary = FindingsSummary(
+        total_findings=len(findings),
+        by_severity={},
+        by_confidence={},
+        by_category={},
+        by_source={},
+        by_type={},
+        affected_files=0
+    )
+
+    affected_files = set()
+
+    for finding in findings:
+        # Count by severity
+        summary.by_severity[finding.severity] = summary.by_severity.get(finding.severity, 0) + 1
+
+        # Count by confidence
+        summary.by_confidence[finding.confidence] = summary.by_confidence.get(finding.confidence, 0) + 1
+
+        # Count by category
+        summary.by_category[finding.category] = summary.by_category.get(finding.category, 0) + 1
+
+        # Count by source (module)
+        summary.by_source[finding.found_by.module] = summary.by_source.get(finding.found_by.module, 0) + 1
+
+        # Count by type
+        summary.by_type[finding.found_by.type] = summary.by_type.get(finding.found_by.type, 0) + 1
+
+        # Track affected files
+        if finding.location and finding.location.file:
+            affected_files.add(finding.location.file)
+
+    summary.affected_files = len(affected_files)
+
+    return summary
diff --git a/backend/toolbox/modules/base.py b/backend/toolbox/modules/base.py
index dcef98d1..2990edeb 100644
--- a/backend/toolbox/modules/base.py
+++ b/backend/toolbox/modules/base.py
@@ -35,18 +35,48 @@ class ModuleMetadata(BaseModel):
     requires_workspace: bool = Field(True, description="Whether module requires workspace access")
 
 
+class FoundBy(BaseModel):
+    """Information about who/what found the vulnerability"""
+    module: str = Field(..., description="FuzzForge module that detected the finding")
+    tool_name: str = Field(..., description="Name of the underlying tool")
+    tool_version: str = Field(..., description="Version of the tool")
+    type: str = Field(..., description="Type of detection method (llm, tool, fuzzer, manual)")
+
+
+class LLMContext(BaseModel):
+    """Context information for LLM-detected findings"""
+    model: str = Field(..., description="LLM model used")
+    prompt: str = Field(..., description="Prompt or analysis instructions used")
+    temperature: Optional[float] = Field(None, description="Temperature parameter used for generation")
+
+
 class ModuleFinding(BaseModel):
     """Individual finding from a module"""
-    id: str = Field(..., description="Unique finding ID")
+    id: str = Field(..., description="Unique finding ID (UUID)")
+    rule_id: str = Field(..., description="Rule/pattern identifier")
+    found_by: FoundBy = Field(..., description="Detection attribution")
+    llm_context: Optional[LLMContext] = Field(None, description="LLM-specific context")
+
     title: str = Field(..., description="Finding title")
     description: str = Field(..., description="Detailed description")
-    severity: str = Field(..., description="Severity level (info, low, medium, high, critical)")
+
+    severity: str = Field(..., description="Severity level (critical, high, medium, low, info)")
+    confidence: str = Field(default="medium", description="Confidence level (high, medium, low)")
+
     category: str = Field(..., description="Finding category")
+    cwe: Optional[str] = Field(None, description="CWE identifier (e.g., 'CWE-89')")
+    owasp: Optional[str] = Field(None, description="OWASP category")
+
     file_path: Optional[str] = Field(None, description="Affected file path relative to workspace")
     line_start: Optional[int] = Field(None, description="Starting line number")
     line_end: Optional[int] = Field(None, description="Ending line number")
+    column_start: Optional[int] = Field(None, description="Starting column number")
+    column_end: Optional[int] = Field(None, description="Ending column number")
     code_snippet: Optional[str] = Field(None, description="Relevant code snippet")
+
+    recommendation: Optional[str] = Field(None, description="Remediation recommendation")
+    references: List[str] = Field(default_factory=list, description="External references")
+
     metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
 
 
@@ -140,20 +170,32 @@ def validate_workspace(self, workspace: Path) -> bool:
 
     def create_finding(
         self,
+        rule_id: str,
         title: str,
         description: str,
         severity: str,
         category: str,
+        found_by: FoundBy,
+        confidence: str = "medium",
+        llm_context: Optional[LLMContext] = None,
+        cwe: Optional[str] = None,
+        owasp: Optional[str] = None,
         **kwargs
     ) -> ModuleFinding:
         """
         Helper method to create a standardized finding.
 
         Args:
+            rule_id: Rule/pattern identifier
             title: Finding title
             description: Detailed description
-            severity: Severity level
+            severity: Severity level (critical, high, medium, low, info)
             category: Finding category
+            found_by: Detection attribution (FoundBy object)
+            confidence: Confidence level (high, medium, low)
+            llm_context: Optional LLM context information
+            cwe: Optional CWE identifier
+            owasp: Optional OWASP category
             **kwargs: Additional finding fields
 
         Returns:
@@ -164,10 +206,16 @@ def create_finding(
 
         return ModuleFinding(
             id=finding_id,
+            rule_id=rule_id,
+            found_by=found_by,
+            llm_context=llm_context,
             title=title,
             description=description,
             severity=severity,
+            confidence=confidence,
             category=category,
+            cwe=cwe,
+            owasp=owasp,
             **kwargs
         )
 
@@ -226,29 +274,62 @@ def _generate_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
             Summary dictionary
         """
         severity_counts = {
-            "info": 0,
-            "low": 0,
+            "critical": 0,
+            "high": 0,
             "medium": 0,
+            "low": 0,
+            "info": 0
+        }
+
+        confidence_counts = {
             "high": 0,
-            "critical": 0
+            "medium": 0,
+            "low": 0
         }
 
         category_counts = {}
+        source_counts = {}
+        type_counts = {}
+        affected_files = set()
 
         for finding in findings:
            # Count by severity
            if finding.severity in severity_counts:
                severity_counts[finding.severity] += 1
 
+            # Count by confidence
+            if finding.confidence in confidence_counts:
+                confidence_counts[finding.confidence] += 1
+
            # Count by category
            if finding.category not in category_counts:
                category_counts[finding.category] = 0
            category_counts[finding.category] += 1
 
+            # Count by source (module)
+            module = finding.found_by.module
+            if module not in source_counts:
+                source_counts[module] = 0
+            source_counts[module] += 1
+
+            # Count by type
+            detection_type = finding.found_by.type
+            if detection_type not in type_counts:
+                type_counts[detection_type] = 0
+            type_counts[detection_type] += 1
+
+            # Track affected files
+            if finding.file_path:
+                affected_files.add(finding.file_path)
+
         return {
             "total_findings": len(findings),
             "severity_counts": severity_counts,
+            "confidence_counts": confidence_counts,
             "category_counts": category_counts,
+            "source_counts": source_counts,
+            "type_counts": type_counts,
+            "affected_files": len(affected_files),
             "highest_severity": self._get_highest_severity(findings)
         }
 
diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py
index 70585273..b5564153 100644
--- a/cli/src/fuzzforge_cli/commands/findings.py
+++ b/cli/src/fuzzforge_cli/commands/findings.py
@@ -152,12 +152,13 @@ def get_findings(
 
 def show_finding(
     run_id: str = typer.Argument(..., help="Run ID to get finding from"),
-    rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show")
+    finding_id: str = typer.Option(..., "--id", "-i", help="Unique ID of the specific finding to show")
 ):
     """
     šŸ” Show detailed information about a specific finding
 
     This function is registered as a command in main.py under the finding (singular) command group.
+    Use the unique finding ID (shown in the findings table) to view details.
""" try: require_project() @@ -173,91 +174,239 @@ def show_finding( with get_client() as client: console.print(f"šŸ” Fetching findings for run: {run_id}") findings = client.get_run_findings(run_id) - sarif_data = findings.sarif + findings_dict = findings.sarif # Will become native format else: - sarif_data = findings_data.sarif_data - - # Find the specific finding by rule_id - runs = sarif_data.get("runs", []) - if not runs: - console.print("āŒ No findings data available", style="red") - raise typer.Exit(1) + findings_dict = findings_data.sarif_data # Will become findings_data - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}).get("driver", {}) - - # Search for matching finding + # Find the specific finding by unique ID + # For now, support both SARIF (old) and native format (new) matching_finding = None - for result in results: - if result.get("ruleId") == rule_id: - matching_finding = result - break + + # Try native format first + if "findings" in findings_dict: + for finding in findings_dict.get("findings", []): + if finding.get("id") == finding_id or finding.get("id", "").startswith(finding_id): + matching_finding = finding + break + # Fallback to SARIF format (for backward compatibility during transition) + elif "runs" in findings_dict: + runs = findings_dict.get("runs", []) + if runs: + run_data = runs[0] + results = run_data.get("results", []) + for result in results: + # Check if finding ID is in properties + props = result.get("properties", {}) + fid = props.get("findingId", "") + if fid == finding_id or fid.startswith(finding_id): + matching_finding = result + break if not matching_finding: - console.print(f"āŒ No finding found with rule ID: {rule_id}", style="red") + console.print(f"āŒ No finding found with ID: {finding_id}", style="red") console.print(f"šŸ’” Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim") raise typer.Exit(1) # Display detailed finding - display_finding_detail(matching_finding, tool, run_id) + display_finding_detail(matching_finding, run_id) except Exception as e: console.print(f"āŒ Failed to get finding: {e}", style="red") raise typer.Exit(1) -def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id: str): - """Display detailed information about a single finding""" - rule_id = finding.get("ruleId", "unknown") - level = finding.get("level", "note") - message = finding.get("message", {}) - message_text = message.get("text", "No summary available") - message_markdown = message.get("markdown", message_text) - - # Get location - locations = finding.get("locations", []) - location_str = "Unknown location" - code_snippet = None - - if locations: - physical_location = locations[0].get("physicalLocation", {}) - artifact_location = physical_location.get("artifactLocation", {}) - region = physical_location.get("region", {}) - - file_path = artifact_location.get("uri", "") - if file_path: - location_str = file_path - if region.get("startLine"): - location_str += f":{region['startLine']}" - if region.get("startColumn"): - location_str += f":{region['startColumn']}" - - # Get code snippet if available - if region.get("snippet", {}).get("text"): - code_snippet = region["snippet"]["text"].strip() +def show_findings_by_rule( + run_id: str = typer.Argument(..., help="Run ID to get findings from"), + rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID to filter findings") +): + """ + šŸ” Show all findings matching a specific rule + + This command shows ALL 
findings that match the given rule ID. + Useful when you have multiple instances of the same vulnerability type. + """ + try: + require_project() + validate_run_id(run_id) + + # Try to get from database first, fallback to API + db = get_project_db() + findings_data = None + if db: + findings_data = db.get_findings(run_id) + + if not findings_data: + with get_client() as client: + console.print(f"šŸ” Fetching findings for run: {run_id}") + findings = client.get_run_findings(run_id) + findings_dict = findings.sarif + else: + findings_dict = findings_data.sarif_data + + # Find all findings matching the rule + matching_findings = [] + + # Try native format first + if "findings" in findings_dict: + for finding in findings_dict.get("findings", []): + if finding.get("rule_id") == rule_id: + matching_findings.append(finding) + # Fallback to SARIF format + elif "runs" in findings_dict: + runs = findings_dict.get("runs", []) + if runs: + run_data = runs[0] + results = run_data.get("results", []) + for result in results: + if result.get("ruleId") == rule_id: + matching_findings.append(result) + + if not matching_findings: + console.print(f"āŒ No findings found with rule ID: {rule_id}", style="red") + console.print(f"šŸ’” Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim") + raise typer.Exit(1) + + console.print(f"\nšŸ” Found {len(matching_findings)} finding(s) matching rule: [bold cyan]{rule_id}[/bold cyan]\n") + + # Display each finding + for i, finding in enumerate(matching_findings, 1): + console.print(f"[bold]Finding {i} of {len(matching_findings)}[/bold]") + display_finding_detail(finding, run_id) + if i < len(matching_findings): + console.print("\n" + "─" * 80 + "\n") + + except Exception as e: + console.print(f"āŒ Failed to get findings: {e}", style="red") + raise typer.Exit(1) + + +def display_finding_detail(finding: Dict[str, Any], run_id: str): + """Display detailed information about a single finding (supports both native and SARIF format)""" + + # Detect format and extract fields + is_native = "rule_id" in finding # Native format has rule_id, SARIF has ruleId + + if is_native: + # Native FuzzForge format + finding_id = finding.get("id", "unknown") + rule_id = finding.get("rule_id", "unknown") + title = finding.get("title", "No title") + description = finding.get("description", "No description") + severity = finding.get("severity", "info") + confidence = finding.get("confidence", "medium") + category = finding.get("category", "unknown") + cwe = finding.get("cwe") + owasp = finding.get("owasp") + recommendation = finding.get("recommendation") + + # Found by information + found_by = finding.get("found_by", {}) + module = found_by.get("module", "unknown") + tool_name = found_by.get("tool_name", "Unknown") + tool_version = found_by.get("tool_version", "unknown") + detection_type = found_by.get("type", "unknown") + + # LLM context if available + llm_context = finding.get("llm_context") + + # Location + location = finding.get("location", {}) + file_path = location.get("file", "") + line_start = location.get("line_start") + column_start = location.get("column_start") + code_snippet = location.get("snippet") + + location_str = file_path if file_path else "Unknown location" + if line_start: + location_str += f":{line_start}" + if column_start: + location_str += f":{column_start}" + + else: + # SARIF format (backward compatibility) + props = finding.get("properties", {}) + finding_id = props.get("findingId", "unknown") + rule_id = finding.get("ruleId", 
"unknown") + title = props.get("title", "No title") + severity = finding.get("level", "note") + confidence = "medium" # Not available in SARIF + category = "unknown" + cwe = None + owasp = None + + message = finding.get("message", {}) + description = message.get("text", "No description") + recommendation = None + + module = "unknown" + tool_name = "Unknown" + tool_version = "unknown" + detection_type = "tool" + llm_context = None + + # Location from SARIF + locations = finding.get("locations", []) + location_str = "Unknown location" + code_snippet = None + + if locations: + physical_location = locations[0].get("physicalLocation", {}) + artifact_location = physical_location.get("artifactLocation", {}) + region = physical_location.get("region", {}) + + file_path = artifact_location.get("uri", "") + if file_path: + location_str = file_path + if region.get("startLine"): + location_str += f":{region['startLine']}" + if region.get("startColumn"): + location_str += f":{region['startColumn']}" + + if region.get("snippet", {}).get("text"): + code_snippet = region["snippet"]["text"].strip() # Get severity style severity_color = { + "critical": "red", + "high": "red", + "medium": "yellow", + "low": "blue", + "info": "cyan", + # SARIF levels "error": "red", "warning": "yellow", - "note": "blue", - "info": "cyan" - }.get(level.lower(), "white") + "note": "blue" + }.get(severity.lower(), "white") # Build detailed content content_lines = [] + content_lines.append(f"[bold]Finding ID:[/bold] {finding_id}") content_lines.append(f"[bold]Rule ID:[/bold] {rule_id}") - content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{level.upper()}[/{severity_color}]") + content_lines.append(f"[bold]Title:[/bold] {title}") + content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{severity.upper()}[/{severity_color}] (Confidence: {confidence})") + + if cwe: + content_lines.append(f"[bold]CWE:[/bold] {cwe}") + if owasp: + content_lines.append(f"[bold]OWASP:[/bold] {owasp}") + + content_lines.append(f"[bold]Category:[/bold] {category}") content_lines.append(f"[bold]Location:[/bold] {location_str}") - content_lines.append(f"[bold]Tool:[/bold] {tool.get('name', 'Unknown')} v{tool.get('version', 'unknown')}") + content_lines.append(f"[bold]Found by:[/bold] {tool_name} v{tool_version} ({module}) [{detection_type}]") + + if llm_context: + model = llm_context.get("model", "unknown") + content_lines.append(f"[bold]LLM Model:[/bold] {model}") + content_lines.append(f"[bold]Run ID:[/bold] {run_id}") content_lines.append("") - content_lines.append("[bold]Summary:[/bold]") - content_lines.append(message_text) - content_lines.append("") content_lines.append("[bold]Description:[/bold]") - content_lines.append(message_markdown) + content_lines.append(description) + + if recommendation: + content_lines.append("") + content_lines.append("[bold]Recommendation:[/bold]") + content_lines.append(recommendation) if code_snippet: content_lines.append("") @@ -276,7 +425,7 @@ def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id padding=(1, 2) )) console.print() - console.print(f"šŸ’” Export this run: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]") + console.print(f"šŸ’” Export this run: [bold cyan]ff findings export {run_id} --format native[/bold cyan]") def display_findings_table(sarif_data: Dict[str, Any]): From 4aa8187716cbdfdc705e7b5225089a5f23ba58c4 Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 14:36:04 +0100 Subject: [PATCH 02/16] refactor: Update database to 
use native findings format - Renamed FindingRecord.sarif_data to findings_data - Updated database schema: sarif_data column -> findings_data column - Updated all database methods to work with native format: - save_findings() - get_findings() - list_findings() - get_all_findings() - get_aggregated_stats() - Updated SQL queries to use native format JSON paths: - Changed from SARIF paths ($.runs[0].results) to native paths ($.findings) - Updated severity filtering from SARIF levels (error/warning/note) to native (critical/high/medium/low/info) - Updated CLI commands to support both formats during transition: - get_findings command now extracts summary from both native and SARIF formats - show_finding and show_findings_by_rule updated to use findings_data field - Format detection to handle data from API (still SARIF) and database (native) Breaking changes: - Database schema changed - existing databases will need recreation - FindingRecord.sarif_data renamed to findings_data --- cli/src/fuzzforge_cli/commands/findings.py | 72 ++++++++++++++-------- cli/src/fuzzforge_cli/database.py | 54 ++++++++-------- 2 files changed, 73 insertions(+), 53 deletions(-) diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py index b5564153..d0afcfe8 100644 --- a/cli/src/fuzzforge_cli/commands/findings.py +++ b/cli/src/fuzzforge_cli/commands/findings.py @@ -86,40 +86,62 @@ def get_findings( try: db = ensure_project_db() - # Extract summary from SARIF - sarif_data = findings.sarif - runs_data = sarif_data.get("runs", []) + # Get findings data (API returns .sarif for now, will be native format later) + findings_data = findings.sarif summary = {} - if runs_data: - results = runs_data[0].get("results", []) + # Support both native format and SARIF format + if "findings" in findings_data: + # Native FuzzForge format + findings_list = findings_data.get("findings", []) summary = { - "total_issues": len(results), + "total_issues": len(findings_list), "by_severity": {}, "by_rule": {}, - "tools": [] + "by_source": {} } - for result in results: - level = result.get("level", "note") - rule_id = result.get("ruleId", "unknown") + for finding in findings_list: + severity = finding.get("severity", "info") + rule_id = finding.get("rule_id", "unknown") + module = finding.get("found_by", {}).get("module", "unknown") - summary["by_severity"][level] = summary["by_severity"].get(level, 0) + 1 + summary["by_severity"][severity] = summary["by_severity"].get(severity, 0) + 1 summary["by_rule"][rule_id] = summary["by_rule"].get(rule_id, 0) + 1 + summary["by_source"][module] = summary["by_source"].get(module, 0) + 1 + + elif "runs" in findings_data: + # SARIF format (backward compatibility) + runs_data = findings_data.get("runs", []) + if runs_data: + results = runs_data[0].get("results", []) + summary = { + "total_issues": len(results), + "by_severity": {}, + "by_rule": {}, + "tools": [] + } + + for result in results: + level = result.get("level", "note") + rule_id = result.get("ruleId", "unknown") + + summary["by_severity"][level] = summary["by_severity"].get(level, 0) + 1 + summary["by_rule"][rule_id] = summary["by_rule"].get(rule_id, 0) + 1 - # Extract tool info - tool = runs_data[0].get("tool", {}) - driver = tool.get("driver", {}) - if driver.get("name"): - summary["tools"].append({ - "name": driver.get("name"), - "version": driver.get("version"), - "rules": len(driver.get("rules", [])) - }) + # Extract tool info + tool = runs_data[0].get("tool", {}) + driver = tool.get("driver", {}) + if 
driver.get("name"): + summary["tools"].append({ + "name": driver.get("name"), + "version": driver.get("version"), + "rules": len(driver.get("rules", [])) + }) finding_record = FindingRecord( run_id=run_id, - sarif_data=sarif_data, + findings_data=findings_data, summary=summary, created_at=datetime.now() ) @@ -174,9 +196,9 @@ def show_finding( with get_client() as client: console.print(f"šŸ” Fetching findings for run: {run_id}") findings = client.get_run_findings(run_id) - findings_dict = findings.sarif # Will become native format + findings_dict = findings.sarif # API still returns .sarif for now else: - findings_dict = findings_data.sarif_data # Will become findings_data + findings_dict = findings_data.findings_data # Find the specific finding by unique ID # For now, support both SARIF (old) and native format (new) @@ -239,9 +261,9 @@ def show_findings_by_rule( with get_client() as client: console.print(f"šŸ” Fetching findings for run: {run_id}") findings = client.get_run_findings(run_id) - findings_dict = findings.sarif + findings_dict = findings.sarif # API still returns .sarif for now else: - findings_dict = findings_data.sarif_data + findings_dict = findings_data.findings_data # Find all findings matching the rule matching_findings = [] diff --git a/cli/src/fuzzforge_cli/database.py b/cli/src/fuzzforge_cli/database.py index 3c8e86cf..88615f73 100644 --- a/cli/src/fuzzforge_cli/database.py +++ b/cli/src/fuzzforge_cli/database.py @@ -46,7 +46,7 @@ class FindingRecord(BaseModel): """Database record for findings""" id: Optional[int] = None run_id: str - sarif_data: Dict[str, Any] + findings_data: Dict[str, Any] # Native FuzzForge format summary: Dict[str, Any] = {} created_at: datetime @@ -81,7 +81,7 @@ class FuzzForgeDatabase: CREATE TABLE IF NOT EXISTS findings ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT NOT NULL, - sarif_data TEXT NOT NULL, + findings_data TEXT NOT NULL, summary TEXT DEFAULT '{}', created_at TIMESTAMP NOT NULL, FOREIGN KEY (run_id) REFERENCES runs (run_id) @@ -292,21 +292,21 @@ def update_run_status(self, run_id: str, status: str, completed_at: Optional[dat # Findings management methods def save_findings(self, finding: FindingRecord) -> int: - """Save findings and return the ID""" + """Save findings in native FuzzForge format and return the ID""" with self.connection() as conn: cursor = conn.execute(""" - INSERT INTO findings (run_id, sarif_data, summary, created_at) + INSERT INTO findings (run_id, findings_data, summary, created_at) VALUES (?, ?, ?, ?) """, ( finding.run_id, - json.dumps(finding.sarif_data), + json.dumps(finding.findings_data), json.dumps(finding.summary), finding.created_at )) return cursor.lastrowid def get_findings(self, run_id: str) -> Optional[FindingRecord]: - """Get findings for a run""" + """Get findings for a run in native FuzzForge format""" with self.connection() as conn: row = conn.execute( "SELECT * FROM findings WHERE run_id = ? 
ORDER BY created_at DESC LIMIT 1", @@ -317,14 +317,14 @@ def get_findings(self, run_id: str) -> Optional[FindingRecord]: return FindingRecord( id=row["id"], run_id=row["run_id"], - sarif_data=json.loads(row["sarif_data"]), + findings_data=json.loads(row["findings_data"]), summary=json.loads(row["summary"]), created_at=row["created_at"] ) return None def list_findings(self, limit: int = 50) -> List[FindingRecord]: - """List recent findings""" + """List recent findings in native FuzzForge format""" with self.connection() as conn: rows = conn.execute(""" SELECT * FROM findings @@ -336,7 +336,7 @@ def list_findings(self, limit: int = 50) -> List[FindingRecord]: FindingRecord( id=row["id"], run_id=row["run_id"], - sarif_data=json.loads(row["sarif_data"]), + findings_data=json.loads(row["findings_data"]), summary=json.loads(row["summary"]), created_at=row["created_at"] ) @@ -380,18 +380,17 @@ def get_all_findings(self, finding = FindingRecord( id=row["id"], run_id=row["run_id"], - sarif_data=json.loads(row["sarif_data"]), + findings_data=json.loads(row["findings_data"]), summary=json.loads(row["summary"]), created_at=row["created_at"] ) - # Filter by severity if specified + # Filter by severity if specified (native format) if severity: finding_severities = set() - if "runs" in finding.sarif_data: - for run in finding.sarif_data["runs"]: - for result in run.get("results", []): - finding_severities.add(result.get("level", "note").lower()) + if "findings" in finding.findings_data: + for f in finding.findings_data["findings"]: + finding_severities.add(f.get("severity", "info").lower()) if not any(sev.lower() in finding_severities for sev in severity): continue @@ -408,7 +407,7 @@ def get_findings_by_workflow(self, workflow: str) -> List[FindingRecord]: return self.get_all_findings(workflow=workflow) def get_aggregated_stats(self) -> Dict[str, Any]: - """Get aggregated statistics for all findings using SQL aggregation""" + """Get aggregated statistics for all findings using native format and SQL aggregation""" with self.connection() as conn: # Total findings and runs total_findings = conn.execute("SELECT COUNT(*) FROM findings").fetchone()[0] @@ -429,39 +428,38 @@ def get_aggregated_stats(self) -> Dict[str, Any]: WHERE created_at > datetime('now', '-7 days') """).fetchone()[0] - # Use SQL JSON functions to aggregate severity stats efficiently + # Use SQL JSON functions to aggregate severity stats efficiently (native format) # This avoids loading all findings into memory severity_stats = conn.execute(""" SELECT - SUM(json_array_length(json_extract(sarif_data, '$.runs[0].results'))) as total_issues, + SUM(json_array_length(json_extract(findings_data, '$.findings'))) as total_issues, COUNT(*) as finding_count FROM findings - WHERE json_extract(sarif_data, '$.runs[0].results') IS NOT NULL + WHERE json_extract(findings_data, '$.findings') IS NOT NULL """).fetchone() total_issues = severity_stats["total_issues"] or 0 - # Get severity distribution using SQL + # Get severity distribution using native format (critical/high/medium/low/info) # Note: This is a simplified version - for full accuracy we'd need JSON parsing # But it's much more efficient than loading all data into Python - severity_counts = {"error": 0, "warning": 0, "note": 0, "info": 0} + severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} # Sample the first N findings for severity distribution # This gives a good approximation without loading everything sample_findings = conn.execute(""" - SELECT sarif_data + SELECT 
findings_data FROM findings LIMIT ? """, (STATS_SAMPLE_SIZE,)).fetchall() for row in sample_findings: try: - data = json.loads(row["sarif_data"]) - if "runs" in data: - for run in data["runs"]: - for result in run.get("results", []): - level = result.get("level", "note").lower() - severity_counts[level] = severity_counts.get(level, 0) + 1 + data = json.loads(row["findings_data"]) + if "findings" in data: + for finding in data["findings"]: + severity = finding.get("severity", "info").lower() + severity_counts[severity] = severity_counts.get(severity, 0) + 1 except (json.JSONDecodeError, KeyError): continue From d327ff06abd26752d78728fbd1ff74adf0c68ead Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 14:52:15 +0100 Subject: [PATCH 03/16] refactor: Convert reporter to native format and update backend models - Renamed sarif_reporter.py to native_reporter.py to reflect new functionality - Updated WorkflowFindings model to use native format - Field name 'sarif' kept for API compatibility but now contains native format - Updated docstring to reflect native format usage - Converted SARIFReporter to Native Reporter: - Module name changed from sarif_reporter to native_reporter (v2.0.0) - Updated metadata and input/output schemas - Removed SARIF-specific config (tool_name, include_code_flows) - Added native format config (workflow_name, run_id) - Implemented native report generation: - Added _generate_native_report() method - Generates native FuzzForge format with full field support: - Unique finding IDs - found_by attribution (module, tool, type) - LLM context when applicable - Full severity scale (critical/high/medium/low/info) - Confidence levels - CWE and OWASP mappings - Enhanced location info (columns, snippets) - References and metadata - Added _create_native_summary() for aggregated stats - Summary includes counts by severity, confidence, category, source, and type - Tracks affected files count - Kept old SARIF generation methods for reference - Will be moved to separate SARIF exporter module Breaking changes: - Reporter now outputs native format instead of SARIF - Existing workflows using sarif_reporter will need updates - Config parameters changed (tool_name -> workflow_name, etc.) 
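
Example of the native report shape emitted by _generate_native_report()
(abridged; all values below are illustrative, not taken from a real run):

    {
      "version": "1.0.0",
      "run_id": "run-1234",
      "workflow": "security_assessment",
      "timestamp": "2025-11-02T14:52:15Z",
      "findings": [
        {
          "id": "3f2a8c1e-0d47-4b7a-9c2e-5d1f0a6b8c9d",
          "rule_id": "sql_injection",
          "found_by": {
            "module": "semgrep_scanner",
            "tool_name": "Semgrep",
            "tool_version": "1.0.0",
            "type": "tool"
          },
          "title": "SQL injection via string concatenation",
          "severity": "high",
          "confidence": "high",
          "category": "injection",
          "cwe": "CWE-89",
          "owasp": "A03:2021-Injection",
          "location": {"file": "app/db.py", "line_start": 42}
        }
      ],
      "summary": {
        "total_findings": 1,
        "by_severity": {"high": 1},
        "by_confidence": {"high": 1},
        "by_category": {"injection": 1},
        "by_source": {"semgrep_scanner": 1},
        "by_type": {"tool": 1},
        "affected_files": 1
      }
    }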
---
 backend/src/models/findings.py                     |   4 +-
 .../{sarif_reporter.py => native_reporter.py}      | 184 ++++++++++++++----
 2 files changed, 150 insertions(+), 38 deletions(-)
 rename backend/toolbox/modules/reporter/{sarif_reporter.py => native_reporter.py} (66%)

diff --git a/backend/src/models/findings.py b/backend/src/models/findings.py
index b71a9b64..0eaea6bf 100644
--- a/backend/src/models/findings.py
+++ b/backend/src/models/findings.py
@@ -19,10 +19,10 @@
 
 class WorkflowFindings(BaseModel):
-    """Findings from a workflow execution in SARIF format"""
+    """Findings from a workflow execution in native FuzzForge format"""
     workflow: str = Field(..., description="Workflow name")
     run_id: str = Field(..., description="Unique run identifier")
-    sarif: Dict[str, Any] = Field(..., description="SARIF formatted findings")
+    sarif: Dict[str, Any] = Field(..., description="Findings in native FuzzForge format (field name kept for API compatibility)")
     metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
 
diff --git a/backend/toolbox/modules/reporter/sarif_reporter.py b/backend/toolbox/modules/reporter/native_reporter.py
similarity index 66%
rename from backend/toolbox/modules/reporter/sarif_reporter.py
rename to backend/toolbox/modules/reporter/native_reporter.py
index 2a8bec75..eb799f8d 100644
--- a/backend/toolbox/modules/reporter/sarif_reporter.py
+++ b/backend/toolbox/modules/reporter/native_reporter.py
@@ -1,5 +1,6 @@
 """
-SARIF Reporter Module - Generates SARIF-formatted security reports
+Native Reporter Module - Generates native FuzzForge format security reports
+(Previously SARIF Reporter - now generates native format, SARIF export available separately)
 """
 
 # Copyright (c) 2025 FuzzingLabs
@@ -31,50 +32,46 @@
 
 class SARIFReporter(BaseModule):
     """
-    Generates SARIF (Static Analysis Results Interchange Format) reports.
+    Generates native FuzzForge format security reports.
 
     This module:
-    - Converts findings to SARIF format
+    - Converts findings to native FuzzForge format
     - Aggregates results from multiple modules
     - Adds metadata and context
     - Provides actionable recommendations
+    - (SARIF export available via separate exporter module)
     """
 
     def get_metadata(self) -> ModuleMetadata:
         """Get module metadata"""
         return ModuleMetadata(
-            name="sarif_reporter",
-            version="1.0.0",
-            description="Generates SARIF-formatted security reports",
+            name="native_reporter",
+            version="2.0.0",
+            description="Generates native FuzzForge format security reports",
             author="FuzzForge Team",
             category="reporter",
-            tags=["reporting", "sarif", "output"],
+            tags=["reporting", "native", "output"],
             input_schema={
                 "findings": {
                     "type": "array",
                     "description": "List of findings to report",
                     "required": True
                 },
-                "tool_name": {
+                "workflow_name": {
                     "type": "string",
-                    "description": "Name of the tool",
+                    "description": "Name of the workflow",
                     "default": "FuzzForge Security Assessment"
                 },
-                "tool_version": {
+                "run_id": {
                     "type": "string",
-                    "description": "Tool version",
-                    "default": "1.0.0"
-                },
-                "include_code_flows": {
-                    "type": "boolean",
-                    "description": "Include code flow information",
-                    "default": False
+                    "description": "Run identifier",
+                    "required": True
                 }
             },
             output_schema={
-                "sarif": {
+                "native": {
                     "type": "object",
-                    "description": "SARIF 2.1.0 formatted report"
+                    "description": "Native FuzzForge findings format"
                 }
             },
             requires_workspace=False  # Reporter doesn't need direct workspace access
@@ -88,22 +85,21 @@ def validate_config(self, config: Dict[str, Any]) -> bool:
 
     async def execute(self, config: Dict[str, Any], workspace: Path = None) -> ModuleResult:
         """
-        Execute the SARIF reporter module.
+        Execute the native reporter module.
 
         Args:
             config: Module configuration with findings
             workspace: Optional workspace path for context
 
         Returns:
-            ModuleResult with SARIF report
+            ModuleResult with native format report
         """
         self.start_timer()
         self.validate_config(config)
 
         # Get configuration
-        tool_name = config.get("tool_name", "FuzzForge Security Assessment")
-        tool_version = config.get("tool_version", "1.0.0")
-        include_code_flows = config.get("include_code_flows", False)
+        workflow_name = config.get("workflow_name", "FuzzForge Security Assessment")
+        run_id = config.get("run_id", "unknown")
 
         # Collect findings from either direct findings or module results
         all_findings = []
@@ -123,16 +119,14 @@ async def execute(self, config: Dict[str, Any], workspace: Path = None) -> Modul
             elif hasattr(module_result, "findings"):
                 all_findings.extend(module_result.findings)
 
-        logger.info(f"Generating SARIF report for {len(all_findings)} findings")
+        logger.info(f"Generating native format report for {len(all_findings)} findings")
 
         try:
-            # Generate SARIF report
-            sarif_report = self._generate_sarif(
+            # Generate native format report
+            native_report = self._generate_native_report(
                 findings=all_findings,
-                tool_name=tool_name,
-                tool_version=tool_version,
-                include_code_flows=include_code_flows,
-                workspace_path=str(workspace) if workspace else None
+                workflow_name=workflow_name,
+                run_id=run_id
             )
 
             # Create summary
@@ -146,23 +140,141 @@ async def execute(self, config: Dict[str, Any], workspace: Path = None) -> Modul
                 findings=[],  # Reporter doesn't generate new findings
                 summary=summary,
                 metadata={
-                    "tool_name": tool_name,
-                    "tool_version": tool_version,
-                    "report_format": "SARIF 2.1.0",
+                    "workflow_name": workflow_name,
+                    "run_id": run_id,
+                    "report_format": "Native FuzzForge 1.0.0",
+                    "total_findings": len(all_findings)
                 },
                 error=None,
-                sarif=sarif_report  # Add SARIF as custom field
+                sarif=native_report  # Field name kept for API compatibility
             )
 
         except Exception as e:
-            logger.error(f"SARIF reporter failed: {e}")
+            logger.error(f"Native reporter failed: {e}")
             return self.create_result(
                 findings=[],
                 status="failed",
                 error=str(e)
             )
 
+    def _generate_native_report(
+        self,
+        findings: List[ModuleFinding],
+        workflow_name: str,
+        run_id: str
+    ) -> Dict[str, Any]:
+        """
+        Generate native FuzzForge format report.
+
+        Args:
+            findings: List of findings to report
+            workflow_name: Name of the workflow
+            run_id: Run identifier
+
+        Returns:
+            Native FuzzForge formatted dictionary
+        """
+        # Convert ModuleFinding objects to native format dictionaries
+        findings_list = []
+        for finding in findings:
+            finding_dict = {
+                "id": finding.id,
+                "rule_id": finding.rule_id,
+                "found_by": {
+                    "module": finding.found_by.module,
+                    "tool_name": finding.found_by.tool_name,
+                    "tool_version": finding.found_by.tool_version,
+                    "type": finding.found_by.type
+                },
+                "title": finding.title,
+                "description": finding.description,
+                "severity": finding.severity,
+                "confidence": finding.confidence,
+                "category": finding.category,
+                "recommendation": finding.recommendation,
+                "references": finding.references
+            }
+
+            # Add optional fields
+            if finding.cwe:
+                finding_dict["cwe"] = finding.cwe
+            if finding.owasp:
+                finding_dict["owasp"] = finding.owasp
+            if finding.llm_context:
+                finding_dict["llm_context"] = {
+                    "model": finding.llm_context.model,
+                    "prompt": finding.llm_context.prompt,
+                    "temperature": finding.llm_context.temperature
+                }
+
+            # Add location if available
+            if finding.file_path:
+                finding_dict["location"] = {
+                    "file": finding.file_path,
+                    "line_start": finding.line_start,
+                    "line_end": finding.line_end,
+                    "column_start": finding.column_start,
+                    "column_end": finding.column_end,
+                    "snippet": finding.code_snippet
+                }
+
+            finding_dict["metadata"] = finding.metadata
+            findings_list.append(finding_dict)
+
+        # Create summary
+        from datetime import datetime
+        summary = self._create_native_summary(findings)
+
+        # Build native format structure
+        native_report = {
+            "version": "1.0.0",
+            "run_id": run_id,
+            "workflow": workflow_name,
+            "timestamp": datetime.utcnow().isoformat() + "Z",
+            "findings": findings_list,
+            "summary": summary
+        }
+
+        return native_report
+
+    def _create_native_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]:
+        """Create summary for native format"""
+        summary = {
+            "total_findings": len(findings),
+            "by_severity": {},
+            "by_confidence": {},
+            "by_category": {},
+            "by_source": {},
+            "by_type": {},
+            "affected_files": 0
+        }
+
+        affected_files = set()
+
+        for finding in findings:
+            # Count by severity
+            summary["by_severity"][finding.severity] = summary["by_severity"].get(finding.severity, 0) + 1
+
+            # Count by confidence
+            summary["by_confidence"][finding.confidence] = summary["by_confidence"].get(finding.confidence, 0) + 1
+
+            # Count by category
+            summary["by_category"][finding.category] = summary["by_category"].get(finding.category, 0) + 1
+
+            # Count by source (module)
+            summary["by_source"][finding.found_by.module] = summary["by_source"].get(finding.found_by.module, 0) + 1
+
+            # Count by type
+            summary["by_type"][finding.found_by.type] = summary["by_type"].get(finding.found_by.type, 0) + 1
+
+            # Track affected files
+            if finding.file_path:
+                affected_files.add(finding.file_path)
+
+        summary["affected_files"] = len(affected_files)
+        return summary
+
+    # Keep old SARIF methods for reference/future SARIF export module
     def _generate_sarif(
         self,
         findings: List[ModuleFinding],

From 98f938932c784ffb9fef4439934376d6e4bdd57b Mon Sep 17 00:00:00 2001
From: tduhamel42
Date: Sun, 2 Nov 2025 15:09:33 +0100
Subject: [PATCH 04/16] feat: Enhanced CLI display with improved tables and details
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Priority 3 implementation:

**Improved Table Display:**
- Removed hardcoded 50-result limit
- Added pagination with --limit and --offset parameters
- New table columns: Finding ID (8 chars), Confidence (H/M/L), Found By
- Supports both native and SARIF formats with auto-detection
- Proper severity ordering (critical > high > medium > low > info)
- Pagination footer showing "Showing X-Y of Z results"

**Syntax Highlighting:**
- Added syntax-highlighted code snippets using Rich's Syntax
- Auto-detects language from file extension (20+ languages supported)
- Line numbers with correct start line from finding location
- Monokai theme for better readability

**Enhanced Detail View:**
- Confidence indicators with emoji (🟢 High, 🟔 Medium, šŸ”“ Low)
- Type-specific badges (šŸ¤– LLM, šŸ”§ Tool, šŸŽÆ Fuzzer, šŸ‘¤ Manual)
- LLM context display with model name and prompt preview
- Better formatted found_by info with module and type
- Added suggestion to view all findings with same rule
- Cleaner recommendation display with šŸ’” icon

**New Commands:**
- Added 'ff findings by-rule' command to show all findings matching a rule
- Registered as @app.command("by-rule")

**Updated Related Commands:**
- all_findings: Updated to use 5-level severity (critical/high/medium/low/info)
- Table columns changed from Error/Warning/Note to Critical/High/Medium/Low
- Summary panel updated with proper severity mapping
- Support for both native and SARIF format findings_data

**Breaking Changes:**
- Severity display changed from 3-level (error/warning/note) to 5-level
- Table structure modified with new columns
- Old SARIF-only views deprecated in favor of format-agnostic displays
---
 cli/src/fuzzforge_cli/commands/findings.py | 306 +++++++++++++++------
 1 file changed, 228 insertions(+), 78 deletions(-)

diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py
index d0afcfe8..72fbfc17 100644
--- a/cli/src/fuzzforge_cli/commands/findings.py
+++ b/cli/src/fuzzforge_cli/commands/findings.py
@@ -66,6 +66,14 @@ def get_findings(
     format: str = typer.Option(
         "table", "--format", "-f",
         help="Output format: table, json, sarif"
+    ),
+    limit: Optional[int] = typer.Option(
+        None, "--limit", "-l",
+        help="Maximum number of findings to display (no limit by default)"
+    ),
+    offset: int = typer.Option(
+        0, "--offset",
+        help="Number of findings to skip (for pagination)"
     )
 ):
     """
@@ -160,12 +168,12 @@ def get_findings(
             console.print(sarif_json)
 
     else:  # table format
-        display_findings_table(findings.sarif)
+        display_findings_table(findings.sarif, limit=limit, offset=offset)
 
         # Suggest export command and show command
-        console.print(f"\nšŸ’” View full details of a finding: [bold cyan]ff finding show {run_id} --rule <rule_id>[/bold cyan]")
-        console.print(f"šŸ’” Export these findings: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]")
-        console.print("   Supported formats: [cyan]sarif[/cyan] (standard), [cyan]json[/cyan], [cyan]csv[/cyan], [cyan]html[/cyan]")
+        console.print(f"\nšŸ’” View full details of a finding: [bold cyan]ff finding show {run_id} --id <finding_id>[/bold cyan]")
+        console.print(f"šŸ’” Export these findings: [bold cyan]ff findings export {run_id} --format native[/bold cyan]")
+        console.print("   Supported formats: [cyan]native[/cyan] (default), [cyan]sarif[/cyan], [cyan]json[/cyan], [cyan]csv[/cyan], [cyan]html[/cyan]")
 
     except Exception as e:
         console.print(f"āŒ Failed to get findings: {e}", style="red")
@@ -237,6 +245,7 @@ def show_finding(
         raise typer.Exit(1)
 
 
+@app.command("by-rule")
 def show_findings_by_rule(
     run_id: str = typer.Argument(..., help="Run ID to get findings from"),
     rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID to filter findings")
@@ -405,7 +414,15 @@ def display_finding_detail(finding: Dict[str, Any], run_id: str):
     content_lines.append(f"[bold]Finding ID:[/bold] {finding_id}")
     content_lines.append(f"[bold]Rule ID:[/bold] {rule_id}")
     content_lines.append(f"[bold]Title:[/bold] {title}")
-    content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{severity.upper()}[/{severity_color}] (Confidence: {confidence})")
+
+    # Confidence indicator with emoji
+    confidence_indicators = {
+        "high": "🟢",
+        "medium": "🟔",
+        "low": "šŸ”“"
+    }
+    confidence_emoji = confidence_indicators.get(confidence.lower(), "⚪")
+    content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{severity.upper()}[/{severity_color}] [bold]Confidence:[/bold] {confidence_emoji} {confidence.capitalize()}")
 
     if cwe:
         content_lines.append(f"[bold]CWE:[/bold] {cwe}")
@@ -414,11 +431,26 @@ def display_finding_detail(finding: Dict[str, Any], run_id: str):
 
     content_lines.append(f"[bold]Category:[/bold] {category}")
     content_lines.append(f"[bold]Location:[/bold] {location_str}")
-    content_lines.append(f"[bold]Found by:[/bold] {tool_name} v{tool_version} ({module}) [{detection_type}]")
+
+    # Enhanced found_by display with badge
+    type_badges = {
+        "llm": "šŸ¤–",
+        "tool": "šŸ”§",
+        "fuzzer": "šŸŽÆ",
+        "manual": "šŸ‘¤"
+    }
+    type_badge = type_badges.get(detection_type.lower(), "šŸ”")
+    content_lines.append(f"[bold]Found by:[/bold] {type_badge} {tool_name} v{tool_version} [dim]({module})[/dim] [[yellow]{detection_type}[/yellow]]")
+
+    # LLM context details
     if llm_context:
         model = llm_context.get("model", "unknown")
+        prompt = llm_context.get("prompt", "")
         content_lines.append(f"[bold]LLM Model:[/bold] {model}")
+        if prompt:
+            # Show first 100 chars of prompt
+            prompt_preview = prompt[:100] + "..." if len(prompt) > 100 else prompt
+            content_lines.append(f"[bold]Prompt:[/bold] [dim]{prompt_preview}[/dim]")
 
     content_lines.append(f"[bold]Run ID:[/bold] {run_id}")
     content_lines.append("")
@@ -427,14 +459,9 @@ def display_finding_detail(finding: Dict[str, Any], run_id: str):
 
     if recommendation:
         content_lines.append("")
-        content_lines.append("[bold]Recommendation:[/bold]")
+        content_lines.append("[bold]šŸ’” Recommendation:[/bold]")
         content_lines.append(recommendation)
 
-    if code_snippet:
-        content_lines.append("")
-        content_lines.append("[bold]Code Snippet:[/bold]")
-        content_lines.append(f"[dim]{code_snippet}[/dim]")
-
     content = "\n".join(content_lines)
 
     # Display in panel
@@ -446,106 +473,217 @@ def display_finding_detail(finding: Dict[str, Any], run_id: str):
         box=box.ROUNDED,
         padding=(1, 2)
     ))
+
+    # Display code snippet with syntax highlighting (separate from panel for better rendering)
+    if code_snippet:
+        # Detect language from file path
+        language = "text"
+        if is_native and location:
+            file_path = location.get("file", "")
+        elif not is_native and locations:
+            file_path = locations[0].get("physicalLocation", {}).get("artifactLocation", {}).get("uri", "")
+        else:
+            file_path = ""
+
+        if file_path:
+            ext = Path(file_path).suffix.lower()
+            language_map = {
+                ".py": "python",
+                ".js": "javascript",
+                ".ts": "typescript",
+                ".java": "java",
+                ".c": "c",
+                ".cpp": "cpp",
+                ".cc": "cpp",
+                ".h": "c",
+                ".hpp": "cpp",
+                ".go": "go",
+                ".rs": "rust",
+                ".rb": "ruby",
+                ".php": "php",
+                ".swift": "swift",
+                ".kt": "kotlin",
+                ".cs": "csharp",
+                ".html": "html",
+                ".xml": "xml",
+                ".json": "json",
+                ".yaml": "yaml",
+                ".yml": "yaml",
+                ".sh": "bash",
+                ".bash": "bash",
+                ".sql": "sql",
+            }
+            language = language_map.get(ext, "text")
+
+        console.print("\n[bold]Code Snippet:[/bold]")
+        syntax = Syntax(
+            code_snippet,
+            language,
+            theme="monokai",
+            line_numbers=True,
+            start_line=line_start if is_native and location.get("line_start") else 1
+        )
+        console.print(syntax)
+
     console.print()
+    console.print(f"šŸ’” View all findings with this rule: [bold cyan]ff findings by-rule {run_id} --rule {rule_id}[/bold cyan]")
     console.print(f"šŸ’” Export this run: [bold cyan]ff findings export {run_id} --format native[/bold cyan]")
 
 
-def display_findings_table(sarif_data: Dict[str, Any]):
-    """Display SARIF findings in a rich table format"""
-    runs = sarif_data.get("runs", [])
-    if not runs:
-        console.print("ā„¹ļø No findings data available", style="dim")
-        return
+def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = None, offset: int = 0):
+    """Display findings in a rich table format (supports both native and SARIF formats)"""
 
-    run_data = runs[0]
-    results = run_data.get("results", [])
-    tool = run_data.get("tool", {})
-    driver = tool.get("driver", {})
+    # Detect format and extract findings
+    is_native = "findings" in findings_data
+
+    if is_native:
+        # Native FuzzForge format
+        findings_list = findings_data.get("findings", [])
+        workflow = findings_data.get("workflow", "Unknown")
+        total_findings = len(findings_list)
+    else:
+        # SARIF format (backward compatibility)
+        runs = findings_data.get("runs", [])
+        if not runs:
+            console.print("ā„¹ļø No findings data available", style="dim")
+            return
+
+        run_data = runs[0]
+        findings_list = run_data.get("results", [])
+        tool = run_data.get("tool", {}).get("driver", {})
+        workflow = tool.get("name", "Unknown")
+        total_findings = len(findings_list)
 
     # Tool information
     console.print("\nšŸ” [bold]Security Analysis Results[/bold]")
-    if driver.get("name"):
-        console.print(f"Tool: {driver.get('name')} v{driver.get('version', 'unknown')}")
+    console.print(f"Workflow: {workflow}")
 
-    if not results:
+    if not findings_list:
         console.print("āœ… No security issues found!", style="green")
         return
 
     # Summary statistics
     summary_by_level = {}
-    for result in results:
-        level = result.get("level", "note")
+    for finding in findings_list:
+        if is_native:
+            level = finding.get("severity", "info")
+        else:
+            level = finding.get("level", "note")
         summary_by_level[level] = summary_by_level.get(level, 0) + 1
 
     summary_table = Table(show_header=False, box=box.SIMPLE)
     summary_table.add_column("Severity", width=15, justify="left", style="bold")
     summary_table.add_column("Count", width=8, justify="right", style="bold")
 
-    for level, count in sorted(summary_by_level.items()):
-        # Create Rich Text object with color styling
-        level_text = level.upper()
-        severity_text = Text(level_text, style=severity_style(level))
+    # Sort by severity order (critical > high > medium > low > info)
+    severity_order = {"critical": 0, "high": 1, "error": 1, "medium": 2, "warning": 2, "low": 3, "note": 3, "info": 4}
+    for level in sorted(summary_by_level.keys(), key=lambda x: severity_order.get(x, 99)):
+        count = summary_by_level[level]
+        severity_text = Text(level.upper(), style=severity_style(level))
         count_text = Text(str(count))
-
         summary_table.add_row(severity_text, count_text)
 
     console.print(
         Panel.fit(
             summary_table,
-            title=f"šŸ“Š Summary ({len(results)} total issues)",
+            title=f"šŸ“Š Summary ({total_findings} total issues)",
             box=box.ROUNDED
         )
     )
 
-    # Detailed results - Rich Text-based table with proper emoji alignment
+    # Apply pagination
+    start_idx = offset
+    end_idx = start_idx + limit if limit else len(findings_list)
+    paginated_findings = findings_list[start_idx:end_idx]
+
+    # Detailed results table with enhanced columns
     results_table = Table(box=box.ROUNDED)
-    results_table.add_column("Severity", width=12, justify="left", no_wrap=True)
-    results_table.add_column("Rule", justify="left", style="bold cyan", no_wrap=True)
-    results_table.add_column("Message", width=45, justify="left", no_wrap=True)
-    results_table.add_column("Location", width=20, justify="left", style="dim", no_wrap=True)
+    results_table.add_column("ID", width=10, justify="left", style="dim")
+    results_table.add_column("Severity", width=10, justify="left", no_wrap=True)
+    results_table.add_column("Conf", width=4, justify="center", no_wrap=True)  # Confidence
+    results_table.add_column("Rule", width=18, justify="left", style="bold cyan", no_wrap=True)
+    results_table.add_column("Message", width=35, justify="left", no_wrap=True)
+    results_table.add_column("Found By", width=15, justify="left", style="yellow", no_wrap=True)
+    results_table.add_column("Location", width=18, justify="left", style="dim", no_wrap=True)
+
+    for finding in paginated_findings:
+        if is_native:
+            # Native format
+            finding_id = finding.get("id", "")[:8]  # First 8 chars
+            severity = finding.get("severity", "info")
+            confidence = finding.get("confidence", "medium")[0].upper()  # H/M/L
+            rule_id = finding.get("rule_id", "unknown")
+            message = finding.get("title", "No message")
+            found_by_info = finding.get("found_by", {})
+            found_by = found_by_info.get("module", "unknown")
+
+            location = finding.get("location", {})
+            file_path = location.get("file", "")
+            line_start = location.get("line_start")
+            location_str = ""
+            if file_path:
+                location_str = Path(file_path).name
+                if line_start:
+                    location_str += f":{line_start}"
+        else:
+            # SARIF format
+            props = finding.get("properties", {})
+            finding_id = props.get("findingId", "")[:8] if props.get("findingId") else "N/A"
+            severity = finding.get("level", "note")
+            confidence = "M"  # Not available in SARIF
+            rule_id = finding.get("ruleId", "unknown")
+            message = finding.get("message", {}).get("text", "No message")
+            found_by = "unknown"
+
+            locations = finding.get("locations", [])
+            location_str = ""
+            if locations:
+                physical_location = locations[0].get("physicalLocation", {})
+                artifact_location = physical_location.get("artifactLocation", {})
+                region = physical_location.get("region", {})
 
-    for result in results[:50]:  # Limit to first 50 results
-        level = result.get("level", "note")
-        rule_id = result.get("ruleId", "unknown")
-        message = result.get("message", {}).get("text", "No message")
+                file_path = artifact_location.get("uri", "")
+                if file_path:
+                    location_str = Path(file_path).name
+                    if region.get("startLine"):
+                        location_str += f":{region['startLine']}"
 
-        # Extract location information
-        locations = result.get("locations", [])
-        location_str = ""
-        if locations:
-            physical_location = locations[0].get("physicalLocation", {})
-            artifact_location = physical_location.get("artifactLocation", {})
-            region = physical_location.get("region", {})
+        # Create styled text objects
+        severity_text = Text(severity.upper(), style=severity_style(severity))
 
-            file_path = artifact_location.get("uri", "")
-            if file_path:
-                location_str = Path(file_path).name
-                if region.get("startLine"):
-                    location_str += f":{region['startLine']}"
-                if region.get("startColumn"):
-                    location_str += f":{region['startColumn']}"
+        # Confidence badge with color
+        conf_color = {"H": "green", "M": "yellow", "L": "red"}.get(confidence, "white")
+        confidence_text = Text(confidence, style=f"bold {conf_color}")
 
-        # Create Rich Text objects with color styling
-        severity_text = Text(level.upper(), style=severity_style(level))
-        severity_text.truncate(12, overflow="ellipsis")
+        # Truncate long text
+        rule_text = Text(rule_id)
+        rule_text.truncate(18, overflow="ellipsis")
 
-        # Show full rule ID without truncation
         message_text = Text(message)
-        message_text.truncate(45, overflow="ellipsis")
+        message_text.truncate(35, overflow="ellipsis")
+
+        found_by_text = Text(found_by)
+        found_by_text.truncate(15, overflow="ellipsis")
 
         location_text = Text(location_str)
-        location_text.truncate(20, overflow="ellipsis")
+        location_text.truncate(18, overflow="ellipsis")
 
         results_table.add_row(
+            finding_id,
             severity_text,
-            rule_id,  # Pass string directly to show full UUID
+            confidence_text,
+            rule_text,
             message_text,
+            found_by_text,
             location_text
         )
 
     console.print("\nšŸ“‹ [bold]Detailed Results[/bold]")
-    if len(results) > 50:
-        console.print(f"Showing first 50 of {len(results)} results")
+
+    # Pagination info
+    if limit and total_findings > limit:
+        console.print(f"Showing {start_idx + 1}-{min(end_idx, total_findings)} of {total_findings} results")
+
     console.print()
     console.print(results_table)
 
@@ -932,9 +1070,10 @@ def all_findings(
 [cyan]Recent Findings (7 days):[/cyan] {stats['recent_findings']}
 
 [bold]Severity Distribution:[/bold]
-  šŸ”“ Errors: {stats['severity_distribution'].get('error', 0)}
-  🟔 Warnings: {stats['severity_distribution'].get('warning', 0)}
-  šŸ”µ Notes: {stats['severity_distribution'].get('note', 0)}
+  šŸ”“ Critical: {stats['severity_distribution'].get('critical', 0)}
+  🟠 High: {stats['severity_distribution'].get('high', 0) + stats['severity_distribution'].get('error', 0)}
+  🟔 Medium: {stats['severity_distribution'].get('medium', 0) + stats['severity_distribution'].get('warning', 0)}
+  šŸ”µ Low: {stats['severity_distribution'].get('low', 0) + stats['severity_distribution'].get('note', 0)}
   ā„¹ļø Info: {stats['severity_distribution'].get('info', 0)}
 
 [bold]By Workflow:[/bold]"""
@@ -975,9 +1114,10 @@ def all_findings(
     table.add_column("Workflow", style="dim", width=20)
     table.add_column("Date", justify="center")
     table.add_column("Issues", justify="center", style="bold")
-    table.add_column("Errors", justify="center", style="red")
-    table.add_column("Warnings", justify="center", style="yellow")
-    table.add_column("Notes", justify="center", style="blue")
+    table.add_column("Critical", justify="center", style="red")
+    table.add_column("High", justify="center", style="red")
+    table.add_column("Medium", justify="center", style="yellow")
+    table.add_column("Low", justify="center", style="blue")
 
     # Get run info for each finding
     runs_info = {}
@@ -996,19 +1136,29 @@ def all_findings(
         total_issues = summary.get("total_issues", 0)
         by_severity = summary.get("by_severity", {})
 
-        # Count issues from SARIF data if summary is incomplete
-        if total_issues == 0 and "runs" in finding.sarif_data:
-            for run in finding.sarif_data["runs"]:
-                total_issues += len(run.get("results", []))
+        # Count issues from findings_data if summary is incomplete
+        if total_issues == 0:
+            if "findings" in finding.findings_data:
+                total_issues = len(finding.findings_data.get("findings", []))
+            elif "runs" in finding.findings_data:
+                for run in finding.findings_data["runs"]:
+                    total_issues += len(run.get("results", []))
+
+        # Support both native (critical/high/medium/low) and SARIF (error/warning/note) severities
+        critical = by_severity.get("critical", 0)
+        high = by_severity.get("high", 0) + by_severity.get("error", 0)  # Map error to high
+        medium = by_severity.get("medium", 0) + by_severity.get("warning", 0)  # Map warning to medium
+        low = by_severity.get("low", 0) + by_severity.get("note", 0)  # Map note to low
 
         table.add_row(
             run_id,  # Show full Run ID
             workflow_name[:17] + "..." if len(workflow_name) > 20 else workflow_name,
             finding.created_at.strftime("%Y-%m-%d %H:%M"),
             str(total_issues),
-            str(by_severity.get("error", 0)),
-            str(by_severity.get("warning", 0)),
-            str(by_severity.get("note", 0))
+            str(critical),
+            str(high),
+            str(medium),
+            str(low)
        )
 
     console.print(table)

From 99cee284b6b274a573ab3c85cfcd6c8cdb7a3a63 Mon Sep 17 00:00:00 2001
From: tduhamel42
Date: Sun, 2 Nov 2025 15:15:19 +0100
Subject: [PATCH 05/16] fix: Update reporter __init__.py to import from native_reporter

Fixed broken import after renaming sarif_reporter.py to native_reporter.py
---
 backend/toolbox/modules/reporter/__init__.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/backend/toolbox/modules/reporter/__init__.py b/backend/toolbox/modules/reporter/__init__.py
index 7812ff12..38f1725d 100644
--- a/backend/toolbox/modules/reporter/__init__.py
+++ b/backend/toolbox/modules/reporter/__init__.py
@@ -9,6 +9,6 @@
 #
 # Additional attribution and requirements are provided in the NOTICE file.
-from .sarif_reporter import SARIFReporter +from .native_reporter import SARIFReporter __all__ = ["SARIFReporter"] \ No newline at end of file From fccd8f32ab599cfebfddff98e6eda190bac4c585 Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 15:36:30 +0100 Subject: [PATCH 06/16] refactor: Update all modules to use new create_finding signature Updated 10 modules to use the new create_finding() signature with required rule_id and found_by parameters: - llm_analyzer.py: Added FoundBy and LLMContext for AI-detected findings - bandit_analyzer.py: Added tool attribution and moved CWE/confidence to proper fields - security_analyzer.py: Updated all three finding types (secrets, SQL injection, dangerous functions) - mypy_analyzer.py: Added tool attribution and moved column info to column_start - mobsf_scanner.py: Updated all 6 finding types (permissions, manifest, code analysis, behavior) with proper line number handling - opengrep_android.py: Added tool attribution, proper CWE/OWASP formatting, and confidence mapping - dependency_scanner.py: Added pip-audit attribution for CVE findings - file_scanner.py: Updated both sensitive file and enumeration findings - cargo_fuzzer.py: Added fuzzer type attribution for crash findings - atheris_fuzzer.py: Added fuzzer type attribution for Python crash findings All modules now properly track: - Finding source (module, tool name, version, type) - Confidence levels (high/medium/low) - CWE and OWASP mappings where applicable - LLM context for AI-detected issues --- .../modules/analyzer/bandit_analyzer.py | 30 ++++++-- .../toolbox/modules/analyzer/llm_analyzer.py | 61 +++++++++++++-- .../toolbox/modules/analyzer/mypy_analyzer.py | 19 ++++- .../modules/analyzer/security_analyzer.py | 40 +++++++++- .../toolbox/modules/android/mobsf_scanner.py | 77 ++++++++++++++++--- .../modules/android/opengrep_android.py | 31 ++++++-- .../toolbox/modules/fuzzer/atheris_fuzzer.py | 13 +++- .../toolbox/modules/fuzzer/cargo_fuzzer.py | 13 +++- .../modules/scanner/dependency_scanner.py | 17 +++- .../toolbox/modules/scanner/file_scanner.py | 20 ++++- 10 files changed, 275 insertions(+), 46 deletions(-) diff --git a/backend/toolbox/modules/analyzer/bandit_analyzer.py b/backend/toolbox/modules/analyzer/bandit_analyzer.py index ecf81a84..b5c20746 100644 --- a/backend/toolbox/modules/analyzer/bandit_analyzer.py +++ b/backend/toolbox/modules/analyzer/bandit_analyzer.py @@ -21,12 +21,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -237,12 +237,34 @@ def _convert_to_findings( except (ValueError, TypeError): rel_path = Path(filename).name + # Extract confidence and CWE + confidence = issue.get("issue_confidence", "LOW").lower() + cwe_info = issue.get("issue_cwe", {}) + cwe_id = f"CWE-{cwe_info.get('id')}" if cwe_info and cwe_info.get("id") else None + + # Create FoundBy attribution + # Try to get Bandit version from metrics, fall back to unknown + 
bandit_version = "unknown" + if "metrics" in bandit_result: + bandit_version = bandit_result["metrics"].get("_version", "unknown") + + found_by = FoundBy( + module="bandit_analyzer", + tool_name="Bandit", + tool_version=bandit_version, + type="tool" + ) + # Create finding finding = self.create_finding( + rule_id=test_id, title=f"{test_name} ({test_id})", description=issue_text, severity=severity, category="security-issue", + found_by=found_by, + confidence=confidence, + cwe=cwe_id, file_path=str(rel_path), line_start=line_number, line_end=line_number, @@ -251,8 +273,6 @@ def _convert_to_findings( metadata={ "test_id": test_id, "test_name": test_name, - "confidence": issue.get("issue_confidence", "LOW").lower(), - "cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None, "more_info": issue.get("more_info", "") } ) diff --git a/backend/toolbox/modules/analyzer/llm_analyzer.py b/backend/toolbox/modules/analyzer/llm_analyzer.py index b3b13748..1d504cf7 100644 --- a/backend/toolbox/modules/analyzer/llm_analyzer.py +++ b/backend/toolbox/modules/analyzer/llm_analyzer.py @@ -18,12 +18,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext logger = logging.getLogger(__name__) @@ -270,10 +270,14 @@ async def _analyze_file( return [] # Parse LLM response into findings + full_prompt = f"{system_prompt}\n\n{user_message}" findings = self._parse_llm_response( llm_response=llm_response, file_path=file_path, - workspace=workspace + workspace=workspace, + llm_model=llm_model, + llm_provider=llm_provider, + prompt=full_prompt ) return findings @@ -282,7 +286,10 @@ def _parse_llm_response( self, llm_response: str, file_path: Path, - workspace: Path + workspace: Path, + llm_model: str, + llm_provider: str, + prompt: str ) -> List: """Parse LLM response into structured findings""" @@ -302,7 +309,9 @@ def _parse_llm_response( if line.startswith("ISSUE:"): # Save previous issue if exists if current_issue: - findings.append(self._create_module_finding(current_issue, relative_path)) + findings.append(self._create_module_finding( + current_issue, relative_path, llm_model, llm_provider, prompt + )) current_issue = {"title": line.replace("ISSUE:", "").strip()} elif line.startswith("SEVERITY:"): @@ -320,11 +329,20 @@ def _parse_llm_response( # Save last issue if current_issue: - findings.append(self._create_module_finding(current_issue, relative_path)) + findings.append(self._create_module_finding( + current_issue, relative_path, llm_model, llm_provider, prompt + )) return findings - def _create_module_finding(self, issue: Dict[str, Any], file_path: str): + def _create_module_finding( + self, + issue: Dict[str, Any], + file_path: str, + llm_model: str, + llm_provider: str, + prompt: str + ): """Create a ModuleFinding from parsed issue""" severity_map = { @@ -334,12 +352,39 @@ def _create_module_finding(self, issue: Dict[str, Any], file_path: str): "info": "low" } + # Determine confidence based on severity (LLM is more confident on critical issues) + 
confidence_map = { + "error": "high", + "warning": "medium", + "note": "medium", + "info": "low" + } + + # Create FoundBy attribution + found_by = FoundBy( + module="llm_analyzer", + tool_name=f"{llm_provider}/{llm_model}", + tool_version="1.0.0", + type="llm" + ) + + # Create LLM context + llm_context = LLMContext( + model=llm_model, + prompt=prompt, + temperature=None # Not exposed in current config + ) + # Use base class helper to create proper ModuleFinding return self.create_finding( + rule_id=f"llm_security_{issue.get('severity', 'warning')}", title=issue.get("title", "Security issue detected"), description=issue.get("description", ""), severity=severity_map.get(issue.get("severity", "warning"), "medium"), category="security", + found_by=found_by, + confidence=confidence_map.get(issue.get("severity", "warning"), "medium"), + llm_context=llm_context, file_path=file_path, line_start=issue.get("line"), metadata={ diff --git a/backend/toolbox/modules/analyzer/mypy_analyzer.py b/backend/toolbox/modules/analyzer/mypy_analyzer.py index 9d3e39f4..e9823c53 100644 --- a/backend/toolbox/modules/analyzer/mypy_analyzer.py +++ b/backend/toolbox/modules/analyzer/mypy_analyzer.py @@ -21,12 +21,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -189,18 +189,29 @@ def _parse_mypy_output(self, output: str, workspace: Path) -> List[ModuleFinding title = f"Type error: {error_code or 'type-issue'}" description = message + # Create FoundBy attribution + found_by = FoundBy( + module="mypy_analyzer", + tool_name="Mypy", + tool_version="unknown", # Mypy doesn't include version in output + type="tool" + ) + finding = self.create_finding( + rule_id=error_code or "type-issue", title=title, description=description, severity=severity, category="type-error", + found_by=found_by, + confidence="high", # Mypy is highly confident in its type checking file_path=str(rel_path), line_start=int(line_num), line_end=int(line_num), + column_start=int(column) if column else None, recommendation="Review and fix the type inconsistency or add appropriate type annotations", metadata={ "error_code": error_code or "unknown", - "column": int(column) if column else None, "level": level } ) diff --git a/backend/toolbox/modules/analyzer/security_analyzer.py b/backend/toolbox/modules/analyzer/security_analyzer.py index 3b4a2ea1..0f537da1 100644 --- a/backend/toolbox/modules/analyzer/security_analyzer.py +++ b/backend/toolbox/modules/analyzer/security_analyzer.py @@ -19,12 +19,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy 
except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -217,11 +217,22 @@ def _check_hardcoded_secrets( if self._is_false_positive_secret(match.group(0)): continue + # Create FoundBy attribution + found_by = FoundBy( + module="security_analyzer", + tool_name="Security Analyzer", + tool_version="1.0.0", + type="tool" + ) + findings.append(self.create_finding( + rule_id=f"hardcoded_{secret_type.lower().replace(' ', '_')}", title=f"Hardcoded {secret_type} detected", description=f"Found potential hardcoded {secret_type} in {file_path}", severity="high" if "key" in secret_type.lower() else "medium", category="hardcoded_secret", + found_by=found_by, + confidence="medium", file_path=str(file_path), line_start=line_num, code_snippet=line_content.strip()[:100], @@ -261,11 +272,23 @@ def _check_sql_injection( line_num = content[:match.start()].count('\n') + 1 line_content = lines[line_num - 1] if line_num <= len(lines) else "" + # Create FoundBy attribution + found_by = FoundBy( + module="security_analyzer", + tool_name="Security Analyzer", + tool_version="1.0.0", + type="tool" + ) + findings.append(self.create_finding( + rule_id=f"sql_injection_{vuln_type.lower().replace(' ', '_')}", title=f"Potential SQL Injection: {vuln_type}", description=f"Detected potential SQL injection vulnerability via {vuln_type}", severity="high", category="sql_injection", + found_by=found_by, + confidence="medium", + cwe="CWE-89", file_path=str(file_path), line_start=line_num, code_snippet=line_content.strip()[:100], @@ -323,11 +346,22 @@ def _check_dangerous_functions( line_num = content[:match.start()].count('\n') + 1 line_content = lines[line_num - 1] if line_num <= len(lines) else "" + # Create FoundBy attribution + found_by = FoundBy( + module="security_analyzer", + tool_name="Security Analyzer", + tool_version="1.0.0", + type="tool" + ) + findings.append(self.create_finding( + rule_id=f"dangerous_function_{func_name.replace('()', '').replace('.', '_')}", title=f"Dangerous function: {func_name}", description=f"Use of potentially dangerous function {func_name}: {risk_type}", severity="medium", category="dangerous_function", + found_by=found_by, + confidence="medium", file_path=str(file_path), line_start=line_num, code_snippet=line_content.strip()[:100], diff --git a/backend/toolbox/modules/android/mobsf_scanner.py b/backend/toolbox/modules/android/mobsf_scanner.py index 3b16e1b4..07073fe3 100644 --- a/backend/toolbox/modules/android/mobsf_scanner.py +++ b/backend/toolbox/modules/android/mobsf_scanner.py @@ -24,12 +24,12 @@ import aiohttp try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy logger = logging.getLogger(__name__) @@ -278,6 +278,14 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List """Parse MobSF JSON results into 
standardized findings""" findings = [] + # Create FoundBy attribution for all MobSF findings + found_by = FoundBy( + module="mobsf_scanner", + tool_name="MobSF", + tool_version="3.9.7", + type="tool" + ) + # Parse permissions if 'permissions' in scan_data: for perm_name, perm_attrs in scan_data['permissions'].items(): @@ -287,10 +295,13 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List ) finding = self.create_finding( + rule_id=f"android_permission_{perm_name.replace('.', '_')}", title=f"Android Permission: {perm_name}", description=perm_attrs.get('description', 'No description'), severity=severity, category="android-permission", + found_by=found_by, + confidence="high", metadata={ 'permission': perm_name, 'status': perm_attrs.get('status'), @@ -307,13 +318,19 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List if isinstance(item, dict): severity = self.SEVERITY_MAP.get(item.get('severity', '').lower(), 'medium') + title = item.get('title') or item.get('name') or "Manifest Issue" + rule = item.get('rule') or "manifest_issue" + finding = self.create_finding( - title=item.get('title') or item.get('name') or "Manifest Issue", + rule_id=f"android_manifest_{rule.replace(' ', '_').replace('-', '_')}", + title=title, description=item.get('description', 'No description'), severity=severity, category="android-manifest", + found_by=found_by, + confidence="high", metadata={ - 'rule': item.get('rule'), + 'rule': rule, 'tool': 'mobsf', } ) @@ -335,16 +352,32 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List # Create a finding for each affected file if isinstance(files_dict, dict) and files_dict: for file_path, line_numbers in files_dict.items(): + # Extract first line number if available + line_start = None + if line_numbers: + try: + # Can be string like "28" or "65,81" + line_start = int(str(line_numbers).split(',')[0]) + except (ValueError, AttributeError): + pass + + # Extract CWE from metadata + cwe_value = metadata_dict.get('cwe') + cwe_id = f"CWE-{cwe_value}" if cwe_value else None + finding = self.create_finding( + rule_id=finding_name.replace(' ', '_').replace('-', '_'), title=finding_name, description=metadata_dict.get('description', 'No description'), severity=severity, category="android-code-analysis", + found_by=found_by, + confidence="medium", + cwe=cwe_id, + owasp=metadata_dict.get('owasp'), file_path=file_path, - line_number=line_numbers, # Can be string like "28" or "65,81" + line_start=line_start, metadata={ - 'cwe': metadata_dict.get('cwe'), - 'owasp': metadata_dict.get('owasp'), 'masvs': metadata_dict.get('masvs'), 'cvss': metadata_dict.get('cvss'), 'ref': metadata_dict.get('ref'), @@ -355,14 +388,21 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List findings.append(finding) else: # Fallback: create one finding without file info + # Extract CWE from metadata + cwe_value = metadata_dict.get('cwe') + cwe_id = f"CWE-{cwe_value}" if cwe_value else None + finding = self.create_finding( + rule_id=finding_name.replace(' ', '_').replace('-', '_'), title=finding_name, description=metadata_dict.get('description', 'No description'), severity=severity, category="android-code-analysis", + found_by=found_by, + confidence="medium", + cwe=cwe_id, + owasp=metadata_dict.get('owasp'), metadata={ - 'cwe': metadata_dict.get('cwe'), - 'owasp': metadata_dict.get('owasp'), 'masvs': metadata_dict.get('masvs'), 'cvss': metadata_dict.get('cvss'), 'ref': metadata_dict.get('ref'), 
@@ -389,13 +429,25 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List # Create a finding for each affected file if isinstance(files_dict, dict) and files_dict: for file_path, line_numbers in files_dict.items(): + # Extract first line number if available + line_start = None + if line_numbers: + try: + # Can be string like "28" or "65,81" + line_start = int(str(line_numbers).split(',')[0]) + except (ValueError, AttributeError): + pass + finding = self.create_finding( + rule_id=f"android_behavior_{key.replace(' ', '_').replace('-', '_')}", title=f"Behavior: {label}", description=metadata_dict.get('description', 'No description'), severity=severity, category="android-behavior", + found_by=found_by, + confidence="medium", file_path=file_path, - line_number=line_numbers, + line_start=line_start, metadata={ 'line_numbers': line_numbers, 'behavior_key': key, @@ -406,10 +458,13 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List else: # Fallback: create one finding without file info finding = self.create_finding( + rule_id=f"android_behavior_{key.replace(' ', '_').replace('-', '_')}", title=f"Behavior: {label}", description=metadata_dict.get('description', 'No description'), severity=severity, category="android-behavior", + found_by=found_by, + confidence="medium", metadata={ 'behavior_key': key, 'tool': 'mobsf', diff --git a/backend/toolbox/modules/android/opengrep_android.py b/backend/toolbox/modules/android/opengrep_android.py index 01e32c4e..d3ce9267 100644 --- a/backend/toolbox/modules/android/opengrep_android.py +++ b/backend/toolbox/modules/android/opengrep_android.py @@ -23,12 +23,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy logger = logging.getLogger(__name__) @@ -302,23 +302,40 @@ def _parse_opengrep_output(self, output: str, workspace: Path, config: Dict[str, # Map severity to our standard levels finding_severity = self._map_severity(severity) + # Map confidence + confidence_map = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low"} + finding_confidence = confidence_map.get(confidence, "medium") + + # Format CWE and OWASP + cwe_id = f"CWE-{cwe[0]}" if cwe and isinstance(cwe, list) and cwe else None + owasp_str = owasp[0] if owasp and isinstance(owasp, list) and owasp else None + + # Create FoundBy attribution + found_by = FoundBy( + module="opengrep_android", + tool_name="OpenGrep", + tool_version="1.45.0", + type="tool" + ) + # Create finding finding = self.create_finding( + rule_id=rule_id, title=f"Android Security: {rule_id}", description=message or f"OpenGrep rule {rule_id} triggered", severity=finding_severity, category=self._get_category(rule_id, extra), + found_by=found_by, + confidence=finding_confidence, + cwe=cwe_id, + owasp=owasp_str, file_path=path_info if path_info else None, line_start=start_line if start_line > 0 else None, line_end=end_line if end_line > 0 and end_line != start_line else None, 
code_snippet=lines.strip() if lines else None, recommendation=self._get_recommendation(rule_id, extra), metadata={ - "rule_id": rule_id, "opengrep_severity": severity, - "confidence": confidence, - "cwe": cwe, - "owasp": owasp, "fix": extra.get("fix", ""), "impact": extra.get("impact", ""), "likelihood": extra.get("likelihood", ""), diff --git a/backend/toolbox/modules/fuzzer/atheris_fuzzer.py b/backend/toolbox/modules/fuzzer/atheris_fuzzer.py index 3f0c42d6..68a1e6c1 100644 --- a/backend/toolbox/modules/fuzzer/atheris_fuzzer.py +++ b/backend/toolbox/modules/fuzzer/atheris_fuzzer.py @@ -19,7 +19,7 @@ import uuid import httpx -from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding +from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -556,7 +556,16 @@ async def _generate_findings(self, target_path: Path) -> List[ModuleFinding]: # Encode crash input for storage crash_input_b64 = base64.b64encode(crash["input"]).decode() + # Create FoundBy attribution + found_by = FoundBy( + module="atheris_fuzzer", + tool_name="Atheris", + tool_version="unknown", + type="fuzzer" + ) + finding = self.create_finding( + rule_id=f"fuzzer_crash_{crash['exception_type'].lower().replace(' ', '_')}", title=f"Crash: {crash['exception_type']}", description=( f"Atheris found crash during fuzzing:\n" @@ -566,6 +575,8 @@ async def _generate_findings(self, target_path: Path) -> List[ModuleFinding]: ), severity="critical", category="crash", + found_by=found_by, + confidence="high", # Fuzzer-found crashes are highly reliable file_path=str(target_path), metadata={ "crash_input_base64": crash_input_b64, diff --git a/backend/toolbox/modules/fuzzer/cargo_fuzzer.py b/backend/toolbox/modules/fuzzer/cargo_fuzzer.py index c4fc746c..1f7522f0 100644 --- a/backend/toolbox/modules/fuzzer/cargo_fuzzer.py +++ b/backend/toolbox/modules/fuzzer/cargo_fuzzer.py @@ -13,7 +13,7 @@ from pathlib import Path from typing import Dict, Any, List, Optional, Callable -from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding +from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -426,14 +426,25 @@ async def _analyze_crash( else: severity = "high" + # Create FoundBy attribution + found_by = FoundBy( + module="cargo_fuzz", + tool_name="cargo-fuzz", + tool_version="0.11.2", + type="fuzzer" + ) + # Create finding finding = self.create_finding( + rule_id=f"fuzzer_crash_{error_type.lower().replace(' ', '_')}", title=f"Crash: {error_type} in {target_name}", description=f"Cargo-fuzz discovered a crash in target '{target_name}'. " f"Error type: {error_type}. " f"Input size: {len(crash_input)} bytes.", severity=severity, category="crash", + found_by=found_by, + confidence="high", # Fuzzer-found crashes are highly reliable file_path=f"fuzz/fuzz_targets/{target_name}.rs", code_snippet=stack_trace[:500], recommendation="Review the crash details and fix the underlying bug. 
" diff --git a/backend/toolbox/modules/scanner/dependency_scanner.py b/backend/toolbox/modules/scanner/dependency_scanner.py index 4c7791c4..831b0059 100644 --- a/backend/toolbox/modules/scanner/dependency_scanner.py +++ b/backend/toolbox/modules/scanner/dependency_scanner.py @@ -21,12 +21,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -201,11 +201,22 @@ def _convert_to_findings( recommendation = f"Upgrade {package_name} to a fixed version: {', '.join(fix_versions)}" if fix_versions else f"Check for updates to {package_name}" + # Create FoundBy attribution + found_by = FoundBy( + module="dependency_scanner", + tool_name="pip-audit", + tool_version="unknown", + type="tool" + ) + finding = self.create_finding( + rule_id=f"vulnerable_dependency_{package_name}", title=f"Vulnerable dependency: {package_name} ({vuln_id})", description=f"{description}\n\nAffected package: {package_name} {package_version}", severity=severity, category="vulnerable-dependency", + found_by=found_by, + confidence="high", # pip-audit uses official CVE database file_path=str(rel_path), recommendation=recommendation, metadata={ diff --git a/backend/toolbox/modules/scanner/file_scanner.py b/backend/toolbox/modules/scanner/file_scanner.py index 22de2002..f7a39869 100644 --- a/backend/toolbox/modules/scanner/file_scanner.py +++ b/backend/toolbox/modules/scanner/file_scanner.py @@ -20,12 +20,12 @@ import hashlib try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy logger = logging.getLogger(__name__) @@ -122,6 +122,14 @@ async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult logger.info(f"Scanning workspace with patterns: {patterns}") + # Create FoundBy attribution for all findings + found_by = FoundBy( + module="file_scanner", + tool_name="File Scanner", + tool_version="1.0.0", + type="tool" + ) + try: # Scan for each pattern for pattern in patterns: @@ -152,10 +160,13 @@ async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult # Check for sensitive files if check_sensitive and self._is_sensitive_file(file_path): findings.append(self.create_finding( + rule_id="sensitive_file", title=f"Potentially sensitive file: {relative_path.name}", description=f"Found potentially sensitive file at {relative_path}", severity="medium", category="sensitive_file", + found_by=found_by, + confidence="medium", file_path=str(relative_path), metadata={ "file_size": file_size, @@ -170,10 +181,13 @@ 
async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult # Create informational finding for each file findings.append(self.create_finding( + rule_id="file_enumeration", title=f"File discovered: {relative_path.name}", description=f"File: {relative_path}", severity="info", category="file_enumeration", + found_by=found_by, + confidence="high", file_path=str(relative_path), metadata={ "file_size": file_size, From ef8384c4795adec51c27744332494632aaa26a8d Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 16:20:37 +0100 Subject: [PATCH 07/16] fix: Update main.py to use finding_id instead of rule_id parameter Aligns main.py with the updated findings.py command that changed from --rule to --id for finding lookups by unique UUID. --- cli/src/fuzzforge_cli/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cli/src/fuzzforge_cli/main.py b/cli/src/fuzzforge_cli/main.py index 66b7c25c..7f5aa547 100644 --- a/cli/src/fuzzforge_cli/main.py +++ b/cli/src/fuzzforge_cli/main.py @@ -263,13 +263,13 @@ def workflow_main(): @finding_app.command("show") def show_finding_detail( run_id: str = typer.Argument(..., help="Run ID to get finding from"), - rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show") + finding_id: str = typer.Option(..., "--id", "-i", help="Unique ID of the specific finding to show") ): """ šŸ” Show detailed information about a specific finding """ from .commands.findings import show_finding - show_finding(run_id=run_id, rule_id=rule_id) + show_finding(run_id=run_id, finding_id=finding_id) @finding_app.callback(invoke_without_command=True) From 3af9a0023cd6fa2147ca5fba1f984b613e26e287 Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 16:29:32 +0100 Subject: [PATCH 08/16] feat: Remove Confidence column from findings table to reduce confusion Removes the Confidence column from the findings table display to eliminate confusion with the Severity column (both used High/Medium/Low terminology). Changes: - Removed 'Conf' column from table structure - Removed confidence extraction logic for both native and SARIF formats - Removed confidence badge creation and styling - Table now shows: ID | Severity | Rule | Message | Found By | Location Confidence data is still available in detailed finding view (ff finding show). 
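
Taken together, patches 06-08 change both what modules emit and what the findings table shows. As a rough sketch of the migrated call path (the module name, tool metadata, and rule id below are illustrative placeholders, not values taken from these patches):

    from modules.base import BaseModule, FoundBy

    class ExampleAnalyzer(BaseModule):
        def _report(self, rel_path: str, line: int):
            # Attribution is now mandatory: every finding records the module,
            # the underlying tool, and the detection type.
            found_by = FoundBy(
                module="example_analyzer",   # hypothetical module name
                tool_name="Example Tool",    # hypothetical tool
                tool_version="1.0.0",
                type="tool",
            )
            return self.create_finding(
                rule_id="example_rule",      # required since patch 06
                title="Example issue",
                description="Sketch of the new create_finding() signature",
                severity="medium",
                category="example",
                found_by=found_by,           # required since patch 06
                confidence="medium",         # shown in detail view, not the table
                file_path=rel_path,
                line_start=line,
            )

Each finding produced this way carries the unique id that `ff finding show <run-id> --id <finding-id>` now looks up, while confidence remains visible in that detail view rather than in the table.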
--- cli/src/fuzzforge_cli/commands/findings.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py index 72fbfc17..2bcb52c8 100644 --- a/cli/src/fuzzforge_cli/commands/findings.py +++ b/cli/src/fuzzforge_cli/commands/findings.py @@ -600,7 +600,6 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = results_table = Table(box=box.ROUNDED) results_table.add_column("ID", width=10, justify="left", style="dim") results_table.add_column("Severity", width=10, justify="left", no_wrap=True) - results_table.add_column("Conf", width=4, justify="center", no_wrap=True) # Confidence results_table.add_column("Rule", width=18, justify="left", style="bold cyan", no_wrap=True) results_table.add_column("Message", width=35, justify="left", no_wrap=True) results_table.add_column("Found By", width=15, justify="left", style="yellow", no_wrap=True) @@ -611,7 +610,6 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = # Native format finding_id = finding.get("id", "")[:8] # First 8 chars severity = finding.get("severity", "info") - confidence = finding.get("confidence", "medium")[0].upper() # H/M/L rule_id = finding.get("rule_id", "unknown") message = finding.get("title", "No message") found_by_info = finding.get("found_by", {}) @@ -630,7 +628,6 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = props = finding.get("properties", {}) finding_id = props.get("findingId", "")[:8] if props.get("findingId") else "N/A" severity = finding.get("level", "note") - confidence = "M" # Not available in SARIF rule_id = finding.get("ruleId", "unknown") message = finding.get("message", {}).get("text", "No message") found_by = "unknown" @@ -651,10 +648,6 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = # Create styled text objects severity_text = Text(severity.upper(), style=severity_style(severity)) - # Confidence badge with color - conf_color = {"H": "green", "M": "yellow", "L": "red"}.get(confidence, "white") - confidence_text = Text(confidence, style=f"bold {conf_color}") - # Truncate long text rule_text = Text(rule_id) rule_text.truncate(18, overflow="ellipsis") @@ -671,7 +664,6 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = results_table.add_row( finding_id, severity_text, - confidence_text, rule_text, message_text, found_by_text, From ce87346321f013203193225e091f59f3dbb240b5 Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Sun, 2 Nov 2025 16:35:21 +0100 Subject: [PATCH 09/16] feat: Remove Rule column from findings table for cleaner display Removes the Rule column from findings table to simplify the view and reduce redundancy with the Message column. 
Rule ID is still available in: - Detailed finding view (ff finding show --id ) - By-rule grouping command (ff findings by-rule --rule ) Changes: - Removed 'Rule' column from table structure - Removed rule_text extraction and styling logic - Expanded Message column from 35 to 50 chars (more space available) - Expanded Location column from 18 to 20 chars - Table now shows: ID | Severity | Message | Found By | Location Benefits: - Cleaner, more scannable table - Message column has more room to show details - Less visual clutter while maintaining all functionality --- cli/src/fuzzforge_cli/commands/findings.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py index 2bcb52c8..242995c4 100644 --- a/cli/src/fuzzforge_cli/commands/findings.py +++ b/cli/src/fuzzforge_cli/commands/findings.py @@ -600,10 +600,9 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = results_table = Table(box=box.ROUNDED) results_table.add_column("ID", width=10, justify="left", style="dim") results_table.add_column("Severity", width=10, justify="left", no_wrap=True) - results_table.add_column("Rule", width=18, justify="left", style="bold cyan", no_wrap=True) - results_table.add_column("Message", width=35, justify="left", no_wrap=True) + results_table.add_column("Message", width=50, justify="left", no_wrap=True) results_table.add_column("Found By", width=15, justify="left", style="yellow", no_wrap=True) - results_table.add_column("Location", width=18, justify="left", style="dim", no_wrap=True) + results_table.add_column("Location", width=20, justify="left", style="dim", no_wrap=True) for finding in paginated_findings: if is_native: @@ -649,11 +648,8 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = severity_text = Text(severity.upper(), style=severity_style(severity)) # Truncate long text - rule_text = Text(rule_id) - rule_text.truncate(18, overflow="ellipsis") - message_text = Text(message) - message_text.truncate(35, overflow="ellipsis") + message_text.truncate(50, overflow="ellipsis") found_by_text = Text(found_by) found_by_text.truncate(15, overflow="ellipsis") @@ -664,7 +660,6 @@ def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = results_table.add_row( finding_id, severity_text, - rule_text, message_text, found_by_text, location_text From 0f06e7b1770807c56d1e727615b2594be74fea17 Mon Sep 17 00:00:00 2001 From: tduhamel42 Date: Mon, 3 Nov 2025 14:06:11 +0100 Subject: [PATCH 10/16] feat(cli): modernize HTML export with interactive charts and filters - Rewrite export_to_html with Bootstrap 5 styling - Add Chart.js visualizations (severity, type, category, source) - Add executive summary dashboard with stat cards - Add interactive filtering by severity, type, and search - Add sortable table columns - Add expandable row details with full finding information - Add Prism.js syntax highlighting for code snippets - Display LLM context, confidence, CWE/OWASP, recommendations - Make responsive with print-friendly CSS - Update extract_simplified_findings to handle native format - Update export_to_csv to handle native format with more fields - Fix export functions to use findings_data instead of sarif_data - Add safe_escape helper to handle None values --- cli/src/fuzzforge_cli/commands/findings.py | 935 +++++++++++++++++---- 1 file changed, 784 insertions(+), 151 deletions(-) diff --git 
a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py index 242995c4..ea5a5364 100644 --- a/cli/src/fuzzforge_cli/commands/findings.py +++ b/cli/src/fuzzforge_cli/commands/findings.py @@ -757,14 +757,14 @@ def export_findings( try: # Get findings from database first, fallback to API - findings_data = db.get_findings(run_id) - if not findings_data: + findings_record = db.get_findings(run_id) + if not findings_record: console.print(f"šŸ“” Fetching findings from API for run: {run_id}") with get_client() as client: findings = client.get_run_findings(run_id) - sarif_data = findings.sarif + findings_data = findings.sarif else: - sarif_data = findings_data.sarif_data + findings_data = findings_record.findings_data # Generate output filename with timestamp for uniqueness if not output: @@ -776,19 +776,19 @@ def export_findings( # Export based on format if format == "sarif": with open(output_path, 'w') as f: - json.dump(sarif_data, f, indent=2) + json.dump(findings_data, f, indent=2) elif format == "json": # Simplified JSON format - simplified_data = extract_simplified_findings(sarif_data) + simplified_data = extract_simplified_findings(findings_data) with open(output_path, 'w') as f: json.dump(simplified_data, f, indent=2) elif format == "csv": - export_to_csv(sarif_data, output_path) + export_to_csv(findings_data, output_path) elif format == "html": - export_to_html(sarif_data, output_path, run_id) + export_to_html(findings_data, output_path, run_id) else: console.print(f"āŒ Unsupported format: {format}", style="red") @@ -801,71 +801,81 @@ def export_findings( raise typer.Exit(1) -def extract_simplified_findings(sarif_data: Dict[str, Any]) -> Dict[str, Any]: - """Extract simplified findings structure from SARIF""" - runs = sarif_data.get("runs", []) - if not runs: - return {"findings": [], "summary": {}} - - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}).get("driver", {}) - - simplified = { - "tool": { - "name": tool.get("name", "Unknown"), - "version": tool.get("version", "Unknown") - }, - "summary": { - "total_issues": len(results), - "by_severity": {} - }, - "findings": [] - } - - for result in results: - level = result.get("level", "note") - simplified["summary"]["by_severity"][level] = simplified["summary"]["by_severity"].get(level, 0) + 1 - - # Extract location - location_info = {} - locations = result.get("locations", []) - if locations: - physical_location = locations[0].get("physicalLocation", {}) - artifact_location = physical_location.get("artifactLocation", {}) - region = physical_location.get("region", {}) - - location_info = { - "file": artifact_location.get("uri", ""), - "line": region.get("startLine"), - "column": region.get("startColumn") - } - - simplified["findings"].append({ - "rule_id": result.get("ruleId", "unknown"), - "severity": level, - "message": result.get("message", {}).get("text", ""), - "location": location_info - }) - - return simplified +def extract_simplified_findings(findings_data: Dict[str, Any]) -> Dict[str, Any]: + """Extract simplified findings structure from native format or SARIF""" + # Detect format + is_native = "findings" in findings_data and "version" in findings_data + if is_native: + # Native FuzzForge format + findings_list = findings_data.get("findings", []) + workflow = findings_data.get("workflow", "Unknown") + summary = findings_data.get("summary", {}) + + simplified = { + "tool": { + "name": workflow, + "version": findings_data.get("version", "1.0.0") + }, + 
"summary": summary if summary else { + "total_issues": len(findings_list), + "by_severity": {} + }, + "findings": [] + } + + # Count by severity if not in summary + if not summary: + for finding in findings_list: + severity = finding.get("severity", "info") + simplified["summary"]["by_severity"][severity] = simplified["summary"]["by_severity"].get(severity, 0) + 1 + + # Extract simplified findings + for finding in findings_list: + location = finding.get("location", {}) + simplified["findings"].append({ + "id": finding.get("id"), + "rule_id": finding.get("rule_id", "unknown"), + "severity": finding.get("severity", "info"), + "confidence": finding.get("confidence", "medium"), + "title": finding.get("title", ""), + "description": finding.get("description", ""), + "category": finding.get("category", "other"), + "found_by": finding.get("found_by", {}), + "location": { + "file": location.get("file", ""), + "line": location.get("line_start"), + "column": location.get("column_start") + } + }) + else: + # SARIF format + runs = findings_data.get("runs", []) + if not runs: + return {"findings": [], "summary": {}} -def export_to_csv(sarif_data: Dict[str, Any], output_path: Path): - """Export findings to CSV format""" - runs = sarif_data.get("runs", []) - if not runs: - return - - results = runs[0].get("results", []) + run_data = runs[0] + results = run_data.get("results", []) + tool = run_data.get("tool", {}).get("driver", {}) - with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = ['rule_id', 'severity', 'message', 'file', 'line', 'column'] - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() + simplified = { + "tool": { + "name": tool.get("name", "Unknown"), + "version": tool.get("version", "Unknown") + }, + "summary": { + "total_issues": len(results), + "by_severity": {} + }, + "findings": [] + } for result in results: - location_info = {"file": "", "line": "", "column": ""} + level = result.get("level", "note") + simplified["summary"]["by_severity"][level] = simplified["summary"]["by_severity"].get(level, 0) + 1 + + # Extract location + location_info = {} locations = result.get("locations", []) if locations: physical_location = locations[0].get("physicalLocation", {}) @@ -874,109 +884,732 @@ def export_to_csv(sarif_data: Dict[str, Any], output_path: Path): location_info = { "file": artifact_location.get("uri", ""), - "line": region.get("startLine", ""), - "column": region.get("startColumn", "") + "line": region.get("startLine"), + "column": region.get("startColumn") } - writer.writerow({ - "rule_id": result.get("ruleId", ""), - "severity": result.get("level", "note"), + simplified["findings"].append({ + "rule_id": result.get("ruleId", "unknown"), + "severity": level, "message": result.get("message", {}).get("text", ""), - **location_info + "location": location_info }) + return simplified -def export_to_html(sarif_data: Dict[str, Any], output_path: Path, run_id: str): - """Export findings to HTML format""" - runs = sarif_data.get("runs", []) - if not runs: - return - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}).get("driver", {}) +def export_to_csv(findings_data: Dict[str, Any], output_path: Path): + """Export findings to CSV format (supports both native and SARIF)""" + # Detect format + is_native = "findings" in findings_data and "version" in findings_data + + with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: + if is_native: + # Native FuzzForge format - include more fields 
+ fieldnames = ['id', 'rule_id', 'severity', 'confidence', 'title', 'category', 'module', 'file', 'line', 'column'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + findings_list = findings_data.get("findings", []) + for finding in findings_list: + location = finding.get("location", {}) + found_by = finding.get("found_by", {}) + + writer.writerow({ + "id": finding.get("id", "")[:8], + "rule_id": finding.get("rule_id", ""), + "severity": finding.get("severity", "info"), + "confidence": finding.get("confidence", "medium"), + "title": finding.get("title", ""), + "category": finding.get("category", ""), + "module": found_by.get("module", ""), + "file": location.get("file", ""), + "line": location.get("line_start", ""), + "column": location.get("column_start", "") + }) + else: + # SARIF format + fieldnames = ['rule_id', 'severity', 'message', 'file', 'line', 'column'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + runs = findings_data.get("runs", []) + if not runs: + return - # Simple HTML template + results = runs[0].get("results", []) + + for result in results: + location_info = {"file": "", "line": "", "column": ""} + locations = result.get("locations", []) + if locations: + physical_location = locations[0].get("physicalLocation", {}) + artifact_location = physical_location.get("artifactLocation", {}) + region = physical_location.get("region", {}) + + location_info = { + "file": artifact_location.get("uri", ""), + "line": region.get("startLine", ""), + "column": region.get("startColumn", "") + } + + writer.writerow({ + "rule_id": result.get("ruleId", ""), + "severity": result.get("level", "note"), + "message": result.get("message", {}).get("text", ""), + **location_info + }) + + +def export_to_html(findings_data: Dict[str, Any], output_path: Path, run_id: str): + """Export findings to modern, interactive HTML format with charts""" + import html + from datetime import datetime + + # Helper function to safely escape strings + def safe_escape(value): + """Safely escape a value, handling None and non-string types""" + if value is None: + return "" + return html.escape(str(value)) + + # Detect format (native or SARIF) + is_native = "findings" in findings_data and "version" in findings_data + + if is_native: + # Native FuzzForge format + findings_list = findings_data.get("findings", []) + workflow = findings_data.get("workflow", "Security Assessment") + summary = findings_data.get("summary", {}) + total_findings = len(findings_list) + else: + # SARIF format (backward compatibility) + runs = findings_data.get("runs", []) + if not runs: + # Empty report + findings_list = [] + workflow = "Security Assessment" + summary = {} + total_findings = 0 + else: + run_data = runs[0] + findings_list = run_data.get("results", []) + tool = run_data.get("tool", {}).get("driver", {}) + workflow = tool.get("name", "Security Assessment") + total_findings = len(findings_list) + summary = {} + + # Calculate statistics + severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} + confidence_counts = {"high": 0, "medium": 0, "low": 0} + category_counts = {} + source_counts = {} + type_counts = {} + + for finding in findings_list: + if is_native: + severity = finding.get("severity", "info") + confidence = finding.get("confidence", "medium") + category = finding.get("category", "other") + found_by = finding.get("found_by", {}) + source = found_by.get("module", "unknown") + detection_type = found_by.get("type", "tool") + else: + # 
Map SARIF levels to severity + level = finding.get("level", "note") + severity_map = {"error": "high", "warning": "medium", "note": "low", "none": "info"} + severity = severity_map.get(level, "info") + confidence = "medium" + category = "other" + source = "unknown" + detection_type = "tool" + + severity_counts[severity] = severity_counts.get(severity, 0) + 1 + confidence_counts[confidence] = confidence_counts.get(confidence, 0) + 1 + category_counts[category] = category_counts.get(category, 0) + 1 + source_counts[source] = source_counts.get(source, 0) + 1 + type_counts[detection_type] = type_counts.get(detection_type, 0) + 1 + + # Prepare chart data + severity_data = {k: v for k, v in severity_counts.items() if v > 0} + category_data = dict(sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:10]) + source_data = dict(sorted(source_counts.items(), key=lambda x: x[1], reverse=True)[:10]) + type_data = {k: v for k, v in type_counts.items() if v > 0} + + # Generate findings rows + findings_rows = "" + for idx, finding in enumerate(findings_list): + if is_native: + finding_id = finding.get("id", "")[:8] if finding.get("id") else "" + severity = finding.get("severity", "info") + confidence = finding.get("confidence", "medium") + title = safe_escape(finding.get("title") or "No title") + description = safe_escape(finding.get("description")) + rule_id = safe_escape(finding.get("rule_id") or "unknown") + category = safe_escape(finding.get("category") or "other") + + found_by = finding.get("found_by") or {} + module = safe_escape(found_by.get("module") or "unknown") + tool_name = safe_escape(found_by.get("tool_name") or "Unknown") + detection_type = found_by.get("type") or "tool" + + location = finding.get("location") or {} + file_path = safe_escape(location.get("file")) + line_start = location.get("line_start") + code_snippet = safe_escape(location.get("snippet")) + + cwe = safe_escape(finding.get("cwe")) + owasp = safe_escape(finding.get("owasp")) + recommendation = safe_escape(finding.get("recommendation")) + + llm_context = finding.get("llm_context") + if llm_context: + llm_model = safe_escape(llm_context.get("model")) + prompt_text = llm_context.get("prompt", "") + if prompt_text: + llm_prompt_preview = safe_escape(prompt_text[:100] + "..." 
if len(prompt_text) > 100 else prompt_text) + else: + llm_prompt_preview = "" + else: + llm_model = "" + llm_prompt_preview = "" + else: + # SARIF format + props = finding.get("properties") or {} + finding_id = props.get("findingId", "")[:8] if props.get("findingId") else "" + level = finding.get("level", "note") + severity_map = {"error": "high", "warning": "medium", "note": "low", "none": "info"} + severity = severity_map.get(level, "info") + confidence = "medium" + rule_id = safe_escape(finding.get("ruleId") or "unknown") + message = finding.get("message") or {} + title = safe_escape(message.get("text") or "No message") + description = title + category = "other" + module = "unknown" + tool_name = "Unknown" + detection_type = "tool" + + locations = finding.get("locations", []) + if locations: + physical_location = locations[0].get("physicalLocation") or {} + artifact_location = physical_location.get("artifactLocation") or {} + region = physical_location.get("region") or {} + file_path = safe_escape(artifact_location.get("uri")) + line_start = region.get("startLine") + snippet_obj = region.get("snippet") or {} + code_snippet = safe_escape(snippet_obj.get("text")) + else: + file_path = "" + line_start = None + code_snippet = "" + + cwe = "" + owasp = "" + recommendation = "" + llm_model = "" + llm_prompt_preview = "" + + location_str = file_path + if line_start: + location_str += f":{line_start}" + + severity_badge = { + "critical": 'CRITICAL', + "high": 'HIGH', + "medium": 'MEDIUM', + "low": 'LOW', + "info": 'INFO' + }.get(severity, 'INFO') + + confidence_badge = { + "high": 'High', + "medium": 'Medium', + "low": 'Low' + }.get(confidence, 'Medium') + + type_icon = { + "llm": "šŸ¤–", + "tool": "šŸ”§", + "fuzzer": "šŸŽÆ", + "manual": "šŸ‘¤" + }.get(detection_type, "šŸ”§") + + # Build details HTML + details_html = f""" + + """ + + findings_rows += f""" + + {finding_id} + {severity_badge} + {title} + {type_icon} {module} + {location_str} + + + {details_html} + + """ + + # Generate HTML html_content = f""" - + - Security Findings - {run_id} + + + Security Findings Report - {run_id} + + + + + + + + + + + + + + -
-    <h1>Security Findings Report</h1>
-    <p>Run ID: {run_id}</p>
-    <p>Tool: {tool.get('name', 'Unknown')} v{tool.get('version', 'Unknown')}</p>
-    <p>Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
+    <header>
+        <h1>Security Findings Report</h1>
+        <p>{workflow}</p>
+        <p>Run ID: {run_id} | Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</p>
+    </header>
-    <h2>Summary</h2>
-    <p>Total Issues: {len(results)}</p>
+
+    <!-- Executive summary dashboard (stat cards) -->
+    <h2>šŸ“Š Executive Summary</h2>
+    <div class="stat-card">{total_findings} <span>Total Findings</span></div>
+    <div class="stat-card">{severity_counts['critical'] + severity_counts['high']} <span>Critical + High</span></div>
+    <div class="stat-card">{severity_counts['medium']} <span>Medium</span></div>
+    <div class="stat-card">{severity_counts['low'] + severity_counts['info']} <span>Low + Info</span></div>
+
+    <!-- Chart.js canvases, one card per chart (card markup and chart scripts elided) -->
+    <h5>Severity Distribution</h5>
+    <h5>Detection Type</h5>
+    <h5>Top Categories</h5>
+    <h5>Findings by Source</h5>
+
+    <!-- Detailed findings with severity/type filters and a search box (controls elided) -->
+    <h2>šŸ” Detailed Findings</h2>
+    <table>
+        <thead>
+            <tr><th>ID</th><th>Severity</th><th>Finding</th><th>Source</th><th>Location</th></tr>
+        </thead>
+        <tbody>
+            {findings_rows}
+        </tbody>
+    </table>
+    <p>{total_findings} of {total_findings} findings shown. Click on a row to view details.</p>
+</body>
+</html>
-    <h2>Detailed Findings</h2>
-    <table>
-        <tr><th>Rule ID</th><th>Severity</th><th>Message</th><th>Location</th></tr>
-"""
-
-    for result in results:
-        level = result.get("level", "note")
-        rule_id = result.get("ruleId", "unknown")
-        message = result.get("message", {}).get("text", "")
-
-        # Extract location
-        location_str = ""
-        locations = result.get("locations", [])
-        if locations:
-            physical_location = locations[0].get("physicalLocation", {})
-            artifact_location = physical_location.get("artifactLocation", {})
-            region = physical_location.get("region", {})
-
-            file_path = artifact_location.get("uri", "")
-            if file_path:
-                location_str = file_path
-                if region.get("startLine"):
-                    location_str += f":{region['startLine']}"
-
-        html_content += f"""
-        <tr><td>{rule_id}</td><td>{level}</td><td>{message}</td><td>{location_str}</td></tr>
-        """
-
-    html_content += """
-    </table>
-</body>
-</html>
-    """
+"""

    with open(output_path, 'w', encoding='utf-8') as f:
        f.write(html_content)

From d29fc7e263b5f9f4d21a0e8d4796abee3cbada49 Mon Sep 17 00:00:00 2001
From: Capton1
Date: Fri, 7 Nov 2025 14:02:21 +0700
Subject: [PATCH 11/16] feat: apply the FuzzForge styling and colors to finding
 reports

---
 cli/src/fuzzforge_cli/commands/findings.py | 834 +++++++++++++++------
 1 file changed, 622 insertions(+), 212 deletions(-)

diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py
index ea5a5364..7eada205 100644
--- a/cli/src/fuzzforge_cli/commands/findings.py
+++ b/cli/src/fuzzforge_cli/commands/findings.py
@@ -1115,18 +1115,18 @@ def safe_escape(value):
             location_str += f":{line_start}"
 
         severity_badge = {
-            "critical": 'CRITICAL',
-            "high": 'HIGH',
-            "medium": 'MEDIUM',
-            "low": 'LOW',
-            "info": 'INFO'
-        }.get(severity, 'INFO')
+            "critical": 'CRITICAL',
+            "high": 'HIGH',
+            "medium": 'MEDIUM',
+            "low": 'LOW',
+            "info": 'INFO'
+        }.get(severity, 'INFO')
 
         confidence_badge = {
-            "high": 'High',
-            "medium": 'Medium',
-            "low": 'Low'
-        }.get(confidence, 'Medium')
+            "high": 'High',
+            "medium": 'Medium',
+            "low": 'Low'
+        }.get(confidence, 'Medium')
 
         type_icon = {
@@ -1138,33 +1138,31 @@ def safe_escape(value):
 
         # Build details HTML
         details_html = f"""
+ + + + - """ +""" with open(output_path, 'w', encoding='utf-8') as f: f.write(html_content) From d29fc7e263b5f9f4d21a0e8d4796abee3cbada49 Mon Sep 17 00:00:00 2001 From: Capton1 Date: Fri, 7 Nov 2025 14:02:21 +0700 Subject: [PATCH 11/16] feat: make finding reports having the fuzzforge styling and colors --- cli/src/fuzzforge_cli/commands/findings.py | 834 +++++++++++++++------ 1 file changed, 622 insertions(+), 212 deletions(-) diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py index ea5a5364..7eada205 100644 --- a/cli/src/fuzzforge_cli/commands/findings.py +++ b/cli/src/fuzzforge_cli/commands/findings.py @@ -1115,18 +1115,18 @@ def safe_escape(value): location_str += f":{line_start}" severity_badge = { - "critical": 'CRITICAL', - "high": 'HIGH', - "medium": 'MEDIUM', - "low": 'LOW', - "info": 'INFO' - }.get(severity, 'INFO') + "critical": 'CRITICAL', + "high": 'HIGH', + "medium": 'MEDIUM', + "low": 'LOW', + "info": 'INFO' + }.get(severity, 'INFO') confidence_badge = { - "high": 'High', - "medium": 'Medium', - "low": 'Low' - }.get(confidence, 'Medium') + "high": 'High', + "medium": 'Medium', + "low": 'Low' + }.get(confidence, 'Medium') type_icon = { "llm": "šŸ¤–", @@ -1138,33 +1138,31 @@ def safe_escape(value): # Build details HTML details_html = f"""