diff --git a/backend/src/models/finding_schema.py b/backend/src/models/finding_schema.py new file mode 100644 index 00000000..a58b8299 --- /dev/null +++ b/backend/src/models/finding_schema.py @@ -0,0 +1,166 @@ +""" +FuzzForge Native Finding Format Schema + +This module defines the native finding format used internally by FuzzForge. +This format is more expressive than SARIF and optimized for security testing workflows. +""" + +# Copyright (c) 2025 FuzzingLabs +# +# Licensed under the Business Source License 1.1 (BSL). See the LICENSE file +# at the root of this repository for details. +# +# After the Change Date (four years from publication), this version of the +# Licensed Work will be made available under the Apache License, Version 2.0. +# See the LICENSE-APACHE file or http://www.apache.org/licenses/LICENSE-2.0 +# +# Additional attribution and requirements are provided in the NOTICE file. + +from typing import Dict, Any, List, Optional, Literal +from pydantic import BaseModel, Field +from datetime import datetime + + +class FoundBy(BaseModel): + """Information about who/what found the vulnerability""" + module: str = Field(..., description="FuzzForge module that detected the finding (e.g., 'semgrep_scanner', 'llm_analysis')") + tool_name: str = Field(..., description="Name of the underlying tool (e.g., 'Semgrep', 'Claude-3.5-Sonnet', 'MobSF')") + tool_version: str = Field(..., description="Version of the tool") + type: Literal["llm", "tool", "fuzzer", "manual"] = Field(..., description="Type of detection method") + + +class LLMContext(BaseModel): + """Context information for LLM-detected findings""" + model: str = Field(..., description="LLM model used (e.g., 'claude-3-5-sonnet-20250129')") + prompt: str = Field(..., description="Prompt or analysis instructions used") + temperature: Optional[float] = Field(None, description="Temperature parameter used for generation") + + +class Location(BaseModel): + """Location information for a finding""" + file: str = Field(..., description="File path relative to workspace root") + line_start: Optional[int] = Field(None, description="Starting line number (1-indexed)") + line_end: Optional[int] = Field(None, description="Ending line number (1-indexed)") + column_start: Optional[int] = Field(None, description="Starting column number (1-indexed)") + column_end: Optional[int] = Field(None, description="Ending column number (1-indexed)") + snippet: Optional[str] = Field(None, description="Code snippet at the location") + + +class Finding(BaseModel): + """Individual security finding""" + id: str = Field(..., description="Unique finding identifier (UUID)") + rule_id: str = Field(..., description="Rule/pattern identifier (e.g., 'sql_injection', 'hardcoded_secret')") + found_by: FoundBy = Field(..., description="Detection attribution") + llm_context: Optional[LLMContext] = Field(None, description="LLM-specific context (only if found_by.type == 'llm')") + + title: str = Field(..., description="Short finding title") + description: str = Field(..., description="Detailed description of the finding") + + severity: Literal["critical", "high", "medium", "low", "info"] = Field(..., description="Severity level") + confidence: Literal["high", "medium", "low"] = Field(..., description="Confidence level in the finding") + + category: str = Field(..., description="Finding category (e.g., 'injection', 'authentication', 'cryptography')") + cwe: Optional[str] = Field(None, description="CWE identifier (e.g., 'CWE-89')") + owasp: Optional[str] = Field(None, 
description="OWASP category (e.g., 'A03:2021-Injection')") + + location: Optional[Location] = Field(None, description="Location of the finding in source code") + + recommendation: Optional[str] = Field(None, description="Remediation recommendation") + references: List[str] = Field(default_factory=list, description="External references (URLs, documentation)") + + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") + + +class FindingsSummary(BaseModel): + """Summary statistics for findings""" + total_findings: int = Field(..., description="Total number of findings") + by_severity: Dict[str, int] = Field(default_factory=dict, description="Count by severity level") + by_confidence: Dict[str, int] = Field(default_factory=dict, description="Count by confidence level") + by_category: Dict[str, int] = Field(default_factory=dict, description="Count by category") + by_source: Dict[str, int] = Field(default_factory=dict, description="Count by detection source (module name)") + by_type: Dict[str, int] = Field(default_factory=dict, description="Count by detection type (llm/tool/fuzzer)") + affected_files: int = Field(0, description="Number of unique files with findings") + + +class FuzzForgeFindingsReport(BaseModel): + """Native FuzzForge findings report format""" + version: str = Field(default="1.0.0", description="Format version") + run_id: str = Field(..., description="Workflow run identifier") + workflow: str = Field(..., description="Workflow name") + timestamp: datetime = Field(default_factory=datetime.utcnow, description="Report generation timestamp") + + findings: List[Finding] = Field(default_factory=list, description="List of security findings") + summary: FindingsSummary = Field(..., description="Summary statistics") + + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional report metadata") + + +# JSON Schema export for documentation +FINDING_SCHEMA_VERSION = "1.0.0" + +def get_json_schema() -> Dict[str, Any]: + """Get JSON schema for the FuzzForge findings format""" + return FuzzForgeFindingsReport.model_json_schema() + + +def validate_findings_report(data: Dict[str, Any]) -> FuzzForgeFindingsReport: + """ + Validate a findings report against the schema + + Args: + data: Dictionary containing findings report data + + Returns: + Validated FuzzForgeFindingsReport object + + Raises: + ValidationError: If data doesn't match schema + """ + return FuzzForgeFindingsReport(**data) + + +def create_summary(findings: List[Finding]) -> FindingsSummary: + """ + Generate summary statistics from a list of findings + + Args: + findings: List of Finding objects + + Returns: + FindingsSummary with aggregated statistics + """ + summary = FindingsSummary( + total_findings=len(findings), + by_severity={}, + by_confidence={}, + by_category={}, + by_source={}, + by_type={}, + affected_files=0 + ) + + affected_files = set() + + for finding in findings: + # Count by severity + summary.by_severity[finding.severity] = summary.by_severity.get(finding.severity, 0) + 1 + + # Count by confidence + summary.by_confidence[finding.confidence] = summary.by_confidence.get(finding.confidence, 0) + 1 + + # Count by category + summary.by_category[finding.category] = summary.by_category.get(finding.category, 0) + 1 + + # Count by source (module) + summary.by_source[finding.found_by.module] = summary.by_source.get(finding.found_by.module, 0) + 1 + + # Count by type + summary.by_type[finding.found_by.type] = summary.by_type.get(finding.found_by.type, 0) + 1 + + # Track 
affected files + if finding.location and finding.location.file: + affected_files.add(finding.location.file) + + summary.affected_files = len(affected_files) + + return summary diff --git a/backend/src/models/findings.py b/backend/src/models/findings.py index b71a9b64..0eaea6bf 100644 --- a/backend/src/models/findings.py +++ b/backend/src/models/findings.py @@ -19,10 +19,10 @@ class WorkflowFindings(BaseModel): - """Findings from a workflow execution in SARIF format""" + """Findings from a workflow execution in native FuzzForge format""" workflow: str = Field(..., description="Workflow name") run_id: str = Field(..., description="Unique run identifier") - sarif: Dict[str, Any] = Field(..., description="SARIF formatted findings") + sarif: Dict[str, Any] = Field(..., description="Findings in native FuzzForge format (field name kept for API compatibility)") metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") diff --git a/backend/tests/unit/test_modules/test_atheris_fuzzer.py b/backend/tests/unit/test_modules/test_atheris_fuzzer.py index 9cd01cee..8873aa6a 100644 --- a/backend/tests/unit/test_modules/test_atheris_fuzzer.py +++ b/backend/tests/unit/test_modules/test_atheris_fuzzer.py @@ -159,11 +159,20 @@ class TestAtherisFuzzerFindingGeneration: async def test_create_crash_finding(self, atheris_fuzzer): """Test crash finding creation""" + from modules.base import FoundBy + finding = atheris_fuzzer.create_finding( + rule_id="atheris-crash", title="Crash: Exception in TestOneInput", description="IndexError: list index out of range", severity="high", category="crash", + found_by=FoundBy( + module="atheris_fuzzer", + tool_name="Atheris", + tool_version="2.3.0", + type="fuzzer" + ), file_path="fuzz_target.py", metadata={ "crash_type": "IndexError", diff --git a/backend/tests/unit/test_modules/test_cargo_fuzzer.py b/backend/tests/unit/test_modules/test_cargo_fuzzer.py index f550b9a1..ef20b774 100644 --- a/backend/tests/unit/test_modules/test_cargo_fuzzer.py +++ b/backend/tests/unit/test_modules/test_cargo_fuzzer.py @@ -161,11 +161,20 @@ class TestCargoFuzzerFindingGeneration: async def test_create_finding_from_crash(self, cargo_fuzzer): """Test finding creation""" + from modules.base import FoundBy + finding = cargo_fuzzer.create_finding( + rule_id="cargo-fuzz-crash", title="Crash: Segmentation Fault", description="Test crash", severity="critical", category="crash", + found_by=FoundBy( + module="cargo_fuzz", + tool_name="cargo-fuzz", + tool_version="0.11.2", + type="fuzzer" + ), file_path="fuzz/fuzz_targets/fuzz_target_1.rs", metadata={"crash_type": "SIGSEGV"} ) diff --git a/backend/toolbox/modules/analyzer/bandit_analyzer.py b/backend/toolbox/modules/analyzer/bandit_analyzer.py index ecf81a84..b5c20746 100644 --- a/backend/toolbox/modules/analyzer/bandit_analyzer.py +++ b/backend/toolbox/modules/analyzer/bandit_analyzer.py @@ -21,12 +21,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, 
ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -237,12 +237,34 @@ def _convert_to_findings( except (ValueError, TypeError): rel_path = Path(filename).name + # Extract confidence and CWE + confidence = issue.get("issue_confidence", "LOW").lower() + cwe_info = issue.get("issue_cwe", {}) + cwe_id = f"CWE-{cwe_info.get('id')}" if cwe_info and cwe_info.get("id") else None + + # Create FoundBy attribution + # Try to get Bandit version from metrics, fall back to unknown + bandit_version = "unknown" + if "metrics" in bandit_result: + bandit_version = bandit_result["metrics"].get("_version", "unknown") + + found_by = FoundBy( + module="bandit_analyzer", + tool_name="Bandit", + tool_version=bandit_version, + type="tool" + ) + # Create finding finding = self.create_finding( + rule_id=test_id, title=f"{test_name} ({test_id})", description=issue_text, severity=severity, category="security-issue", + found_by=found_by, + confidence=confidence, + cwe=cwe_id, file_path=str(rel_path), line_start=line_number, line_end=line_number, @@ -251,8 +273,6 @@ def _convert_to_findings( metadata={ "test_id": test_id, "test_name": test_name, - "confidence": issue.get("issue_confidence", "LOW").lower(), - "cwe": issue.get("issue_cwe", {}).get("id") if issue.get("issue_cwe") else None, "more_info": issue.get("more_info", "") } ) diff --git a/backend/toolbox/modules/analyzer/llm_analyzer.py b/backend/toolbox/modules/analyzer/llm_analyzer.py index b3b13748..1d504cf7 100644 --- a/backend/toolbox/modules/analyzer/llm_analyzer.py +++ b/backend/toolbox/modules/analyzer/llm_analyzer.py @@ -18,12 +18,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy, LLMContext logger = logging.getLogger(__name__) @@ -270,10 +270,14 @@ async def _analyze_file( return [] # Parse LLM response into findings + full_prompt = f"{system_prompt}\n\n{user_message}" findings = self._parse_llm_response( llm_response=llm_response, file_path=file_path, - workspace=workspace + workspace=workspace, + llm_model=llm_model, + llm_provider=llm_provider, + prompt=full_prompt ) return findings @@ -282,7 +286,10 @@ def _parse_llm_response( self, llm_response: str, file_path: Path, - workspace: Path + workspace: Path, + llm_model: str, + llm_provider: str, + prompt: str ) -> List: """Parse LLM response into structured findings""" @@ -302,7 +309,9 @@ def _parse_llm_response( if line.startswith("ISSUE:"): # Save previous issue if exists if current_issue: - findings.append(self._create_module_finding(current_issue, relative_path)) + findings.append(self._create_module_finding( + current_issue, relative_path, llm_model, llm_provider, prompt + )) current_issue = {"title": line.replace("ISSUE:", "").strip()} elif line.startswith("SEVERITY:"): @@ -320,11 +329,20 @@ def _parse_llm_response( # Save last issue if current_issue: - findings.append(self._create_module_finding(current_issue, relative_path)) + findings.append(self._create_module_finding( + current_issue, relative_path, llm_model, llm_provider, prompt + )) return findings 
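# For context, _parse_llm_response consumes a plain-text, marker-based response
# rather than JSON. Only the ISSUE:/SEVERITY: branches appear in this hunk; the
# LINE:/DESCRIPTION: markers sketched below are assumptions inferred from the
# keys the code reads later (issue.get("line"), issue.get("description")):
#
#     ISSUE: SQL query built via string concatenation
#     SEVERITY: error
#     LINE: 42
#     DESCRIPTION: User input is interpolated directly into the SQL statement.
#
# Each new ISSUE: line flushes the previous issue through _create_module_finding,
# and the final issue is flushed after the loop.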
- def _create_module_finding(self, issue: Dict[str, Any], file_path: str): + def _create_module_finding( + self, + issue: Dict[str, Any], + file_path: str, + llm_model: str, + llm_provider: str, + prompt: str + ): """Create a ModuleFinding from parsed issue""" severity_map = { @@ -334,12 +352,39 @@ def _create_module_finding(self, issue: Dict[str, Any], file_path: str): "info": "low" } + # Determine confidence based on severity (LLM is more confident on critical issues) + confidence_map = { + "error": "high", + "warning": "medium", + "note": "medium", + "info": "low" + } + + # Create FoundBy attribution + found_by = FoundBy( + module="llm_analyzer", + tool_name=f"{llm_provider}/{llm_model}", + tool_version="1.0.0", + type="llm" + ) + + # Create LLM context + llm_context = LLMContext( + model=llm_model, + prompt=prompt, + temperature=None # Not exposed in current config + ) + # Use base class helper to create proper ModuleFinding return self.create_finding( + rule_id=f"llm_security_{issue.get('severity', 'warning')}", title=issue.get("title", "Security issue detected"), description=issue.get("description", ""), severity=severity_map.get(issue.get("severity", "warning"), "medium"), category="security", + found_by=found_by, + confidence=confidence_map.get(issue.get("severity", "warning"), "medium"), + llm_context=llm_context, file_path=file_path, line_start=issue.get("line"), metadata={ diff --git a/backend/toolbox/modules/analyzer/mypy_analyzer.py b/backend/toolbox/modules/analyzer/mypy_analyzer.py index 9d3e39f4..e9823c53 100644 --- a/backend/toolbox/modules/analyzer/mypy_analyzer.py +++ b/backend/toolbox/modules/analyzer/mypy_analyzer.py @@ -21,12 +21,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -189,18 +189,29 @@ def _parse_mypy_output(self, output: str, workspace: Path) -> List[ModuleFinding title = f"Type error: {error_code or 'type-issue'}" description = message + # Create FoundBy attribution + found_by = FoundBy( + module="mypy_analyzer", + tool_name="Mypy", + tool_version="unknown", # Mypy doesn't include version in output + type="tool" + ) + finding = self.create_finding( + rule_id=error_code or "type-issue", title=title, description=description, severity=severity, category="type-error", + found_by=found_by, + confidence="high", # Mypy is highly confident in its type checking file_path=str(rel_path), line_start=int(line_num), line_end=int(line_num), + column_start=int(column) if column else None, recommendation="Review and fix the type inconsistency or add appropriate type annotations", metadata={ "error_code": error_code or "unknown", - "column": int(column) if column else None, "level": level } ) diff --git a/backend/toolbox/modules/analyzer/security_analyzer.py b/backend/toolbox/modules/analyzer/security_analyzer.py index 3b4a2ea1..0f537da1 100644 --- a/backend/toolbox/modules/analyzer/security_analyzer.py +++ 
b/backend/toolbox/modules/analyzer/security_analyzer.py @@ -19,12 +19,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -217,11 +217,22 @@ def _check_hardcoded_secrets( if self._is_false_positive_secret(match.group(0)): continue + # Create FoundBy attribution + found_by = FoundBy( + module="security_analyzer", + tool_name="Security Analyzer", + tool_version="1.0.0", + type="tool" + ) + findings.append(self.create_finding( + rule_id=f"hardcoded_{secret_type.lower().replace(' ', '_')}", title=f"Hardcoded {secret_type} detected", description=f"Found potential hardcoded {secret_type} in {file_path}", severity="high" if "key" in secret_type.lower() else "medium", category="hardcoded_secret", + found_by=found_by, + confidence="medium", file_path=str(file_path), line_start=line_num, code_snippet=line_content.strip()[:100], @@ -261,11 +272,23 @@ def _check_sql_injection( line_num = content[:match.start()].count('\n') + 1 line_content = lines[line_num - 1] if line_num <= len(lines) else "" + # Create FoundBy attribution + found_by = FoundBy( + module="security_analyzer", + tool_name="Security Analyzer", + tool_version="1.0.0", + type="tool" + ) + findings.append(self.create_finding( + rule_id=f"sql_injection_{vuln_type.lower().replace(' ', '_')}", title=f"Potential SQL Injection: {vuln_type}", description=f"Detected potential SQL injection vulnerability via {vuln_type}", severity="high", category="sql_injection", + found_by=found_by, + confidence="medium", + cwe="CWE-89", file_path=str(file_path), line_start=line_num, code_snippet=line_content.strip()[:100], @@ -323,11 +346,22 @@ def _check_dangerous_functions( line_num = content[:match.start()].count('\n') + 1 line_content = lines[line_num - 1] if line_num <= len(lines) else "" + # Create FoundBy attribution + found_by = FoundBy( + module="security_analyzer", + tool_name="Security Analyzer", + tool_version="1.0.0", + type="tool" + ) + findings.append(self.create_finding( + rule_id=f"dangerous_function_{func_name.replace('()', '').replace('.', '_')}", title=f"Dangerous function: {func_name}", description=f"Use of potentially dangerous function {func_name}: {risk_type}", severity="medium", category="dangerous_function", + found_by=found_by, + confidence="medium", file_path=str(file_path), line_start=line_num, code_snippet=line_content.strip()[:100], diff --git a/backend/toolbox/modules/android/mobsf_scanner.py b/backend/toolbox/modules/android/mobsf_scanner.py index 3b16e1b4..07073fe3 100644 --- a/backend/toolbox/modules/android/mobsf_scanner.py +++ b/backend/toolbox/modules/android/mobsf_scanner.py @@ -24,12 +24,12 @@ import aiohttp try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, 
ModuleFinding, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy logger = logging.getLogger(__name__) @@ -278,6 +278,14 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List """Parse MobSF JSON results into standardized findings""" findings = [] + # Create FoundBy attribution for all MobSF findings + found_by = FoundBy( + module="mobsf_scanner", + tool_name="MobSF", + tool_version="3.9.7", + type="tool" + ) + # Parse permissions if 'permissions' in scan_data: for perm_name, perm_attrs in scan_data['permissions'].items(): @@ -287,10 +295,13 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List ) finding = self.create_finding( + rule_id=f"android_permission_{perm_name.replace('.', '_')}", title=f"Android Permission: {perm_name}", description=perm_attrs.get('description', 'No description'), severity=severity, category="android-permission", + found_by=found_by, + confidence="high", metadata={ 'permission': perm_name, 'status': perm_attrs.get('status'), @@ -307,13 +318,19 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List if isinstance(item, dict): severity = self.SEVERITY_MAP.get(item.get('severity', '').lower(), 'medium') + title = item.get('title') or item.get('name') or "Manifest Issue" + rule = item.get('rule') or "manifest_issue" + finding = self.create_finding( - title=item.get('title') or item.get('name') or "Manifest Issue", + rule_id=f"android_manifest_{rule.replace(' ', '_').replace('-', '_')}", + title=title, description=item.get('description', 'No description'), severity=severity, category="android-manifest", + found_by=found_by, + confidence="high", metadata={ - 'rule': item.get('rule'), + 'rule': rule, 'tool': 'mobsf', } ) @@ -335,16 +352,32 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List # Create a finding for each affected file if isinstance(files_dict, dict) and files_dict: for file_path, line_numbers in files_dict.items(): + # Extract first line number if available + line_start = None + if line_numbers: + try: + # Can be string like "28" or "65,81" + line_start = int(str(line_numbers).split(',')[0]) + except (ValueError, AttributeError): + pass + + # Extract CWE from metadata + cwe_value = metadata_dict.get('cwe') + cwe_id = f"CWE-{cwe_value}" if cwe_value else None + finding = self.create_finding( + rule_id=finding_name.replace(' ', '_').replace('-', '_'), title=finding_name, description=metadata_dict.get('description', 'No description'), severity=severity, category="android-code-analysis", + found_by=found_by, + confidence="medium", + cwe=cwe_id, + owasp=metadata_dict.get('owasp'), file_path=file_path, - line_number=line_numbers, # Can be string like "28" or "65,81" + line_start=line_start, metadata={ - 'cwe': metadata_dict.get('cwe'), - 'owasp': metadata_dict.get('owasp'), 'masvs': metadata_dict.get('masvs'), 'cvss': metadata_dict.get('cvss'), 'ref': metadata_dict.get('ref'), @@ -355,14 +388,21 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List findings.append(finding) else: # Fallback: create one finding without file info + # Extract CWE from metadata + cwe_value = metadata_dict.get('cwe') + cwe_id = f"CWE-{cwe_value}" if cwe_value else None + 
finding = self.create_finding( + rule_id=finding_name.replace(' ', '_').replace('-', '_'), title=finding_name, description=metadata_dict.get('description', 'No description'), severity=severity, category="android-code-analysis", + found_by=found_by, + confidence="medium", + cwe=cwe_id, + owasp=metadata_dict.get('owasp'), metadata={ - 'cwe': metadata_dict.get('cwe'), - 'owasp': metadata_dict.get('owasp'), 'masvs': metadata_dict.get('masvs'), 'cvss': metadata_dict.get('cvss'), 'ref': metadata_dict.get('ref'), @@ -389,13 +429,25 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List # Create a finding for each affected file if isinstance(files_dict, dict) and files_dict: for file_path, line_numbers in files_dict.items(): + # Extract first line number if available + line_start = None + if line_numbers: + try: + # Can be string like "28" or "65,81" + line_start = int(str(line_numbers).split(',')[0]) + except (ValueError, AttributeError): + pass + finding = self.create_finding( + rule_id=f"android_behavior_{key.replace(' ', '_').replace('-', '_')}", title=f"Behavior: {label}", description=metadata_dict.get('description', 'No description'), severity=severity, category="android-behavior", + found_by=found_by, + confidence="medium", file_path=file_path, - line_number=line_numbers, + line_start=line_start, metadata={ 'line_numbers': line_numbers, 'behavior_key': key, @@ -406,10 +458,13 @@ def _parse_scan_results(self, scan_data: Dict[str, Any], apk_path: Path) -> List else: # Fallback: create one finding without file info finding = self.create_finding( + rule_id=f"android_behavior_{key.replace(' ', '_').replace('-', '_')}", title=f"Behavior: {label}", description=metadata_dict.get('description', 'No description'), severity=severity, category="android-behavior", + found_by=found_by, + confidence="medium", metadata={ 'behavior_key': key, 'tool': 'mobsf', diff --git a/backend/toolbox/modules/android/opengrep_android.py b/backend/toolbox/modules/android/opengrep_android.py index 01e32c4e..d3ce9267 100644 --- a/backend/toolbox/modules/android/opengrep_android.py +++ b/backend/toolbox/modules/android/opengrep_android.py @@ -23,12 +23,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleFinding, ModuleResult, FoundBy logger = logging.getLogger(__name__) @@ -302,23 +302,40 @@ def _parse_opengrep_output(self, output: str, workspace: Path, config: Dict[str, # Map severity to our standard levels finding_severity = self._map_severity(severity) + # Map confidence + confidence_map = {"HIGH": "high", "MEDIUM": "medium", "LOW": "low"} + finding_confidence = confidence_map.get(confidence, "medium") + + # Format CWE and OWASP + cwe_id = f"CWE-{cwe[0]}" if cwe and isinstance(cwe, list) and cwe else None + owasp_str = owasp[0] if owasp and isinstance(owasp, list) and owasp else None + + # Create FoundBy attribution + found_by = FoundBy( + module="opengrep_android", + tool_name="OpenGrep", + tool_version="1.45.0", + type="tool" + ) 
+ # Create finding finding = self.create_finding( + rule_id=rule_id, title=f"Android Security: {rule_id}", description=message or f"OpenGrep rule {rule_id} triggered", severity=finding_severity, category=self._get_category(rule_id, extra), + found_by=found_by, + confidence=finding_confidence, + cwe=cwe_id, + owasp=owasp_str, file_path=path_info if path_info else None, line_start=start_line if start_line > 0 else None, line_end=end_line if end_line > 0 and end_line != start_line else None, code_snippet=lines.strip() if lines else None, recommendation=self._get_recommendation(rule_id, extra), metadata={ - "rule_id": rule_id, "opengrep_severity": severity, - "confidence": confidence, - "cwe": cwe, - "owasp": owasp, "fix": extra.get("fix", ""), "impact": extra.get("impact", ""), "likelihood": extra.get("likelihood", ""), diff --git a/backend/toolbox/modules/base.py b/backend/toolbox/modules/base.py index dcef98d1..2990edeb 100644 --- a/backend/toolbox/modules/base.py +++ b/backend/toolbox/modules/base.py @@ -35,18 +35,48 @@ class ModuleMetadata(BaseModel): requires_workspace: bool = Field(True, description="Whether module requires workspace access") +class FoundBy(BaseModel): + """Information about who/what found the vulnerability""" + module: str = Field(..., description="FuzzForge module that detected the finding") + tool_name: str = Field(..., description="Name of the underlying tool") + tool_version: str = Field(..., description="Version of the tool") + type: str = Field(..., description="Type of detection method (llm, tool, fuzzer, manual)") + + +class LLMContext(BaseModel): + """Context information for LLM-detected findings""" + model: str = Field(..., description="LLM model used") + prompt: str = Field(..., description="Prompt or analysis instructions used") + temperature: Optional[float] = Field(None, description="Temperature parameter used for generation") + + class ModuleFinding(BaseModel): """Individual finding from a module""" - id: str = Field(..., description="Unique finding ID") + id: str = Field(..., description="Unique finding ID (UUID)") + rule_id: str = Field(..., description="Rule/pattern identifier") + found_by: FoundBy = Field(..., description="Detection attribution") + llm_context: Optional[LLMContext] = Field(None, description="LLM-specific context") + title: str = Field(..., description="Finding title") description: str = Field(..., description="Detailed description") - severity: str = Field(..., description="Severity level (info, low, medium, high, critical)") + + severity: str = Field(..., description="Severity level (critical, high, medium, low, info)") + confidence: str = Field(default="medium", description="Confidence level (high, medium, low)") + category: str = Field(..., description="Finding category") + cwe: Optional[str] = Field(None, description="CWE identifier (e.g., 'CWE-89')") + owasp: Optional[str] = Field(None, description="OWASP category") + file_path: Optional[str] = Field(None, description="Affected file path relative to workspace") line_start: Optional[int] = Field(None, description="Starting line number") line_end: Optional[int] = Field(None, description="Ending line number") + column_start: Optional[int] = Field(None, description="Starting column number") + column_end: Optional[int] = Field(None, description="Ending column number") code_snippet: Optional[str] = Field(None, description="Relevant code snippet") + recommendation: Optional[str] = Field(None, description="Remediation recommendation") + references: List[str] = 
Field(default_factory=list, description="External references") + metadata: Dict[str, Any] = Field(default_factory=dict, description="Additional metadata") @@ -140,20 +170,32 @@ def validate_workspace(self, workspace: Path) -> bool: def create_finding( self, + rule_id: str, title: str, description: str, severity: str, category: str, + found_by: FoundBy, + confidence: str = "medium", + llm_context: Optional[LLMContext] = None, + cwe: Optional[str] = None, + owasp: Optional[str] = None, **kwargs ) -> ModuleFinding: """ Helper method to create a standardized finding. Args: + rule_id: Rule/pattern identifier title: Finding title description: Detailed description - severity: Severity level + severity: Severity level (critical, high, medium, low, info) category: Finding category + found_by: Detection attribution (FoundBy object) + confidence: Confidence level (high, medium, low) + llm_context: Optional LLM context information + cwe: Optional CWE identifier + owasp: Optional OWASP category **kwargs: Additional finding fields Returns: @@ -164,10 +206,16 @@ def create_finding( return ModuleFinding( id=finding_id, + rule_id=rule_id, + found_by=found_by, + llm_context=llm_context, title=title, description=description, severity=severity, + confidence=confidence, category=category, + cwe=cwe, + owasp=owasp, **kwargs ) @@ -226,29 +274,62 @@ def _generate_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]: Summary dictionary """ severity_counts = { - "info": 0, - "low": 0, + "critical": 0, + "high": 0, "medium": 0, + "low": 0, + "info": 0 + } + + confidence_counts = { "high": 0, - "critical": 0 + "medium": 0, + "low": 0 } category_counts = {} + source_counts = {} + type_counts = {} + affected_files = set() for finding in findings: # Count by severity if finding.severity in severity_counts: severity_counts[finding.severity] += 1 + # Count by confidence + if finding.confidence in confidence_counts: + confidence_counts[finding.confidence] += 1 + # Count by category if finding.category not in category_counts: category_counts[finding.category] = 0 category_counts[finding.category] += 1 + # Count by source (module) + module = finding.found_by.module + if module not in source_counts: + source_counts[module] = 0 + source_counts[module] += 1 + + # Count by type + detection_type = finding.found_by.type + if detection_type not in type_counts: + type_counts[detection_type] = 0 + type_counts[detection_type] += 1 + + # Track affected files + if finding.file_path: + affected_files.add(finding.file_path) + return { "total_findings": len(findings), "severity_counts": severity_counts, + "confidence_counts": confidence_counts, "category_counts": category_counts, + "source_counts": source_counts, + "type_counts": type_counts, + "affected_files": len(affected_files), "highest_severity": self._get_highest_severity(findings) } diff --git a/backend/toolbox/modules/fuzzer/atheris_fuzzer.py b/backend/toolbox/modules/fuzzer/atheris_fuzzer.py index 3f0c42d6..68a1e6c1 100644 --- a/backend/toolbox/modules/fuzzer/atheris_fuzzer.py +++ b/backend/toolbox/modules/fuzzer/atheris_fuzzer.py @@ -19,7 +19,7 @@ import uuid import httpx -from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding +from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -556,7 +556,16 @@ async def _generate_findings(self, target_path: Path) -> List[ModuleFinding]: # Encode crash input for storage crash_input_b64 = base64.b64encode(crash["input"]).decode() + 
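# The crash input is stored base64-encoded so the finding metadata stays
# JSON-serializable; a consumer can recover the raw reproducer bytes with,
# for example (output path is illustrative):
#     raw = base64.b64decode(finding.metadata["crash_input_base64"])
#     Path("crash_repro.bin").write_bytes(raw)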
# Create FoundBy attribution + found_by = FoundBy( + module="atheris_fuzzer", + tool_name="Atheris", + tool_version="unknown", + type="fuzzer" + ) + finding = self.create_finding( + rule_id=f"fuzzer_crash_{crash['exception_type'].lower().replace(' ', '_')}", title=f"Crash: {crash['exception_type']}", description=( f"Atheris found crash during fuzzing:\n" @@ -566,6 +575,8 @@ async def _generate_findings(self, target_path: Path) -> List[ModuleFinding]: ), severity="critical", category="crash", + found_by=found_by, + confidence="high", # Fuzzer-found crashes are highly reliable file_path=str(target_path), metadata={ "crash_input_base64": crash_input_b64, diff --git a/backend/toolbox/modules/fuzzer/cargo_fuzzer.py b/backend/toolbox/modules/fuzzer/cargo_fuzzer.py index c4fc746c..1f7522f0 100644 --- a/backend/toolbox/modules/fuzzer/cargo_fuzzer.py +++ b/backend/toolbox/modules/fuzzer/cargo_fuzzer.py @@ -13,7 +13,7 @@ from pathlib import Path from typing import Dict, Any, List, Optional, Callable -from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding +from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -426,14 +426,25 @@ async def _analyze_crash( else: severity = "high" + # Create FoundBy attribution + found_by = FoundBy( + module="cargo_fuzz", + tool_name="cargo-fuzz", + tool_version="0.11.2", + type="fuzzer" + ) + # Create finding finding = self.create_finding( + rule_id=f"fuzzer_crash_{error_type.lower().replace(' ', '_')}", title=f"Crash: {error_type} in {target_name}", description=f"Cargo-fuzz discovered a crash in target '{target_name}'. " f"Error type: {error_type}. " f"Input size: {len(crash_input)} bytes.", severity=severity, category="crash", + found_by=found_by, + confidence="high", # Fuzzer-found crashes are highly reliable file_path=f"fuzz/fuzz_targets/{target_name}.rs", code_snippet=stack_trace[:500], recommendation="Review the crash details and fix the underlying bug. " diff --git a/backend/toolbox/modules/reporter/__init__.py b/backend/toolbox/modules/reporter/__init__.py index 7812ff12..38f1725d 100644 --- a/backend/toolbox/modules/reporter/__init__.py +++ b/backend/toolbox/modules/reporter/__init__.py @@ -9,6 +9,6 @@ # # Additional attribution and requirements are provided in the NOTICE file. -from .sarif_reporter import SARIFReporter +from .native_reporter import SARIFReporter __all__ = ["SARIFReporter"] \ No newline at end of file diff --git a/backend/toolbox/modules/reporter/sarif_reporter.py b/backend/toolbox/modules/reporter/native_reporter.py similarity index 66% rename from backend/toolbox/modules/reporter/sarif_reporter.py rename to backend/toolbox/modules/reporter/native_reporter.py index 2a8bec75..eb799f8d 100644 --- a/backend/toolbox/modules/reporter/sarif_reporter.py +++ b/backend/toolbox/modules/reporter/native_reporter.py @@ -1,5 +1,6 @@ """ -SARIF Reporter Module - Generates SARIF-formatted security reports +Native Reporter Module - Generates native FuzzForge format security reports +(Previously SARIF Reporter - now generates native format, SARIF export available separately) """ # Copyright (c) 2025 FuzzingLabs @@ -31,50 +32,46 @@ class SARIFReporter(BaseModule): """ - Generates SARIF (Static Analysis Results Interchange Format) reports. + Generates native FuzzForge format security reports. 
This module: - - Converts findings to SARIF format + - Converts findings to native FuzzForge format - Aggregates results from multiple modules - Adds metadata and context - Provides actionable recommendations + - (SARIF export available via separate exporter module) """ def get_metadata(self) -> ModuleMetadata: """Get module metadata""" return ModuleMetadata( - name="sarif_reporter", - version="1.0.0", - description="Generates SARIF-formatted security reports", + name="native_reporter", + version="2.0.0", + description="Generates native FuzzForge format security reports", author="FuzzForge Team", category="reporter", - tags=["reporting", "sarif", "output"], + tags=["reporting", "native", "output"], input_schema={ "findings": { "type": "array", "description": "List of findings to report", "required": True }, - "tool_name": { + "workflow_name": { "type": "string", - "description": "Name of the tool", + "description": "Name of the workflow", "default": "FuzzForge Security Assessment" }, - "tool_version": { + "run_id": { "type": "string", - "description": "Tool version", - "default": "1.0.0" - }, - "include_code_flows": { - "type": "boolean", - "description": "Include code flow information", - "default": False + "description": "Run identifier", + "required": True } }, output_schema={ - "sarif": { + "native": { "type": "object", - "description": "SARIF 2.1.0 formatted report" + "description": "Native FuzzForge findings format" } }, requires_workspace=False # Reporter doesn't need direct workspace access @@ -88,22 +85,21 @@ def validate_config(self, config: Dict[str, Any]) -> bool: async def execute(self, config: Dict[str, Any], workspace: Path = None) -> ModuleResult: """ - Execute the SARIF reporter module. + Execute the native reporter module. Args: config: Module configuration with findings workspace: Optional workspace path for context Returns: - ModuleResult with SARIF report + ModuleResult with native format report """ self.start_timer() self.validate_config(config) # Get configuration - tool_name = config.get("tool_name", "FuzzForge Security Assessment") - tool_version = config.get("tool_version", "1.0.0") - include_code_flows = config.get("include_code_flows", False) + workflow_name = config.get("workflow_name", "FuzzForge Security Assessment") + run_id = config.get("run_id", "unknown") # Collect findings from either direct findings or module results all_findings = [] @@ -123,16 +119,14 @@ async def execute(self, config: Dict[str, Any], workspace: Path = None) -> Modul elif hasattr(module_result, "findings"): all_findings.extend(module_result.findings) - logger.info(f"Generating SARIF report for {len(all_findings)} findings") + logger.info(f"Generating native format report for {len(all_findings)} findings") try: - # Generate SARIF report - sarif_report = self._generate_sarif( + # Generate native format report + native_report = self._generate_native_report( findings=all_findings, - tool_name=tool_name, - tool_version=tool_version, - include_code_flows=include_code_flows, - workspace_path=str(workspace) if workspace else None + workflow_name=workflow_name, + run_id=run_id ) # Create summary @@ -146,23 +140,141 @@ async def execute(self, config: Dict[str, Any], workspace: Path = None) -> Modul findings=[], # Reporter doesn't generate new findings summary=summary, metadata={ - "tool_name": tool_name, - "tool_version": tool_version, - "report_format": "SARIF 2.1.0", + "workflow_name": workflow_name, + "run_id": run_id, + "report_format": "Native FuzzForge 1.0.0", "total_findings": 
len(all_findings) }, error=None, - sarif=sarif_report # Add SARIF as custom field + sarif=native_report # Field name kept for API compatibility ) except Exception as e: - logger.error(f"SARIF reporter failed: {e}") + logger.error(f"Native reporter failed: {e}") return self.create_result( findings=[], status="failed", error=str(e) ) + def _generate_native_report( + self, + findings: List[ModuleFinding], + workflow_name: str, + run_id: str + ) -> Dict[str, Any]: + """ + Generate native FuzzForge format report. + + Args: + findings: List of findings to report + workflow_name: Name of the workflow + run_id: Run identifier + + Returns: + Native FuzzForge formatted dictionary + """ + # Convert ModuleFinding objects to native format dictionaries + findings_list = [] + for finding in findings: + finding_dict = { + "id": finding.id, + "rule_id": finding.rule_id, + "found_by": { + "module": finding.found_by.module, + "tool_name": finding.found_by.tool_name, + "tool_version": finding.found_by.tool_version, + "type": finding.found_by.type + }, + "title": finding.title, + "description": finding.description, + "severity": finding.severity, + "confidence": finding.confidence, + "category": finding.category, + "recommendation": finding.recommendation, + "references": finding.references + } + + # Add optional fields + if finding.cwe: + finding_dict["cwe"] = finding.cwe + if finding.owasp: + finding_dict["owasp"] = finding.owasp + if finding.llm_context: + finding_dict["llm_context"] = { + "model": finding.llm_context.model, + "prompt": finding.llm_context.prompt, + "temperature": finding.llm_context.temperature + } + + # Add location if available + if finding.file_path: + finding_dict["location"] = { + "file": finding.file_path, + "line_start": finding.line_start, + "line_end": finding.line_end, + "column_start": finding.column_start, + "column_end": finding.column_end, + "snippet": finding.code_snippet + } + + finding_dict["metadata"] = finding.metadata + findings_list.append(finding_dict) + + # Create summary + from datetime import datetime + summary = self._create_native_summary(findings) + + # Build native format structure + native_report = { + "version": "1.0.0", + "run_id": run_id, + "workflow": workflow_name, + "timestamp": datetime.utcnow().isoformat() + "Z", + "findings": findings_list, + "summary": summary + } + + return native_report + + def _create_native_summary(self, findings: List[ModuleFinding]) -> Dict[str, Any]: + """Create summary for native format""" + summary = { + "total_findings": len(findings), + "by_severity": {}, + "by_confidence": {}, + "by_category": {}, + "by_source": {}, + "by_type": {}, + "affected_files": 0 + } + + affected_files = set() + + for finding in findings: + # Count by severity + summary["by_severity"][finding.severity] = summary["by_severity"].get(finding.severity, 0) + 1 + + # Count by confidence + summary["by_confidence"][finding.confidence] = summary["by_confidence"].get(finding.confidence, 0) + 1 + + # Count by category + summary["by_category"][finding.category] = summary["by_category"].get(finding.category, 0) + 1 + + # Count by source (module) + summary["by_source"][finding.found_by.module] = summary["by_source"].get(finding.found_by.module, 0) + 1 + + # Count by type + summary["by_type"][finding.found_by.type] = summary["by_type"].get(finding.found_by.type, 0) + 1 + + # Track affected files + if finding.file_path: + affected_files.add(finding.file_path) + + summary["affected_files"] = len(affected_files) + return summary + + # Keep old SARIF methods for 
reference/future SARIF export module def _generate_sarif( self, findings: List[ModuleFinding], diff --git a/backend/toolbox/modules/scanner/dependency_scanner.py b/backend/toolbox/modules/scanner/dependency_scanner.py index 4c7791c4..831b0059 100644 --- a/backend/toolbox/modules/scanner/dependency_scanner.py +++ b/backend/toolbox/modules/scanner/dependency_scanner.py @@ -21,12 +21,12 @@ from typing import Dict, Any, List try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, ModuleFinding, FoundBy logger = logging.getLogger(__name__) @@ -201,11 +201,22 @@ def _convert_to_findings( recommendation = f"Upgrade {package_name} to a fixed version: {', '.join(fix_versions)}" if fix_versions else f"Check for updates to {package_name}" + # Create FoundBy attribution + found_by = FoundBy( + module="dependency_scanner", + tool_name="pip-audit", + tool_version="unknown", + type="tool" + ) + finding = self.create_finding( + rule_id=f"vulnerable_dependency_{package_name}", title=f"Vulnerable dependency: {package_name} ({vuln_id})", description=f"{description}\n\nAffected package: {package_name} {package_version}", severity=severity, category="vulnerable-dependency", + found_by=found_by, + confidence="high", # pip-audit uses official CVE database file_path=str(rel_path), recommendation=recommendation, metadata={ diff --git a/backend/toolbox/modules/scanner/file_scanner.py b/backend/toolbox/modules/scanner/file_scanner.py index 22de2002..f7a39869 100644 --- a/backend/toolbox/modules/scanner/file_scanner.py +++ b/backend/toolbox/modules/scanner/file_scanner.py @@ -20,12 +20,12 @@ import hashlib try: - from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy except ImportError: try: - from modules.base import BaseModule, ModuleMetadata, ModuleResult + from modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy except ImportError: - from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult + from src.toolbox.modules.base import BaseModule, ModuleMetadata, ModuleResult, FoundBy logger = logging.getLogger(__name__) @@ -122,6 +122,14 @@ async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult logger.info(f"Scanning workspace with patterns: {patterns}") + # Create FoundBy attribution for all findings + found_by = FoundBy( + module="file_scanner", + tool_name="File Scanner", + tool_version="1.0.0", + type="tool" + ) + try: # Scan for each pattern for pattern in patterns: @@ -152,10 +160,13 @@ async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult # Check for sensitive files if check_sensitive and self._is_sensitive_file(file_path): findings.append(self.create_finding( + rule_id="sensitive_file", title=f"Potentially sensitive file: {relative_path.name}", description=f"Found potentially sensitive file at {relative_path}", severity="medium", category="sensitive_file", + found_by=found_by, + 
confidence="medium", file_path=str(relative_path), metadata={ "file_size": file_size, @@ -170,10 +181,13 @@ async def execute(self, config: Dict[str, Any], workspace: Path) -> ModuleResult # Create informational finding for each file findings.append(self.create_finding( + rule_id="file_enumeration", title=f"File discovered: {relative_path.name}", description=f"File: {relative_path}", severity="info", category="file_enumeration", + found_by=found_by, + confidence="high", file_path=str(relative_path), metadata={ "file_size": file_size, diff --git a/backend/toolbox/workflows/android_static_analysis/workflow.py b/backend/toolbox/workflows/android_static_analysis/workflow.py index 8376cd2d..e7e8929d 100644 --- a/backend/toolbox/workflows/android_static_analysis/workflow.py +++ b/backend/toolbox/workflows/android_static_analysis/workflow.py @@ -267,7 +267,8 @@ async def run( ) # Calculate summary - total_findings = len(sarif_report.get("runs", [{}])[0].get("results", [])) + runs = sarif_report.get("runs", []) + total_findings = len(runs[0].get("results", [])) if runs else 0 summary = { "workflow": "android_static_analysis", diff --git a/cli/src/fuzzforge_cli/commands/findings.py b/cli/src/fuzzforge_cli/commands/findings.py index 70585273..5f944d6d 100644 --- a/cli/src/fuzzforge_cli/commands/findings.py +++ b/cli/src/fuzzforge_cli/commands/findings.py @@ -66,6 +66,14 @@ def get_findings( format: str = typer.Option( "table", "--format", "-f", help="Output format: table, json, sarif" + ), + limit: Optional[int] = typer.Option( + None, "--limit", "-l", + help="Maximum number of findings to display (no limit by default)" + ), + offset: int = typer.Option( + 0, "--offset", + help="Number of findings to skip (for pagination)" ) ): """ @@ -86,40 +94,62 @@ def get_findings( try: db = ensure_project_db() - # Extract summary from SARIF - sarif_data = findings.sarif - runs_data = sarif_data.get("runs", []) + # Get findings data (API returns .sarif for now, will be native format later) + findings_data = findings.sarif summary = {} - if runs_data: - results = runs_data[0].get("results", []) + # Support both native format and SARIF format + if "findings" in findings_data: + # Native FuzzForge format + findings_list = findings_data.get("findings", []) summary = { - "total_issues": len(results), + "total_issues": len(findings_list), "by_severity": {}, "by_rule": {}, - "tools": [] + "by_source": {} } - for result in results: - level = result.get("level", "note") - rule_id = result.get("ruleId", "unknown") + for finding in findings_list: + severity = finding.get("severity", "info") + rule_id = finding.get("rule_id", "unknown") + module = finding.get("found_by", {}).get("module", "unknown") - summary["by_severity"][level] = summary["by_severity"].get(level, 0) + 1 + summary["by_severity"][severity] = summary["by_severity"].get(severity, 0) + 1 summary["by_rule"][rule_id] = summary["by_rule"].get(rule_id, 0) + 1 + summary["by_source"][module] = summary["by_source"].get(module, 0) + 1 + + elif "runs" in findings_data: + # SARIF format (backward compatibility) + runs_data = findings_data.get("runs", []) + if runs_data: + results = runs_data[0].get("results", []) + summary = { + "total_issues": len(results), + "by_severity": {}, + "by_rule": {}, + "tools": [] + } - # Extract tool info - tool = runs_data[0].get("tool", {}) - driver = tool.get("driver", {}) - if driver.get("name"): - summary["tools"].append({ - "name": driver.get("name"), - "version": driver.get("version"), - "rules": len(driver.get("rules", [])) - }) + 
for result in results: + level = result.get("level", "note") + rule_id = result.get("ruleId", "unknown") + + summary["by_severity"][level] = summary["by_severity"].get(level, 0) + 1 + summary["by_rule"][rule_id] = summary["by_rule"].get(rule_id, 0) + 1 + + # Extract tool info + tool = runs_data[0].get("tool", {}) + driver = tool.get("driver", {}) + if driver.get("name"): + summary["tools"].append({ + "name": driver.get("name"), + "version": driver.get("version"), + "rules": len(driver.get("rules", [])) + }) finding_record = FindingRecord( run_id=run_id, - sarif_data=sarif_data, + findings_data=findings_data, summary=summary, created_at=datetime.now() ) @@ -138,12 +168,12 @@ def get_findings( console.print(sarif_json) else: # table format - display_findings_table(findings.sarif) + display_findings_table(findings.sarif, limit=limit, offset=offset) # Suggest export command and show command - console.print(f"\nšŸ’” View full details of a finding: [bold cyan]ff finding show {run_id} --rule [/bold cyan]") - console.print(f"šŸ’” Export these findings: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]") - console.print(" Supported formats: [cyan]sarif[/cyan] (standard), [cyan]json[/cyan], [cyan]csv[/cyan], [cyan]html[/cyan]") + console.print(f"\nšŸ’” View full details of a finding: [bold cyan]ff finding show {run_id} --id [/bold cyan]") + console.print(f"šŸ’” Export these findings: [bold cyan]ff findings export {run_id} --format native[/bold cyan]") + console.print(" Supported formats: [cyan]native[/cyan] (default), [cyan]sarif[/cyan], [cyan]json[/cyan], [cyan]csv[/cyan], [cyan]html[/cyan]") except Exception as e: console.print(f"āŒ Failed to get findings: {e}", style="red") @@ -152,12 +182,13 @@ def get_findings( def show_finding( run_id: str = typer.Argument(..., help="Run ID to get finding from"), - rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show") + finding_id: str = typer.Option(..., "--id", "-i", help="Unique ID of the specific finding to show") ): """ šŸ” Show detailed information about a specific finding This function is registered as a command in main.py under the finding (singular) command group. + Use the unique finding ID (shown in the findings table) to view details. 
""" try: require_project() @@ -173,96 +204,263 @@ def show_finding( with get_client() as client: console.print(f"šŸ” Fetching findings for run: {run_id}") findings = client.get_run_findings(run_id) - sarif_data = findings.sarif + findings_dict = findings.sarif # API still returns .sarif for now else: - sarif_data = findings_data.sarif_data - - # Find the specific finding by rule_id - runs = sarif_data.get("runs", []) - if not runs: - console.print("āŒ No findings data available", style="red") - raise typer.Exit(1) - - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}).get("driver", {}) + findings_dict = findings_data.findings_data - # Search for matching finding + # Find the specific finding by unique ID + # For now, support both SARIF (old) and native format (new) matching_finding = None - for result in results: - if result.get("ruleId") == rule_id: - matching_finding = result - break + + # Try native format first + if "findings" in findings_dict: + for finding in findings_dict.get("findings", []): + if finding.get("id") == finding_id or finding.get("id", "").startswith(finding_id): + matching_finding = finding + break + # Fallback to SARIF format (for backward compatibility during transition) + elif "runs" in findings_dict: + runs = findings_dict.get("runs", []) + if runs: + run_data = runs[0] + results = run_data.get("results", []) + for result in results: + # Check if finding ID is in properties + props = result.get("properties", {}) + fid = props.get("findingId", "") + if fid == finding_id or fid.startswith(finding_id): + matching_finding = result + break if not matching_finding: - console.print(f"āŒ No finding found with rule ID: {rule_id}", style="red") + console.print(f"āŒ No finding found with ID: {finding_id}", style="red") console.print(f"šŸ’” Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim") raise typer.Exit(1) # Display detailed finding - display_finding_detail(matching_finding, tool, run_id) + display_finding_detail(matching_finding, run_id) except Exception as e: console.print(f"āŒ Failed to get finding: {e}", style="red") raise typer.Exit(1) -def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id: str): - """Display detailed information about a single finding""" - rule_id = finding.get("ruleId", "unknown") - level = finding.get("level", "note") - message = finding.get("message", {}) - message_text = message.get("text", "No summary available") - message_markdown = message.get("markdown", message_text) +@app.command("by-rule") +def show_findings_by_rule( + run_id: str = typer.Argument(..., help="Run ID to get findings from"), + rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID to filter findings") +): + """ + šŸ” Show all findings matching a specific rule - # Get location - locations = finding.get("locations", []) - location_str = "Unknown location" - code_snippet = None + This command shows ALL findings that match the given rule ID. + Useful when you have multiple instances of the same vulnerability type. 
+ """ + try: + require_project() + validate_run_id(run_id) - if locations: - physical_location = locations[0].get("physicalLocation", {}) - artifact_location = physical_location.get("artifactLocation", {}) - region = physical_location.get("region", {}) + # Try to get from database first, fallback to API + db = get_project_db() + findings_data = None + if db: + findings_data = db.get_findings(run_id) - file_path = artifact_location.get("uri", "") - if file_path: - location_str = file_path - if region.get("startLine"): - location_str += f":{region['startLine']}" - if region.get("startColumn"): - location_str += f":{region['startColumn']}" + if not findings_data: + with get_client() as client: + console.print(f"šŸ” Fetching findings for run: {run_id}") + findings = client.get_run_findings(run_id) + findings_dict = findings.sarif # API still returns .sarif for now + else: + findings_dict = findings_data.findings_data + + # Find all findings matching the rule + matching_findings = [] + + # Try native format first + if "findings" in findings_dict: + for finding in findings_dict.get("findings", []): + if finding.get("rule_id") == rule_id: + matching_findings.append(finding) + # Fallback to SARIF format + elif "runs" in findings_dict: + runs = findings_dict.get("runs", []) + if runs: + run_data = runs[0] + results = run_data.get("results", []) + for result in results: + if result.get("ruleId") == rule_id: + matching_findings.append(result) + + if not matching_findings: + console.print(f"āŒ No findings found with rule ID: {rule_id}", style="red") + console.print(f"šŸ’” Use [bold cyan]ff findings get {run_id}[/bold cyan] to see all findings", style="dim") + raise typer.Exit(1) + + console.print(f"\nšŸ” Found {len(matching_findings)} finding(s) matching rule: [bold cyan]{rule_id}[/bold cyan]\n") + + # Display each finding + for i, finding in enumerate(matching_findings, 1): + console.print(f"[bold]Finding {i} of {len(matching_findings)}[/bold]") + display_finding_detail(finding, run_id) + if i < len(matching_findings): + console.print("\n" + "─" * 80 + "\n") + + except Exception as e: + console.print(f"āŒ Failed to get findings: {e}", style="red") + raise typer.Exit(1) + + +def display_finding_detail(finding: Dict[str, Any], run_id: str): + """Display detailed information about a single finding (supports both native and SARIF format)""" + + # Detect format and extract fields + is_native = "rule_id" in finding # Native format has rule_id, SARIF has ruleId + + if is_native: + # Native FuzzForge format + finding_id = finding.get("id", "unknown") + rule_id = finding.get("rule_id", "unknown") + title = finding.get("title", "No title") + description = finding.get("description", "No description") + severity = finding.get("severity", "info") + confidence = finding.get("confidence", "medium") + category = finding.get("category", "unknown") + cwe = finding.get("cwe") + owasp = finding.get("owasp") + recommendation = finding.get("recommendation") + + # Found by information + found_by = finding.get("found_by", {}) + module = found_by.get("module", "unknown") + tool_name = found_by.get("tool_name", "Unknown") + tool_version = found_by.get("tool_version", "unknown") + detection_type = found_by.get("type", "unknown") + + # LLM context if available + llm_context = finding.get("llm_context") + + # Location + location = finding.get("location", {}) + file_path = location.get("file", "") + line_start = location.get("line_start") + column_start = location.get("column_start") + code_snippet = location.get("snippet") + 
+ location_str = file_path if file_path else "Unknown location" + if line_start: + location_str += f":{line_start}" + if column_start: + location_str += f":{column_start}" + + else: + # SARIF format (backward compatibility) + props = finding.get("properties", {}) + finding_id = props.get("findingId", "unknown") + rule_id = finding.get("ruleId", "unknown") + title = props.get("title", "No title") + severity = finding.get("level", "note") + confidence = "medium" # Not available in SARIF + category = "unknown" + cwe = None + owasp = None + + message = finding.get("message", {}) + description = message.get("text", "No description") + recommendation = None + + module = "unknown" + tool_name = "Unknown" + tool_version = "unknown" + detection_type = "tool" + llm_context = None + + # Location from SARIF + locations = finding.get("locations", []) + location_str = "Unknown location" + code_snippet = None + + if locations: + physical_location = locations[0].get("physicalLocation", {}) + artifact_location = physical_location.get("artifactLocation", {}) + region = physical_location.get("region", {}) + + file_path = artifact_location.get("uri", "") + if file_path: + location_str = file_path + if region.get("startLine"): + location_str += f":{region['startLine']}" + if region.get("startColumn"): + location_str += f":{region['startColumn']}" - # Get code snippet if available - if region.get("snippet", {}).get("text"): - code_snippet = region["snippet"]["text"].strip() + if region.get("snippet", {}).get("text"): + code_snippet = region["snippet"]["text"].strip() # Get severity style severity_color = { + "critical": "red", + "high": "red", + "medium": "yellow", + "low": "blue", + "info": "cyan", + # SARIF levels "error": "red", "warning": "yellow", - "note": "blue", - "info": "cyan" - }.get(level.lower(), "white") + "note": "blue" + }.get(severity.lower(), "white") # Build detailed content content_lines = [] + content_lines.append(f"[bold]Finding ID:[/bold] {finding_id}") content_lines.append(f"[bold]Rule ID:[/bold] {rule_id}") - content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{level.upper()}[/{severity_color}]") + content_lines.append(f"[bold]Title:[/bold] {title}") + + # Confidence indicator with emoji + confidence_indicators = { + "high": "🟢", + "medium": "🟔", + "low": "šŸ”“" + } + confidence_emoji = confidence_indicators.get(confidence.lower(), "⚪") + content_lines.append(f"[bold]Severity:[/bold] [{severity_color}]{severity.upper()}[/{severity_color}] [bold]Confidence:[/bold] {confidence_emoji} {confidence.capitalize()}") + + if cwe: + content_lines.append(f"[bold]CWE:[/bold] {cwe}") + if owasp: + content_lines.append(f"[bold]OWASP:[/bold] {owasp}") + + content_lines.append(f"[bold]Category:[/bold] {category}") content_lines.append(f"[bold]Location:[/bold] {location_str}") - content_lines.append(f"[bold]Tool:[/bold] {tool.get('name', 'Unknown')} v{tool.get('version', 'unknown')}") + + # Enhanced found_by display with badge + type_badges = { + "llm": "šŸ¤–", + "tool": "šŸ”§", + "fuzzer": "šŸŽÆ", + "manual": "šŸ‘¤" + } + type_badge = type_badges.get(detection_type.lower(), "šŸ”") + content_lines.append(f"[bold]Found by:[/bold] {type_badge} {tool_name} v{tool_version} [dim]({module})[/dim] [[yellow]{detection_type}[/yellow]]") + + # LLM context details + if llm_context: + model = llm_context.get("model", "unknown") + prompt = llm_context.get("prompt", "") + content_lines.append(f"[bold]LLM Model:[/bold] {model}") + if prompt: + # Show first 100 chars of prompt + prompt_preview = 
prompt[:100] + "..." if len(prompt) > 100 else prompt + content_lines.append(f"[bold]Prompt:[/bold] [dim]{prompt_preview}[/dim]") + content_lines.append(f"[bold]Run ID:[/bold] {run_id}") content_lines.append("") - content_lines.append("[bold]Summary:[/bold]") - content_lines.append(message_text) - content_lines.append("") content_lines.append("[bold]Description:[/bold]") - content_lines.append(message_markdown) + content_lines.append(description) - if code_snippet: + if recommendation: content_lines.append("") - content_lines.append("[bold]Code Snippet:[/bold]") - content_lines.append(f"[dim]{code_snippet}[/dim]") + content_lines.append("[bold]šŸ’” Recommendation:[/bold]") + content_lines.append(recommendation) content = "\n".join(content_lines) @@ -275,106 +473,204 @@ def display_finding_detail(finding: Dict[str, Any], tool: Dict[str, Any], run_id box=box.ROUNDED, padding=(1, 2) )) + + # Display code snippet with syntax highlighting (separate from panel for better rendering) + if code_snippet: + # Detect language from file path + language = "text" + if is_native and location: + file_path = location.get("file", "") + elif not is_native and locations: + file_path = locations[0].get("physicalLocation", {}).get("artifactLocation", {}).get("uri", "") + else: + file_path = "" + + if file_path: + ext = Path(file_path).suffix.lower() + language_map = { + ".py": "python", + ".js": "javascript", + ".ts": "typescript", + ".java": "java", + ".c": "c", + ".cpp": "cpp", + ".cc": "cpp", + ".h": "c", + ".hpp": "cpp", + ".go": "go", + ".rs": "rust", + ".rb": "ruby", + ".php": "php", + ".swift": "swift", + ".kt": "kotlin", + ".cs": "csharp", + ".html": "html", + ".xml": "xml", + ".json": "json", + ".yaml": "yaml", + ".yml": "yaml", + ".sh": "bash", + ".bash": "bash", + ".sql": "sql", + } + language = language_map.get(ext, "text") + + console.print("\n[bold]Code Snippet:[/bold]") + syntax = Syntax( + code_snippet, + language, + theme="monokai", + line_numbers=True, + start_line=line_start if is_native and location.get("line_start") else 1 + ) + console.print(syntax) + console.print() - console.print(f"šŸ’” Export this run: [bold cyan]ff findings export {run_id} --format sarif[/bold cyan]") + console.print(f"šŸ’” View all findings with this rule: [bold cyan]ff findings by-rule {run_id} --rule {rule_id}[/bold cyan]") + console.print(f"šŸ’” Export this run: [bold cyan]ff findings export {run_id} --format native[/bold cyan]") -def display_findings_table(sarif_data: Dict[str, Any]): - """Display SARIF findings in a rich table format""" - runs = sarif_data.get("runs", []) - if not runs: - console.print("ā„¹ļø No findings data available", style="dim") - return +def display_findings_table(findings_data: Dict[str, Any], limit: Optional[int] = None, offset: int = 0): + """Display findings in a rich table format (supports both native and SARIF formats)""" - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}) - driver = tool.get("driver", {}) + # Detect format and extract findings + is_native = "findings" in findings_data + + if is_native: + # Native FuzzForge format + findings_list = findings_data.get("findings", []) + workflow = findings_data.get("workflow", "Unknown") + total_findings = len(findings_list) + else: + # SARIF format (backward compatibility) + runs = findings_data.get("runs", []) + if not runs: + console.print("ā„¹ļø No findings data available", style="dim") + return + + run_data = runs[0] + findings_list = run_data.get("results", []) + tool = 
run_data.get("tool", {}).get("driver", {}) + workflow = tool.get("name", "Unknown") + total_findings = len(findings_list) # Tool information console.print("\nšŸ” [bold]Security Analysis Results[/bold]") - if driver.get("name"): - console.print(f"Tool: {driver.get('name')} v{driver.get('version', 'unknown')}") + console.print(f"Workflow: {workflow}") - if not results: + if not findings_list: console.print("āœ… No security issues found!", style="green") return # Summary statistics summary_by_level = {} - for result in results: - level = result.get("level", "note") + for finding in findings_list: + if is_native: + level = finding.get("severity", "info") + else: + level = finding.get("level", "note") summary_by_level[level] = summary_by_level.get(level, 0) + 1 summary_table = Table(show_header=False, box=box.SIMPLE) summary_table.add_column("Severity", width=15, justify="left", style="bold") summary_table.add_column("Count", width=8, justify="right", style="bold") - for level, count in sorted(summary_by_level.items()): - # Create Rich Text object with color styling - level_text = level.upper() - severity_text = Text(level_text, style=severity_style(level)) + # Sort by severity order (critical > high > medium > low > info) + severity_order = {"critical": 0, "high": 1, "error": 1, "medium": 2, "warning": 2, "low": 3, "note": 3, "info": 4} + for level in sorted(summary_by_level.keys(), key=lambda x: severity_order.get(x, 99)): + count = summary_by_level[level] + severity_text = Text(level.upper(), style=severity_style(level)) count_text = Text(str(count)) - summary_table.add_row(severity_text, count_text) console.print( Panel.fit( summary_table, - title=f"šŸ“Š Summary ({len(results)} total issues)", + title=f"šŸ“Š Summary ({total_findings} total issues)", box=box.ROUNDED ) ) - # Detailed results - Rich Text-based table with proper emoji alignment + # Apply pagination + start_idx = offset + end_idx = start_idx + limit if limit else len(findings_list) + paginated_findings = findings_list[start_idx:end_idx] + + # Detailed results table with enhanced columns results_table = Table(box=box.ROUNDED) - results_table.add_column("Severity", width=12, justify="left", no_wrap=True) - results_table.add_column("Rule", justify="left", style="bold cyan", no_wrap=True) - results_table.add_column("Message", width=45, justify="left", no_wrap=True) + results_table.add_column("ID", width=10, justify="left", style="dim") + results_table.add_column("Severity", width=10, justify="left", no_wrap=True) + results_table.add_column("Message", width=50, justify="left", no_wrap=True) + results_table.add_column("Found By", width=15, justify="left", style="yellow", no_wrap=True) results_table.add_column("Location", width=20, justify="left", style="dim", no_wrap=True) - for result in results[:50]: # Limit to first 50 results - level = result.get("level", "note") - rule_id = result.get("ruleId", "unknown") - message = result.get("message", {}).get("text", "No message") - - # Extract location information - locations = result.get("locations", []) - location_str = "" - if locations: - physical_location = locations[0].get("physicalLocation", {}) - artifact_location = physical_location.get("artifactLocation", {}) - region = physical_location.get("region", {}) - - file_path = artifact_location.get("uri", "") + for finding in paginated_findings: + if is_native: + # Native format + finding_id = finding.get("id", "")[:8] # First 8 chars + severity = finding.get("severity", "info") + rule_id = finding.get("rule_id", "unknown") + message = 
finding.get("title", "No message") + found_by_info = finding.get("found_by", {}) + found_by = found_by_info.get("module", "unknown") + + location = finding.get("location", {}) + file_path = location.get("file", "") + line_start = location.get("line_start") + location_str = "" if file_path: location_str = Path(file_path).name - if region.get("startLine"): - location_str += f":{region['startLine']}" - if region.get("startColumn"): - location_str += f":{region['startColumn']}" + if line_start: + location_str += f":{line_start}" + else: + # SARIF format + props = finding.get("properties", {}) + finding_id = props.get("findingId", "")[:8] if props.get("findingId") else "N/A" + severity = finding.get("level", "note") + rule_id = finding.get("ruleId", "unknown") + message = finding.get("message", {}).get("text", "No message") + found_by = "unknown" + + locations = finding.get("locations", []) + location_str = "" + if locations: + physical_location = locations[0].get("physicalLocation", {}) + artifact_location = physical_location.get("artifactLocation", {}) + region = physical_location.get("region", {}) - # Create Rich Text objects with color styling - severity_text = Text(level.upper(), style=severity_style(level)) - severity_text.truncate(12, overflow="ellipsis") + file_path = artifact_location.get("uri", "") + if file_path: + location_str = Path(file_path).name + if region.get("startLine"): + location_str += f":{region['startLine']}" + + # Create styled text objects + severity_text = Text(severity.upper(), style=severity_style(severity)) - # Show full rule ID without truncation + # Truncate long text message_text = Text(message) - message_text.truncate(45, overflow="ellipsis") + message_text.truncate(50, overflow="ellipsis") + + found_by_text = Text(found_by) + found_by_text.truncate(15, overflow="ellipsis") location_text = Text(location_str) - location_text.truncate(20, overflow="ellipsis") + location_text.truncate(18, overflow="ellipsis") results_table.add_row( + finding_id, severity_text, - rule_id, # Pass string directly to show full UUID message_text, + found_by_text, location_text ) console.print("\nšŸ“‹ [bold]Detailed Results[/bold]") - if len(results) > 50: - console.print(f"Showing first 50 of {len(results)} results") + + # Pagination info + if limit and total_findings > limit: + console.print(f"Showing {start_idx + 1}-{min(end_idx, total_findings)} of {total_findings} results") + console.print() console.print(results_table) @@ -461,14 +757,14 @@ def export_findings( try: # Get findings from database first, fallback to API - findings_data = db.get_findings(run_id) - if not findings_data: + findings_record = db.get_findings(run_id) + if not findings_record: console.print(f"šŸ“” Fetching findings from API for run: {run_id}") with get_client() as client: findings = client.get_run_findings(run_id) - sarif_data = findings.sarif + findings_data = findings.sarif else: - sarif_data = findings_data.sarif_data + findings_data = findings_record.findings_data # Generate output filename with timestamp for uniqueness if not output: @@ -480,19 +776,19 @@ def export_findings( # Export based on format if format == "sarif": with open(output_path, 'w') as f: - json.dump(sarif_data, f, indent=2) + json.dump(findings_data, f, indent=2) elif format == "json": # Simplified JSON format - simplified_data = extract_simplified_findings(sarif_data) + simplified_data = extract_simplified_findings(findings_data) with open(output_path, 'w') as f: json.dump(simplified_data, f, indent=2) elif format == "csv": - 
export_to_csv(sarif_data, output_path) + export_to_csv(findings_data, output_path) elif format == "html": - export_to_html(sarif_data, output_path, run_id) + export_to_html(findings_data, output_path, run_id) else: console.print(f"āŒ Unsupported format: {format}", style="red") @@ -505,71 +801,81 @@ def export_findings( raise typer.Exit(1) -def extract_simplified_findings(sarif_data: Dict[str, Any]) -> Dict[str, Any]: - """Extract simplified findings structure from SARIF""" - runs = sarif_data.get("runs", []) - if not runs: - return {"findings": [], "summary": {}} - - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}).get("driver", {}) - - simplified = { - "tool": { - "name": tool.get("name", "Unknown"), - "version": tool.get("version", "Unknown") - }, - "summary": { - "total_issues": len(results), - "by_severity": {} - }, - "findings": [] - } - - for result in results: - level = result.get("level", "note") - simplified["summary"]["by_severity"][level] = simplified["summary"]["by_severity"].get(level, 0) + 1 - - # Extract location - location_info = {} - locations = result.get("locations", []) - if locations: - physical_location = locations[0].get("physicalLocation", {}) - artifact_location = physical_location.get("artifactLocation", {}) - region = physical_location.get("region", {}) - - location_info = { - "file": artifact_location.get("uri", ""), - "line": region.get("startLine"), - "column": region.get("startColumn") - } - - simplified["findings"].append({ - "rule_id": result.get("ruleId", "unknown"), - "severity": level, - "message": result.get("message", {}).get("text", ""), - "location": location_info - }) - - return simplified - +def extract_simplified_findings(findings_data: Dict[str, Any]) -> Dict[str, Any]: + """Extract simplified findings structure from native format or SARIF""" + # Detect format + is_native = "findings" in findings_data and "version" in findings_data + + if is_native: + # Native FuzzForge format + findings_list = findings_data.get("findings", []) + workflow = findings_data.get("workflow", "Unknown") + summary = findings_data.get("summary", {}) + + simplified = { + "tool": { + "name": workflow, + "version": findings_data.get("version", "1.0.0") + }, + "summary": summary if summary else { + "total_issues": len(findings_list), + "by_severity": {} + }, + "findings": [] + } + + # Count by severity if not in summary + if not summary: + for finding in findings_list: + severity = finding.get("severity", "info") + simplified["summary"]["by_severity"][severity] = simplified["summary"]["by_severity"].get(severity, 0) + 1 + + # Extract simplified findings + for finding in findings_list: + location = finding.get("location", {}) + simplified["findings"].append({ + "id": finding.get("id"), + "rule_id": finding.get("rule_id", "unknown"), + "severity": finding.get("severity", "info"), + "confidence": finding.get("confidence", "medium"), + "title": finding.get("title", ""), + "description": finding.get("description", ""), + "category": finding.get("category", "other"), + "found_by": finding.get("found_by", {}), + "location": { + "file": location.get("file", ""), + "line": location.get("line_start"), + "column": location.get("column_start") + } + }) + else: + # SARIF format + runs = findings_data.get("runs", []) + if not runs: + return {"findings": [], "summary": {}} -def export_to_csv(sarif_data: Dict[str, Any], output_path: Path): - """Export findings to CSV format""" - runs = sarif_data.get("runs", []) - if not runs: - return + run_data 
= runs[0] + results = run_data.get("results", []) + tool = run_data.get("tool", {}).get("driver", {}) - results = runs[0].get("results", []) - - with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: - fieldnames = ['rule_id', 'severity', 'message', 'file', 'line', 'column'] - writer = csv.DictWriter(csvfile, fieldnames=fieldnames) - writer.writeheader() + simplified = { + "tool": { + "name": tool.get("name", "Unknown"), + "version": tool.get("version", "Unknown") + }, + "summary": { + "total_issues": len(results), + "by_severity": {} + }, + "findings": [] + } for result in results: - location_info = {"file": "", "line": "", "column": ""} + level = result.get("level", "note") + simplified["summary"]["by_severity"][level] = simplified["summary"]["by_severity"].get(level, 0) + 1 + + # Extract location + location_info = {} locations = result.get("locations", []) if locations: physical_location = locations[0].get("physicalLocation", {}) @@ -578,109 +884,1790 @@ def export_to_csv(sarif_data: Dict[str, Any], output_path: Path): location_info = { "file": artifact_location.get("uri", ""), - "line": region.get("startLine", ""), - "column": region.get("startColumn", "") + "line": region.get("startLine"), + "column": region.get("startColumn") } - writer.writerow({ - "rule_id": result.get("ruleId", ""), - "severity": result.get("level", "note"), + simplified["findings"].append({ + "rule_id": result.get("ruleId", "unknown"), + "severity": level, "message": result.get("message", {}).get("text", ""), - **location_info + "location": location_info }) + return simplified -def export_to_html(sarif_data: Dict[str, Any], output_path: Path, run_id: str): - """Export findings to HTML format""" - runs = sarif_data.get("runs", []) - if not runs: - return - run_data = runs[0] - results = run_data.get("results", []) - tool = run_data.get("tool", {}).get("driver", {}) +def export_to_csv(findings_data: Dict[str, Any], output_path: Path): + """Export findings to CSV format (supports both native and SARIF)""" + # Detect format + is_native = "findings" in findings_data and "version" in findings_data - # Simple HTML template + with open(output_path, 'w', newline='', encoding='utf-8') as csvfile: + if is_native: + # Native FuzzForge format - include more fields + fieldnames = ['id', 'rule_id', 'severity', 'confidence', 'title', 'category', 'module', 'file', 'line', 'column'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + findings_list = findings_data.get("findings", []) + for finding in findings_list: + location = finding.get("location", {}) + found_by = finding.get("found_by", {}) + + writer.writerow({ + "id": finding.get("id", "")[:8], + "rule_id": finding.get("rule_id", ""), + "severity": finding.get("severity", "info"), + "confidence": finding.get("confidence", "medium"), + "title": finding.get("title", ""), + "category": finding.get("category", ""), + "module": found_by.get("module", ""), + "file": location.get("file", ""), + "line": location.get("line_start", ""), + "column": location.get("column_start", "") + }) + else: + # SARIF format + fieldnames = ['rule_id', 'severity', 'message', 'file', 'line', 'column'] + writer = csv.DictWriter(csvfile, fieldnames=fieldnames) + writer.writeheader() + + runs = findings_data.get("runs", []) + if not runs: + return + + results = runs[0].get("results", []) + + for result in results: + location_info = {"file": "", "line": "", "column": ""} + locations = result.get("locations", []) + if locations: + physical_location = 
locations[0].get("physicalLocation", {}) + artifact_location = physical_location.get("artifactLocation", {}) + region = physical_location.get("region", {}) + + location_info = { + "file": artifact_location.get("uri", ""), + "line": region.get("startLine", ""), + "column": region.get("startColumn", "") + } + + writer.writerow({ + "rule_id": result.get("ruleId", ""), + "severity": result.get("level", "note"), + "message": result.get("message", {}).get("text", ""), + **location_info + }) + + +def export_to_html(findings_data: Dict[str, Any], output_path: Path, run_id: str): + """Export findings to modern, interactive HTML format with charts""" + import html + from datetime import datetime + + # Helper function to safely escape strings + def safe_escape(value): + """Safely escape a value, handling None and non-string types""" + if value is None: + return "" + return html.escape(str(value)) + + # Detect format (native or SARIF) + is_native = "findings" in findings_data and "version" in findings_data + + if is_native: + # Native FuzzForge format + findings_list = findings_data.get("findings", []) + workflow = findings_data.get("workflow", "Security Assessment") + summary = findings_data.get("summary", {}) + total_findings = len(findings_list) + else: + # SARIF format (backward compatibility) + runs = findings_data.get("runs", []) + if not runs: + # Empty report + findings_list = [] + workflow = "Security Assessment" + summary = {} + total_findings = 0 + else: + run_data = runs[0] + findings_list = run_data.get("results", []) + tool = run_data.get("tool", {}).get("driver", {}) + workflow = tool.get("name", "Security Assessment") + total_findings = len(findings_list) + summary = {} + + # Calculate statistics + severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} + confidence_counts = {"high": 0, "medium": 0, "low": 0} + category_counts = {} + source_counts = {} + type_counts = {} + + for finding in findings_list: + if is_native: + severity = finding.get("severity", "info") + confidence = finding.get("confidence", "medium") + category = finding.get("category", "other") + found_by = finding.get("found_by", {}) + source = found_by.get("module", "unknown") + detection_type = found_by.get("type", "tool") + else: + # Map SARIF levels to severity + level = finding.get("level", "note") + severity_map = {"error": "high", "warning": "medium", "note": "low", "none": "info"} + severity = severity_map.get(level, "info") + confidence = "medium" + category = "other" + source = "unknown" + detection_type = "tool" + + severity_counts[severity] = severity_counts.get(severity, 0) + 1 + confidence_counts[confidence] = confidence_counts.get(confidence, 0) + 1 + category_counts[category] = category_counts.get(category, 0) + 1 + source_counts[source] = source_counts.get(source, 0) + 1 + type_counts[detection_type] = type_counts.get(detection_type, 0) + 1 + + # Prepare chart data + severity_data = {k: v for k, v in severity_counts.items() if v > 0} + category_data = dict(sorted(category_counts.items(), key=lambda x: x[1], reverse=True)[:10]) + source_data = dict(sorted(source_counts.items(), key=lambda x: x[1], reverse=True)[:10]) + type_data = {k: v for k, v in type_counts.items() if v > 0} + + # Generate findings rows + findings_rows = "" + for idx, finding in enumerate(findings_list): + if is_native: + finding_id = finding.get("id", "")[:8] if finding.get("id") else "" + severity = finding.get("severity", "info") + confidence = finding.get("confidence", "medium") + title = 
safe_escape(finding.get("title") or "No title")
+            description = safe_escape(finding.get("description"))
+            rule_id = safe_escape(finding.get("rule_id") or "unknown")
+            category = safe_escape(finding.get("category") or "other")
+
+            found_by = finding.get("found_by") or {}
+            module = safe_escape(found_by.get("module") or "unknown")
+            tool_name = safe_escape(found_by.get("tool_name") or "Unknown")
+            detection_type = found_by.get("type") or "tool"
+
+            location = finding.get("location") or {}
+            file_path = safe_escape(location.get("file"))
+            line_start = location.get("line_start")
+            code_snippet = safe_escape(location.get("snippet"))
+
+            cwe = safe_escape(finding.get("cwe"))
+            owasp = safe_escape(finding.get("owasp"))
+            recommendation = safe_escape(finding.get("recommendation"))
+
+            llm_context = finding.get("llm_context")
+            if llm_context:
+                llm_model = safe_escape(llm_context.get("model"))
+                prompt_text = llm_context.get("prompt", "")
+                if prompt_text:
+                    llm_prompt_preview = safe_escape(prompt_text[:100] + "..." if len(prompt_text) > 100 else prompt_text)
+                else:
+                    llm_prompt_preview = ""
+            else:
+                llm_model = ""
+                llm_prompt_preview = ""
+        else:
+            # SARIF format
+            props = finding.get("properties") or {}
+            finding_id = props.get("findingId", "")[:8] if props.get("findingId") else ""
+            level = finding.get("level", "note")
+            severity_map = {"error": "high", "warning": "medium", "note": "low", "none": "info"}
+            severity = severity_map.get(level, "info")
+            confidence = "medium"
+            rule_id = safe_escape(finding.get("ruleId") or "unknown")
+            message = finding.get("message") or {}
+            title = safe_escape(message.get("text") or "No message")
+            description = title
+            category = "other"
+            module = "unknown"
+            tool_name = "Unknown"
+            detection_type = "tool"
+
+            locations = finding.get("locations", [])
+            if locations:
+                physical_location = locations[0].get("physicalLocation") or {}
+                artifact_location = physical_location.get("artifactLocation") or {}
+                region = physical_location.get("region") or {}
+                file_path = safe_escape(artifact_location.get("uri"))
+                line_start = region.get("startLine")
+                snippet_obj = region.get("snippet") or {}
+                code_snippet = safe_escape(snippet_obj.get("text"))
+            else:
+                file_path = ""
+                line_start = None
+                code_snippet = ""
+
+            cwe = ""
+            owasp = ""
+            recommendation = ""
+            llm_model = ""
+            llm_prompt_preview = ""
+
+        location_str = file_path if file_path else "-"
+        if line_start and file_path:
+            location_str = f"{file_path}:{line_start}"
+
+        # -- Per-finding markup (stripped during extraction) ----------------------
+        # Originally built here: severity badges (CRITICAL/HIGH/MEDIUM/LOW/INFO),
+        # confidence badges (High/Medium/Low), detection-type icons (šŸ¤– llm,
+        # šŸ”§ tool, šŸŽÆ fuzzer, šŸ‘¤ manual), a details_html block (description,
+        # CWE/OWASP, recommendation, LLM model and prompt preview, code snippet),
+        # and the row appended to findings_rows (ID, severity badge, title,
+        # icon plus module, location). Only this outline survived; the HTML tags
+        # were lost.
+
+    # Generate HTML
+    html_content = f"""..."""  # Report template lost to extraction. Recoverable
+    # outline: header ("Security Findings Report", workflow, "Run ID: {run_id} |
+    # Generated: <timestamp>"), executive-summary cards (Total Findings,
+    # Critical + High, Medium, Low + Info), four Chart.js panels (Severity
+    # Distribution, Detection Type, Top Categories, Findings by Source), an
+    # interactive Detailed Findings table (ID / Severity / Finding / Source /
+    # Location; click a row for details) rendering {findings_rows}, and a footer
+    # ("{total_findings} of {total_findings} findings shown."). The removed
+    # legacy template (a plain Rule ID / Severity / Message / Location table
+    # looped over SARIF results) was stripped the same way.

     with open(output_path, 'w', encoding='utf-8') as f:
         f.write(html_content)
@@ -761,9 +2748,10 @@ def all_findings(
 [cyan]Recent Findings (7 days):[/cyan] {stats['recent_findings']}

 [bold]Severity Distribution:[/bold]
-  šŸ”“ Errors: {stats['severity_distribution'].get('error', 0)}
-  🟔 Warnings: {stats['severity_distribution'].get('warning', 0)}
-  šŸ”µ Notes: {stats['severity_distribution'].get('note', 0)}
+  šŸ”“ Critical: {stats['severity_distribution'].get('critical', 0)}
+  🟠 High: {stats['severity_distribution'].get('high', 0) + stats['severity_distribution'].get('error', 0)}
+  🟔 Medium: {stats['severity_distribution'].get('medium', 0) + stats['severity_distribution'].get('warning', 0)}
+  šŸ”µ Low: {stats['severity_distribution'].get('low', 0) + stats['severity_distribution'].get('note', 0)}
   ā„¹ļø Info: {stats['severity_distribution'].get('info', 0)}

 [bold]By Workflow:[/bold]"""
@@ -804,9 +2792,10 @@ def all_findings(
     table.add_column("Workflow", style="dim", width=20)
     table.add_column("Date", justify="center")
     table.add_column("Issues", justify="center", style="bold")
-    table.add_column("Errors", justify="center", style="red")
-    table.add_column("Warnings", justify="center", style="yellow")
-    table.add_column("Notes", justify="center", style="blue")
+    table.add_column("Critical", justify="center", style="red")
+    table.add_column("High", justify="center", style="red")
+    table.add_column("Medium", justify="center", style="yellow")
+    table.add_column("Low", justify="center", style="blue")

     # Get run info for each finding
     runs_info = {}
@@ -825,19 +2814,29 @@
         total_issues = summary.get("total_issues", 0)
         by_severity = summary.get("by_severity", {})

-        # Count issues from SARIF data if summary is incomplete
-        if total_issues == 0 and "runs" in finding.sarif_data:
-            for run in finding.sarif_data["runs"]:
-                total_issues += len(run.get("results", []))
+        # Count issues from findings_data if summary is incomplete
+        if total_issues == 0:
+            if "findings" in finding.findings_data:
+                total_issues = len(finding.findings_data.get("findings", []))
+            elif "runs" in finding.findings_data:
+                for run in finding.findings_data["runs"]:
+                    total_issues += len(run.get("results", []))
+
+        # Support both native (critical/high/medium/low) and SARIF (error/warning/note) severities
+        critical = by_severity.get("critical", 0)
+        high = by_severity.get("high", 0) + by_severity.get("error", 0)  # Map error to high
+        medium = by_severity.get("medium", 0) + by_severity.get("warning", 0)  # Map warning to medium
+        low = by_severity.get("low", 0) + by_severity.get("note", 0)  # Map note to low

         table.add_row(
             run_id,  # Show full Run ID
            workflow_name[:17] + "..."
if len(workflow_name) > 20 else workflow_name, finding.created_at.strftime("%Y-%m-%d %H:%M"), str(total_issues), - str(by_severity.get("error", 0)), - str(by_severity.get("warning", 0)), - str(by_severity.get("note", 0)) + str(critical), + str(high), + str(medium), + str(low) ) console.print(table) @@ -1073,4 +3072,4 @@ def findings_callback(ctx: typer.Context): return # Default to history when no subcommand provided - findings_history(limit=20) \ No newline at end of file + findings_history(limit=20) diff --git a/cli/src/fuzzforge_cli/commands/workflow_exec.py b/cli/src/fuzzforge_cli/commands/workflow_exec.py index 80db77d9..9edbb3fb 100644 --- a/cli/src/fuzzforge_cli/commands/workflow_exec.py +++ b/cli/src/fuzzforge_cli/commands/workflow_exec.py @@ -301,7 +301,7 @@ def execute_workflow( ), fail_on: Optional[str] = typer.Option( None, "--fail-on", - help="Fail build if findings match severity (critical,high,medium,low,all,none). Use with --wait" + help="Fail build if findings match SARIF level (error,warning,note,info,all,none). Use with --wait" ), export_sarif: Optional[str] = typer.Option( None, "--export-sarif", @@ -423,8 +423,9 @@ def execute_workflow( # Don't fail the whole operation if database save fails console.print(f"āš ļø Failed to save execution to database: {e}", style="yellow") - console.print(f"\nšŸ’” Monitor progress: [bold cyan]fuzzforge monitor live {response.run_id}[/bold cyan]") - console.print(f"šŸ’” Check status: [bold cyan]fuzzforge workflow status {response.run_id}[/bold cyan]") + console.print(f"\nšŸ’” Monitor progress: [bold cyan]ff monitor live {response.run_id}[/bold cyan]") + console.print(f"šŸ’” Check status: [bold cyan]ff workflow status {response.run_id}[/bold cyan]") + console.print(f"šŸ’” View findings: [bold cyan]ff finding {response.run_id}[/bold cyan]") # Suggest --live for fuzzing workflows if not live and not wait and "fuzzing" in workflow.lower(): @@ -501,7 +502,7 @@ def execute_workflow( console.print(f"āš ļø Failed to check findings: {e}", style="yellow") if not fail_on and not export_sarif: - console.print(f"šŸ’” View findings: [bold cyan]fuzzforge findings {response.run_id}[/bold cyan]") + console.print(f"šŸ’” View findings: [bold cyan]ff finding {response.run_id}[/bold cyan]") except KeyboardInterrupt: console.print("\nā¹ļø Monitoring cancelled (execution continues in background)", style="yellow") diff --git a/cli/src/fuzzforge_cli/database.py b/cli/src/fuzzforge_cli/database.py index 3c8e86cf..88615f73 100644 --- a/cli/src/fuzzforge_cli/database.py +++ b/cli/src/fuzzforge_cli/database.py @@ -46,7 +46,7 @@ class FindingRecord(BaseModel): """Database record for findings""" id: Optional[int] = None run_id: str - sarif_data: Dict[str, Any] + findings_data: Dict[str, Any] # Native FuzzForge format summary: Dict[str, Any] = {} created_at: datetime @@ -81,7 +81,7 @@ class FuzzForgeDatabase: CREATE TABLE IF NOT EXISTS findings ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT NOT NULL, - sarif_data TEXT NOT NULL, + findings_data TEXT NOT NULL, summary TEXT DEFAULT '{}', created_at TIMESTAMP NOT NULL, FOREIGN KEY (run_id) REFERENCES runs (run_id) @@ -292,21 +292,21 @@ def update_run_status(self, run_id: str, status: str, completed_at: Optional[dat # Findings management methods def save_findings(self, finding: FindingRecord) -> int: - """Save findings and return the ID""" + """Save findings in native FuzzForge format and return the ID""" with self.connection() as conn: cursor = conn.execute(""" - INSERT INTO findings (run_id, sarif_data, summary, 
created_at) + INSERT INTO findings (run_id, findings_data, summary, created_at) VALUES (?, ?, ?, ?) """, ( finding.run_id, - json.dumps(finding.sarif_data), + json.dumps(finding.findings_data), json.dumps(finding.summary), finding.created_at )) return cursor.lastrowid def get_findings(self, run_id: str) -> Optional[FindingRecord]: - """Get findings for a run""" + """Get findings for a run in native FuzzForge format""" with self.connection() as conn: row = conn.execute( "SELECT * FROM findings WHERE run_id = ? ORDER BY created_at DESC LIMIT 1", @@ -317,14 +317,14 @@ def get_findings(self, run_id: str) -> Optional[FindingRecord]: return FindingRecord( id=row["id"], run_id=row["run_id"], - sarif_data=json.loads(row["sarif_data"]), + findings_data=json.loads(row["findings_data"]), summary=json.loads(row["summary"]), created_at=row["created_at"] ) return None def list_findings(self, limit: int = 50) -> List[FindingRecord]: - """List recent findings""" + """List recent findings in native FuzzForge format""" with self.connection() as conn: rows = conn.execute(""" SELECT * FROM findings @@ -336,7 +336,7 @@ def list_findings(self, limit: int = 50) -> List[FindingRecord]: FindingRecord( id=row["id"], run_id=row["run_id"], - sarif_data=json.loads(row["sarif_data"]), + findings_data=json.loads(row["findings_data"]), summary=json.loads(row["summary"]), created_at=row["created_at"] ) @@ -380,18 +380,17 @@ def get_all_findings(self, finding = FindingRecord( id=row["id"], run_id=row["run_id"], - sarif_data=json.loads(row["sarif_data"]), + findings_data=json.loads(row["findings_data"]), summary=json.loads(row["summary"]), created_at=row["created_at"] ) - # Filter by severity if specified + # Filter by severity if specified (native format) if severity: finding_severities = set() - if "runs" in finding.sarif_data: - for run in finding.sarif_data["runs"]: - for result in run.get("results", []): - finding_severities.add(result.get("level", "note").lower()) + if "findings" in finding.findings_data: + for f in finding.findings_data["findings"]: + finding_severities.add(f.get("severity", "info").lower()) if not any(sev.lower() in finding_severities for sev in severity): continue @@ -408,7 +407,7 @@ def get_findings_by_workflow(self, workflow: str) -> List[FindingRecord]: return self.get_all_findings(workflow=workflow) def get_aggregated_stats(self) -> Dict[str, Any]: - """Get aggregated statistics for all findings using SQL aggregation""" + """Get aggregated statistics for all findings using native format and SQL aggregation""" with self.connection() as conn: # Total findings and runs total_findings = conn.execute("SELECT COUNT(*) FROM findings").fetchone()[0] @@ -429,39 +428,38 @@ def get_aggregated_stats(self) -> Dict[str, Any]: WHERE created_at > datetime('now', '-7 days') """).fetchone()[0] - # Use SQL JSON functions to aggregate severity stats efficiently + # Use SQL JSON functions to aggregate severity stats efficiently (native format) # This avoids loading all findings into memory severity_stats = conn.execute(""" SELECT - SUM(json_array_length(json_extract(sarif_data, '$.runs[0].results'))) as total_issues, + SUM(json_array_length(json_extract(findings_data, '$.findings'))) as total_issues, COUNT(*) as finding_count FROM findings - WHERE json_extract(sarif_data, '$.runs[0].results') IS NOT NULL + WHERE json_extract(findings_data, '$.findings') IS NOT NULL """).fetchone() total_issues = severity_stats["total_issues"] or 0 - # Get severity distribution using SQL + # Get severity distribution using native 
format (critical/high/medium/low/info) # Note: This is a simplified version - for full accuracy we'd need JSON parsing # But it's much more efficient than loading all data into Python - severity_counts = {"error": 0, "warning": 0, "note": 0, "info": 0} + severity_counts = {"critical": 0, "high": 0, "medium": 0, "low": 0, "info": 0} # Sample the first N findings for severity distribution # This gives a good approximation without loading everything sample_findings = conn.execute(""" - SELECT sarif_data + SELECT findings_data FROM findings LIMIT ? """, (STATS_SAMPLE_SIZE,)).fetchall() for row in sample_findings: try: - data = json.loads(row["sarif_data"]) - if "runs" in data: - for run in data["runs"]: - for result in run.get("results", []): - level = result.get("level", "note").lower() - severity_counts[level] = severity_counts.get(level, 0) + 1 + data = json.loads(row["findings_data"]) + if "findings" in data: + for finding in data["findings"]: + severity = finding.get("severity", "info").lower() + severity_counts[severity] = severity_counts.get(severity, 0) + 1 except (json.JSONDecodeError, KeyError): continue diff --git a/cli/src/fuzzforge_cli/main.py b/cli/src/fuzzforge_cli/main.py index 66b7c25c..239235e1 100644 --- a/cli/src/fuzzforge_cli/main.py +++ b/cli/src/fuzzforge_cli/main.py @@ -263,13 +263,13 @@ def workflow_main(): @finding_app.command("show") def show_finding_detail( run_id: str = typer.Argument(..., help="Run ID to get finding from"), - rule_id: str = typer.Option(..., "--rule", "-r", help="Rule ID of the specific finding to show") + finding_id: str = typer.Option(..., "--id", "-i", help="Unique ID of the specific finding to show") ): """ šŸ” Show detailed information about a specific finding """ from .commands.findings import show_finding - show_finding(run_id=run_id, rule_id=rule_id) + show_finding(run_id=run_id, finding_id=finding_id) @finding_app.callback(invoke_without_command=True) @@ -316,7 +316,7 @@ def finding_main( console.print("āŒ No project database found", style="red") return - get_findings(run_id=finding_id, save=True, format="table") + get_findings(run_id=finding_id, save=True, format="table", limit=None, offset=0) except Exception as e: console.print(f"āŒ Failed to get findings: {e}", style="red") @@ -390,7 +390,7 @@ def main(): console.print(f"šŸ” Displaying finding: {finding_id}") try: - get_findings(run_id=finding_id, save=True, format="table") + get_findings(run_id=finding_id, save=True, format="table", limit=None, offset=0) return except Exception as e: console.print(f"āŒ Failed to get finding: {e}", style="red") diff --git a/docs/docs/reference/cli-reference.md b/docs/docs/reference/cli-reference.md index dd7b4d29..2f160065 100644 --- a/docs/docs/reference/cli-reference.md +++ b/docs/docs/reference/cli-reference.md @@ -304,15 +304,15 @@ View and analyze individual findings. 
**Usage:**
```bash
-ff finding [id]                    # Show latest or specific finding
-ff finding show <run-id> --rule <rule-id>     # Show specific finding detail
+ff finding [id]                    # Show latest or specific finding
+ff finding show <run-id> --id <finding-id>    # Show specific finding detail
```

**Examples:**
```bash
-ff finding                                    # Show latest finding
-ff finding python_sast-abc123                 # Show specific run findings
-ff finding show python_sast-abc123 --rule f2cf5e3e   # Show specific finding
+ff finding                                    # Show latest finding
+ff finding python_sast-abc123                 # Show specific run findings
+ff finding show python_sast-abc123 --id f2cf5e3e     # Show specific finding
```

---
@@ -445,15 +445,20 @@ ff ai [COMMAND]
```

**Subcommands:**
+- `agent` — Start interactive AI agent
+- `status` — Check AI agent status
+- `server [--port]` — Start AI agent server
+
+**Planned Features (Coming Soon):**
- `analyze <run-id>` — Analyze findings with AI
- `explain <run-id:finding-id>` — Get AI explanation of a finding
- `remediate <run-id:finding-id>` — Get remediation suggestions

**Examples:**
```bash
-ff ai analyze python_sast-abc123              # Analyze all findings
-ff ai explain python_sast-abc123:finding1     # Explain specific finding
-ff ai remediate python_sast-abc123:finding1   # Get fix suggestions
+ff ai agent                                   # Start interactive AI agent
+ff ai status                                  # Check agent status
+ff ai server --port 8080                      # Start server on custom port
```

---
@@ -466,19 +471,22 @@
Ingest knowledge into the AI knowledge base.

**Usage:**
```bash
-ff ingest [COMMAND]
+ff ingest [path] [OPTIONS]
```

-**Subcommands:**
-- `file <path>` — Ingest a file
-- `directory <path>` — Ingest directory contents
-- `workflow <name>` — Ingest workflow documentation
+**Options:**
+- `--recursive, -r` — Recursively ingest directory contents
+- `--file-types, -t` — Comma-separated file types to ingest (e.g., "md,txt,py")
+- `--exclude, -e` — Patterns to exclude
+- `--dataset, -d` — Target dataset name
+- `--force, -f` — Force reingest even if already processed

**Examples:**
```bash
-ff ingest file ./docs/security.md             # Ingest single file
-ff ingest directory ./docs                    # Ingest directory
-ff ingest workflow python_sast                # Ingest workflow docs
+ff ingest ./docs/security.md                  # Ingest single file
+ff ingest ./docs --recursive                  # Ingest directory recursively
+ff ingest ./src -t "py,js" --exclude "test_*" # Ingest with filters
+ff ingest ./docs -d security_docs             # Ingest to specific dataset
```

---
diff --git a/sdk/src/fuzzforge_sdk/client.py b/sdk/src/fuzzforge_sdk/client.py
index c4f29d38..8e1f5aae 100644
--- a/sdk/src/fuzzforge_sdk/client.py
+++ b/sdk/src/fuzzforge_sdk/client.py
@@ -393,10 +393,6 @@ def submit_workflow_with_upload(
         # Prepare multipart form data
         url =
urljoin(self.base_url, f"/workflows/{workflow_name}/upload-and-submit") - files = { - "file": (filename, open(upload_file, "rb"), "application/gzip") - } - data = {} if parameters: @@ -494,10 +488,12 @@ async def asubmit_workflow_with_upload( logger.info(f"Uploading {filename} to {workflow_name}...") - response = await self._async_client.post(url, files=files, data=data) - - # Close file handle - files["file"][1].close() + # Use context manager to ensure file handle is closed + with open(upload_file, "rb") as f: + files = { + "file": (filename, f, "application/gzip") + } + response = await self._async_client.post(url, files=files, data=data) response_data = await self._ahandle_response(response) return RunSubmissionResponse(**response_data) diff --git a/sdk/src/fuzzforge_sdk/exceptions.py b/sdk/src/fuzzforge_sdk/exceptions.py index c5876582..31b31842 100644 --- a/sdk/src/fuzzforge_sdk/exceptions.py +++ b/sdk/src/fuzzforge_sdk/exceptions.py @@ -415,16 +415,20 @@ def from_http_error(status_code: int, response_text: str, url: str) -> FuzzForge if "/workflows/" in url and "/submit" not in url: # Extract workflow name from URL parts = url.split("/workflows/") - if len(parts) > 1: - workflow_name = parts[1].split("/")[0] - return WorkflowNotFoundError(workflow_name, context=context) + if len(parts) > 1 and parts[1]: + workflow_segments = parts[1].split("/") + if workflow_segments and workflow_segments[0]: + workflow_name = workflow_segments[0] + return WorkflowNotFoundError(workflow_name, context=context) elif "/runs/" in url: # Extract run ID from URL parts = url.split("/runs/") - if len(parts) > 1: - run_id = parts[1].split("/")[0] - return RunNotFoundError(run_id, context) + if len(parts) > 1 and parts[1]: + run_segments = parts[1].split("/") + if run_segments and run_segments[0]: + run_id = run_segments[0] + return RunNotFoundError(run_id, context) elif status_code == 400: # Check for specific error patterns in response diff --git a/workers/ossfuzz/activities.py b/workers/ossfuzz/activities.py index 7b0ef7cd..664a879e 100644 --- a/workers/ossfuzz/activities.py +++ b/workers/ossfuzz/activities.py @@ -368,11 +368,11 @@ def parse_fuzzing_stats(stdout: str, stderr: str, engine: str) -> Dict[str, Any] # Example: #8192 NEW cov: 1234 ft: 5678 corp: 89/10KB parts = line.split() for i, part in enumerate(parts): - if part.startswith("cov:"): + if part.startswith("cov:") and i+1 < len(parts): stats["coverage"] = int(parts[i+1]) - elif part.startswith("corp:"): + elif part.startswith("corp:") and i+1 < len(parts): stats["corpus_entries"] = int(parts[i+1].split('/')[0]) - elif part.startswith("exec/s:"): + elif part.startswith("exec/s:") and i+1 < len(parts): stats["executions_per_sec"] = float(parts[i+1]) elif part.startswith("#"): stats["total_executions"] = int(part[1:])
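+            # Worked example for the guarded token scan above, assuming the
+            # libFuzzer status-line format cited earlier in this function:
+            #   "#8192 NEW cov: 1234 ft: 5678 corp: 89/10Kb exec/s: 512"
+            # parses to stats == {"total_executions": 8192, "coverage": 1234,
+            #                     "corpus_entries": 89, "executions_per_sec": 512.0}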