From f8f947698c074ebdf21825199a374a3399493260 Mon Sep 17 00:00:00 2001 From: "Paige T." Date: Mon, 2 Mar 2026 01:36:01 +0000 Subject: [PATCH] - refactored to use pyproject.toml; deleted requirements.txt, setup.cfg, and setup.py, so there's just one--pyproject.toml - cargo fmt'd rust source; readability - added rust artifacts to gitignore - add dev dependencies to pyproject.toml; black, flake8, pytest. Added bs4 to dev dependencies (reporting test) - reformatted python code with black; readability - add doc generation workflow; cargo doc and sphinx; API docs get generated for both and you can customize the docs starting from docs/source/index.rst --- .github/workflows/generate-docs.yml | 47 ++ .gitignore | 7 +- docs/build/.gitignore | 1 + docs/source/.gitignore | 3 + docs/source/_static/.gitignore | 1 + docs/source/conf.py | 27 + docs/source/index.rst | 24 + plugins/aipocgen.py | 226 ++++---- pyproject.toml | 53 ++ requirements.txt | 11 - setup.cfg | 12 - setup.py | 55 -- src/main.rs | 57 +- .../_rust_core/src/analysis/ast_analysis.rs | 51 +- .../src/analysis/config_analysis.rs | 26 +- src/pyspector/_rust_core/src/analysis/mod.rs | 62 ++- .../_rust_core/src/analysis/taint_analysis.rs | 297 ++++++---- src/pyspector/_rust_core/src/ast_parser.rs | 2 +- .../src/graph/call_graph_builder.rs | 30 +- .../_rust_core/src/graph/cfg_builder.rs | 48 +- src/pyspector/_rust_core/src/graph/mod.rs | 2 +- .../_rust_core/src/graph/representation.rs | 19 +- src/pyspector/_rust_core/src/issues.rs | 11 +- src/pyspector/_rust_core/src/lib.rs | 29 +- src/pyspector/_rust_core/src/rules.rs | 8 +- src/pyspector/_rust_core/src/supply_chain.rs | 304 +++++----- src/pyspector/cli.py | 527 ++++++++++++------ src/pyspector/config.py | 44 +- src/pyspector/plugin_system.py | 312 ++++++----- src/pyspector/reporting.py | 70 ++- src/pyspector/triage.py | 130 +++-- tests/examples/hardcoded_anthropic_key.py | 1 + tests/unit/reporting_test.py | 19 +- 33 files changed, 1531 insertions(+), 985 deletions(-) 
create mode 100644 .github/workflows/generate-docs.yml create mode 100644 docs/build/.gitignore create mode 100644 docs/source/.gitignore create mode 100644 docs/source/_static/.gitignore create mode 100644 docs/source/conf.py create mode 100644 docs/source/index.rst delete mode 100644 requirements.txt delete mode 100644 setup.cfg delete mode 100644 setup.py diff --git a/.github/workflows/generate-docs.yml b/.github/workflows/generate-docs.yml new file mode 100644 index 0000000..b730157 --- /dev/null +++ b/.github/workflows/generate-docs.yml @@ -0,0 +1,47 @@ +name: Generate Documentation + +on: + push: + branches: + - '*' + pull_request: + branches: + - '*' + +jobs: + build-docs: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: 3.9 # Adjust as needed + + - name: Set up Rust + uses: dtolnay/rust-toolchain@master + with: + toolchain: 1.59.0 # Ensure this matches your Rust version + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install ".[dev]" + + - name: Generate Rust Documentation (cargo doc) + run: | + cargo doc --no-deps --document-private-items --target-dir docs/build/rust + + - name: Generate Python Documentation with Sphinx + run: | + sphinx-apidoc -o docs/source/ src/pyspector + sphinx-build docs/source docs/build + + - name: Upload documentation artifact + uses: actions/upload-artifact@v4 + with: + name: docs + path: docs/build/ diff --git a/.gitignore b/.gitignore index 204739f..58a6de4 100644 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ __pycache__/ # Distribution / packaging .Python build/ +!docs/build/.gitignore develop-eggs/ dist/ downloads/ @@ -60,4 +61,8 @@ venv.bak/ # IDEs .idea/ -.vscode/ \ No newline at end of file +.vscode/ + +# Rust +Cargo.lock +target/ diff --git a/docs/build/.gitignore b/docs/build/.gitignore new file mode 100644 index 0000000..4406e4c --- /dev/null +++ 
b/docs/build/.gitignore @@ -0,0 +1 @@ +* diff --git a/docs/source/.gitignore b/docs/source/.gitignore new file mode 100644 index 0000000..0dc0089 --- /dev/null +++ b/docs/source/.gitignore @@ -0,0 +1,3 @@ +modules.rst +pyspector.rst +pyspector.rules.rst diff --git a/docs/source/_static/.gitignore b/docs/source/_static/.gitignore new file mode 100644 index 0000000..792d600 --- /dev/null +++ b/docs/source/_static/.gitignore @@ -0,0 +1 @@ +# diff --git a/docs/source/conf.py b/docs/source/conf.py new file mode 100644 index 0000000..4723d53 --- /dev/null +++ b/docs/source/conf.py @@ -0,0 +1,27 @@ +# Configuration file for the Sphinx documentation builder. +# +# For the full list of built-in configuration values, see the documentation: +# https://www.sphinx-doc.org/en/master/usage/configuration.html + +# -- Project information ----------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information + +project = 'PySpector' +copyright = '2026, ParzivalHack' +author = 'ParzivalHack' + +# -- General configuration --------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration + +extensions = ['sphinx.ext.autodoc'] + +templates_path = ['_templates'] +exclude_patterns = [] + + + +# -- Options for HTML output ------------------------------------------------- +# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output + +html_theme = 'sphinx_rtd_theme' +html_static_path = ['_static'] diff --git a/docs/source/index.rst b/docs/source/index.rst new file mode 100644 index 0000000..60fe82f --- /dev/null +++ b/docs/source/index.rst @@ -0,0 +1,24 @@ +.. PySpector documentation master file, created by + sphinx-quickstart on Mon Mar 2 03:40:13 2026. + You can adapt this file completely to your liking, but it should at least + contain the root `toctree` directive. 
+ +PySpector documentation +======================= + +Add your content using ``reStructuredText`` syntax. See the +`reStructuredText <https://www.sphinx-doc.org/en/master/usage/restructuredtext/index.html>`_ +documentation for details. + +.. toctree:: + :maxdepth: 2 + :caption: Contents: + + modules + +Rust Documentation +=================== + +You can view the generated Rust documentation here: + +`Rust Documentation (cargo doc) <rust/doc/pyspector_rust_core/index.html>`_ diff --git a/plugins/aipocgen.py b/plugins/aipocgen.py index a6b81e7..d7119ae 100644 --- a/plugins/aipocgen.py +++ b/plugins/aipocgen.py @@ -47,6 +47,7 @@ @dataclass class Vulnerability: """Represents a vulnerability finding""" + vuln_type: str file_path: str line_number: int @@ -60,14 +61,14 @@ class GroqPoCGeneratorPlugin(PySpectorPlugin): Plugin that generates Proof of Concept exploits using Groq AI. Analyzes PySpector findings and creates safe, educational PoC code. """ - + MODELS = { - 'llama-3.1-70b': 'llama-3.1-70b-versatile', - 'llama-3.1-8b': 'llama-3.1-8b-instant', - 'mixtral-8x7b': 'mixtral-8x7b-32768', - 'llama-3.3-70b': 'llama-3.3-70b-versatile', + "llama-3.1-70b": "llama-3.1-70b-versatile", + "llama-3.1-8b": "llama-3.1-8b-instant", + "mixtral-8x7b": "mixtral-8x7b-32768", + "llama-3.3-70b": "llama-3.3-70b-versatile", } - + @property def metadata(self) -> PluginMetadata: return PluginMetadata( @@ -76,20 +77,20 @@ def metadata(self) -> PluginMetadata: author="Tommaso Bona", description="Generate Proof of Concept exploits/PoCs, directly based on PySpector's scan findings, using Groq AI", requires=["groq"], - category="security" + category="security", ) - + def initialize(self, config: Dict[str, Any]) -> bool: """Initialize the plugin with configuration""" self.config = dict(config) - self.dry_run = bool(config.get('dry_run', False)) - self.api_key = config.get('api_key') or os.environ.get('GROQ_API_KEY') - self.model = config.get('model', 'llama-3.3-70b') - self.model_id = self.MODELS.get(self.model, self.MODELS['llama-3.3-70b']) - severity_source = config.get('severity_filter', ['HIGH', 
'CRITICAL']) + self.dry_run = bool(config.get("dry_run", False)) + self.api_key = config.get("api_key") or os.environ.get("GROQ_API_KEY") + self.model = config.get("model", "llama-3.3-70b") + self.model_id = self.MODELS.get(self.model, self.MODELS["llama-3.3-70b"]) + severity_source = config.get("severity_filter", ["HIGH", "CRITICAL"]) self.severity_filter = [str(sev).upper() for sev in severity_source] - self.max_pocs = int(config.get('max_pocs', 5)) - self.output_dir = config.get('output_dir', 'pocs') + self.max_pocs = int(config.get("max_pocs", 5)) + self.output_dir = config.get("output_dir", "pocs") self.client = None if self.dry_run: @@ -122,128 +123,139 @@ def initialize(self, config: Dict[str, Any]) -> bool: print(f" Max PoCs: {self.max_pocs}") return True - + def validate_config(self, config: Dict[str, Any]) -> tuple[bool, str]: """Validate plugin configuration""" - model = config.get('model', 'llama-3.3-70b') + model = config.get("model", "llama-3.3-70b") if model not in self.MODELS: - return False, f"Invalid model: {model}. Choose from: {', '.join(self.MODELS.keys())}" + return ( + False, + f"Invalid model: {model}. Choose from: {', '.join(self.MODELS.keys())}", + ) - dry_run = config.get('dry_run', False) + dry_run = config.get("dry_run", False) if not isinstance(dry_run, bool): return False, "dry_run must be a boolean value" - severity_filter = config.get('severity_filter', ['HIGH', 'CRITICAL']) + severity_filter = config.get("severity_filter", ["HIGH", "CRITICAL"]) if not isinstance(severity_filter, list) or not severity_filter: return False, "severity_filter must be a non-empty list of severities" - valid_severities = {'LOW', 'MEDIUM', 'HIGH', 'CRITICAL'} + valid_severities = {"LOW", "MEDIUM", "HIGH", "CRITICAL"} for sev in severity_filter: if str(sev).upper() not in valid_severities: - return False, f"Invalid severity: {sev}. Choose from: {', '.join(sorted(valid_severities))}" + return ( + False, + f"Invalid severity: {sev}. 
Choose from: {', '.join(sorted(valid_severities))}", + ) - max_pocs = config.get('max_pocs', 5) + max_pocs = config.get("max_pocs", 5) if not isinstance(max_pocs, int) or max_pocs < 1: return False, "max_pocs must be a positive integer" - output_dir = config.get('output_dir', 'pocs') + output_dir = config.get("output_dir", "pocs") if not isinstance(output_dir, str) or not output_dir.strip(): return False, "output_dir must be a non-empty string" - if not dry_run and not config.get('api_key') and not os.environ.get('GROQ_API_KEY'): - return False, "Provide api_key in config or set GROQ_API_KEY environment variable (or enable dry_run)" + if ( + not dry_run + and not config.get("api_key") + and not os.environ.get("GROQ_API_KEY") + ): + return ( + False, + "Provide api_key in config or set GROQ_API_KEY environment variable (or enable dry_run)", + ) return True, "" - + def process_findings( - self, - findings: List[Dict[str, Any]], - scan_path: Path, - **kwargs + self, findings: List[Dict[str, Any]], scan_path: Path, **kwargs ) -> Dict[str, Any]: """Process findings and generate PoCs""" print(f"\n{'='*60}") print("Groq PoC Generator") print(f"{'='*60}") if self.dry_run: - print("[*] Offline mode enabled; generating PoC scaffolds without Groq API access.") - + print( + "[*] Offline mode enabled; generating PoC scaffolds without Groq API access." 
+ ) + # Filter by severity filtered = [ - f for f in findings - if f.get('severity', '').upper() in self.severity_filter + f for f in findings if f.get("severity", "").upper() in self.severity_filter ] - + if not filtered: return { - 'success': True, - 'message': f"No findings match severity filter: {', '.join(self.severity_filter)}", - 'data': {'pocs_generated': 0} + "success": True, + "message": f"No findings match severity filter: {', '.join(self.severity_filter)}", + "data": {"pocs_generated": 0}, } - + print(f"[*] Found {len(filtered)} vulnerabilities matching criteria") print(f"[*] Generating up to {self.max_pocs} PoCs...\n") - + # Generate PoCs pocs = {} output_files = [] - - for i, finding in enumerate(filtered[:self.max_pocs]): + + for i, finding in enumerate(filtered[: self.max_pocs]): vuln = Vulnerability( - vuln_type=finding.get('rule_id', 'Unknown'), - file_path=finding.get('file', 'Unknown'), - line_number=finding.get('line', 0), - code_snippet=finding.get('code', ''), - severity=finding.get('severity', 'UNKNOWN').upper(), - description=finding.get('description', '') + vuln_type=finding.get("rule_id", "Unknown"), + file_path=finding.get("file", "Unknown"), + line_number=finding.get("line", 0), + code_snippet=finding.get("code", ""), + severity=finding.get("severity", "UNKNOWN").upper(), + description=finding.get("description", ""), ) - + print(f"{'='*60}") print(f"[*] Generating PoC {i+1}/{min(self.max_pocs, len(filtered))}") print(f"[*] Vulnerability: {vuln.vuln_type}") print(f"[*] Location: {vuln.file_path}:{vuln.line_number}") print(f"[*] Severity: {vuln.severity}") print(f"{'='*60}") - + poc_code = self._generate_poc(vuln) - + if poc_code: # Save PoC poc_filename = self._create_filename(vuln) poc_path = self._save_poc(poc_code, poc_filename, scan_path, vuln) - + if poc_path: output_files.append(str(poc_path)) pocs[poc_filename] = { - 'vulnerability': { - 'type': vuln.vuln_type, - 'file': vuln.file_path, - 'line': vuln.line_number, - 'severity': 
vuln.severity + "vulnerability": { + "type": vuln.vuln_type, + "file": vuln.file_path, + "line": vuln.line_number, + "severity": vuln.severity, }, - 'poc_path': str(poc_path) + "poc_path": str(poc_path), } print(f"[+] PoC saved: {poc_path}") else: print(f"[!] Failed to generate PoC for {vuln.vuln_type}") - + print() - + # Save summary summary_path = self._save_summary(pocs, scan_path) if summary_path: output_files.append(str(summary_path)) - + return { - 'success': True, - 'message': f"Generated {len(pocs)} PoCs", - 'data': { - 'pocs_generated': len(pocs), - 'output_directory': str(Path(scan_path) / self.output_dir) + "success": True, + "message": f"Generated {len(pocs)} PoCs", + "data": { + "pocs_generated": len(pocs), + "output_directory": str(Path(scan_path) / self.output_dir), }, - 'output_files': output_files + "output_files": output_files, } - + def _generate_poc(self, vuln: Vulnerability) -> Optional[str]: """Generate PoC using Groq API""" if self.dry_run: @@ -313,30 +325,30 @@ def _generate_poc(self, vuln: Vulnerability) -> Optional[str]: model=self.model_id, messages=[ {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} + {"role": "user", "content": user_prompt}, ], temperature=0.3, max_tokens=4096, top_p=0.9, ) - + content = response.choices[0].message.content - + if content: # Extract code from markdown if present return self._extract_code(content) - + return None - + except Exception as e: print(f"[!] 
Error calling Groq API: {e}") return None - + def _generate_offline_poc(self, vuln: Vulnerability) -> str: """Generate an offline PoC scaffold when Groq access is unavailable""" snippet = (vuln.code_snippet or "").strip() snippet_repr = repr(snippet or "# Code snippet unavailable") - + lines = [ "#!/usr/bin/env python3", '"""', @@ -351,8 +363,8 @@ def _generate_offline_poc(self, vuln: Vulnerability) -> str: "def explain():", ' """Describe the vulnerability context."""', " return (", - f" \"PySpector flagged {vuln.vuln_type} in {vuln.file_path}:{vuln.line_number}. \"", - " \"Use this scaffold and consult mitigation guidance once Groq access is available.\"", + f' "PySpector flagged {vuln.vuln_type} in {vuln.file_path}:{vuln.line_number}. "', + ' "Use this scaffold and consult mitigation guidance once Groq access is available."', " )", "", "def main():", @@ -365,35 +377,31 @@ def _generate_offline_poc(self, vuln: Vulnerability) -> str: " main()", "", ] - - return '\n'.join(lines) - + + return "\n".join(lines) + def _extract_code(self, response: str) -> str: """Extract Python code from response""" - if '```python' in response: - parts = response.split('```python') + if "```python" in response: + parts = response.split("```python") if len(parts) > 1: - code = parts[1].split('```')[0] + code = parts[1].split("```")[0] return code.strip() - elif '```' in response: - parts = response.split('```') + elif "```" in response: + parts = response.split("```") if len(parts) > 1: return parts[1].strip() - + return response.strip() - + def _create_filename(self, vuln: Vulnerability) -> str: """Create a safe filename for the PoC""" - safe_type = vuln.vuln_type.replace('/', '_').replace('\\', '_') - safe_file = Path(vuln.file_path).stem.replace('.', '_') + safe_type = vuln.vuln_type.replace("/", "_").replace("\\", "_") + safe_file = Path(vuln.file_path).stem.replace(".", "_") return f"{safe_type}_{safe_file}_line{vuln.line_number}.py" - + def _save_poc( - self, - poc_code: str, - 
filename: str, - scan_path: Path, - vuln: Vulnerability + self, poc_code: str, filename: str, scan_path: Path, vuln: Vulnerability ) -> Optional[Path]: """Save PoC to file""" try: @@ -403,18 +411,18 @@ def _save_poc( output_dir = base_path / self.output_dir output_dir.mkdir(parents=True, exist_ok=True) - + poc_path = output_dir / filename - - with open(poc_path, 'w', encoding='utf-8') as f: + + with open(poc_path, "w", encoding="utf-8") as f: f.write(poc_code) - + return poc_path - + except Exception as e: print(f"[!] Error saving PoC: {e}") return None - + def _save_summary(self, pocs: Dict, scan_path: Path) -> Optional[Path]: """Save summary JSON""" try: @@ -424,17 +432,17 @@ def _save_summary(self, pocs: Dict, scan_path: Path) -> Optional[Path]: output_dir = base_path / self.output_dir summary_path = output_dir / "pocs_summary.json" - - with open(summary_path, 'w', encoding='utf-8') as f: + + with open(summary_path, "w", encoding="utf-8") as f: json.dump(pocs, f, indent=2) - + print(f"[+] Summary saved: {summary_path}") return summary_path - + except Exception as e: print(f"[!] 
Error saving summary: {e}") return None - + def cleanup(self) -> None: """Cleanup resources""" pass @@ -443,10 +451,10 @@ def cleanup(self) -> None: # This allows the plugin to be tested standalone if __name__ == "__main__": print("Groq PoC Generator Plugin for PySpector") - print("="*60) + print("=" * 60) print(f"Version: {GroqPoCGeneratorPlugin().metadata.version}") print(f"Author: {GroqPoCGeneratorPlugin().metadata.author}") print(f"Description: {GroqPoCGeneratorPlugin().metadata.description}") - print("="*60) + print("=" * 60) print("\nThis is a PySpector plugin.") print("Install with: pyspector plugin install aipogen.py") diff --git a/pyproject.toml b/pyproject.toml index 1e62e38..8e0fbad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,3 +4,56 @@ requires = [ "setuptools-rust>=1.0" ] build-backend = "setuptools.build_meta" + +[project] +name = "pyspector" +# Version is automatically read from Cargo.toml [package].version +dynamic = ["version"] +authors = [{name = "ParzivalHack"}] +description = "A high-performance, security-focused static analysis tool for Python, powered by Rust." 
+readme = "README.md" +requires-python = ">=3.8" +keywords = ["static analysis", "security", "python", "rust"] +license = {text = "MIT"} +dependencies = [ + "click>=8.0", + "toml>=0.10", + "requests>=2.25", + "sarif-om>=1.0", + "jinja2>=3.0", + "textual>=0.60", + 'importlib_resources; python_version < "3.9"', +] + +# Add development dependencies under [project.optional-dependencies] +[project.optional-dependencies] +dev = [ + "black", + "pytest", + "flake8", + "sphinx", # Sphinx for generating documentation + "sphinx-autodoc-typehints", # To automatically include type hints in docs + "sphinx-rtd-theme" # For a nice ReadTheDocs theme + "bs4" # Test dependency +] + +[project.scripts] +pyspector = "pyspector.cli:cli" + +[tool.setuptools] +include-package-data = true +zip-safe = false + +[tool.setuptools.packages.find] +where = ["src"] + +[tool.setuptools.package-dir] +"" = "src" + +[tool.setuptools.package-data] +pyspector = ["rules/*.toml"] + +# Rust extension module configuration +[[tool.setuptools-rust.ext-modules]] +target = "pyspector._rust_core" +path = "src/pyspector/_rust_core/Cargo.toml" diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 15d1951..0000000 --- a/requirements.txt +++ /dev/null @@ -1,11 +0,0 @@ -# Core application dependencies -click>=8.0 -toml>=0.10 -requests>=2.25 -sarif-om>=1.0 -jinja2>=3.0 # For HTML reporting -# For the interactive TUI -textual>=0.60 -# For building the Rust extension -maturin>=1.0 -setuptools-rust>=1.0 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg deleted file mode 100644 index 4eec812..0000000 --- a/setup.cfg +++ /dev/null @@ -1,12 +0,0 @@ -[metadata] -name = pyspector -version = 0.1.6 - -[options] -package_dir= - =src -packages=find: -include_package_data = True - -[options.packages.find] -where=src \ No newline at end of file diff --git a/setup.py b/setup.py deleted file mode 100644 index 07ee19a..0000000 --- a/setup.py +++ /dev/null @@ -1,55 +0,0 @@ -import os -from 
setuptools import setup, find_packages -from setuptools_rust import RustExtension - -# Get the project version from the Rust crate to ensure they are always in sync. -cargo_toml_path = os.path.join(os.path.dirname(__file__), "src/pyspector/_rust_core/Cargo.toml") -with open(cargo_toml_path, "r") as f: - for line in f: - if line.startswith("version ="): - version = line.strip().split("=")[1].strip().strip('"') - break - else: - raise RuntimeError("Could not find version in Cargo.toml") - -with open("README.md", encoding="utf-8") as f: - long_description = f.read() - -setup( - name="pyspector", - version=version, - author="ParzivalHack", - keywords="static analysis security python rust", - description="A high-performance, security-focused static analysis tool for Python, powered by Rust.", - long_description=long_description, - long_description_content_type="text/markdown", - license="MIT", - packages=find_packages(where="src"), - package_dir={"": "src"}, - rust_extensions=[ - RustExtension( - "pyspector._rust_core", - path=cargo_toml_path, - ) - ], - python_requires=">=3.8", - install_requires=[ - "click>=8.0", - "toml>=0.10", - "requests>=2.25", - "sarif-om>=1.0", - "jinja2>=3.0", - "textual>=0.60", - 'importlib_resources; python_version < "3.9"', - ], - entry_points={ - "console_scripts": [ - "pyspector = pyspector.cli:cli", - ], - }, - include_package_data=True, - package_data={ - "pyspector": ["rules/*.toml"], - }, - zip_safe=False, -) diff --git a/src/main.rs b/src/main.rs index 3d0399b..33a20dc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,11 @@ -use actix_web::{post, web, App, HttpServer, HttpResponse, Responder}; +use actix_cors::Cors; use actix_governor::{Governor, GovernorConfigBuilder}; +use actix_web::{post, web, App, HttpResponse, HttpServer, Responder}; use pyo3::prelude::*; -use actix_cors::Cors; use pyo3::types::PyDict; use serde::Deserialize; -use std::process::Command; use std::fs; +use std::process::Command; #[derive(Deserialize)] struct 
ScanRequest { @@ -60,20 +60,23 @@ async fn scan(req: web::Json) -> impl Responder { let result = Python::with_gil(|py| -> Result { // Import the required modules let pyspector_cli = py.import("pyspector.cli").map_err(|e| { - format!("Failed to import pyspector.cli: {}. Is PySpector installed?", e) - })?; - - let pyspector_config = py.import("pyspector.config").map_err(|e| { - format!("Failed to import pyspector.config: {}", e) - })?; - - let pyspector_reporting = py.import("pyspector.reporting").map_err(|e| { - format!("Failed to import pyspector.reporting: {}", e) + format!( + "Failed to import pyspector.cli: {}. Is PySpector installed?", + e + ) })?; - let pyspector_rust_core = py.import("pyspector._rust_core").map_err(|e| { - format!("Failed to import pyspector._rust_core: {}", e) - })?; + let pyspector_config = py + .import("pyspector.config") + .map_err(|e| format!("Failed to import pyspector.config: {}", e))?; + + let pyspector_reporting = py + .import("pyspector.reporting") + .map_err(|e| format!("Failed to import pyspector.reporting: {}", e))?; + + let pyspector_rust_core = py + .import("pyspector._rust_core") + .map_err(|e| format!("Failed to import pyspector._rust_core: {}", e))?; // Load configuration let config = pyspector_config @@ -86,10 +89,10 @@ async fn scan(req: web::Json) -> impl Responder { .map_err(|e| format!("Failed to get default rules: {}", e))?; // Create Path object for the scan target - let pathlib = py.import("pathlib").map_err(|e| { - format!("Failed to import pathlib: {}", e) - })?; - + let pathlib = py + .import("pathlib") + .map_err(|e| format!("Failed to import pathlib: {}", e))?; + let path_obj = pathlib .call_method1("Path", (&target_path,)) .map_err(|e| format!("Failed to create Path object: {}", e))?; @@ -112,7 +115,7 @@ async fn scan(req: web::Json) -> impl Responder { let reporter = pyspector_reporting .call_method1("Reporter", (raw_issues, report_format)) .map_err(|e| format!("Failed to create reporter: {}", e))?; - + let 
output: String = reporter .call_method0("generate") .map_err(|e| format!("Failed to generate report: {}", e))? @@ -138,15 +141,11 @@ async fn scan(req: web::Json) -> impl Responder { .content_type("application/json") .body(output) } else { - HttpResponse::Ok() - .content_type("text/plain") - .body(output) + HttpResponse::Ok().content_type("text/plain").body(output) } } - Ok(Err(e)) => HttpResponse::InternalServerError() - .body(format!("Scan failed: {}", e)), - Err(e) => HttpResponse::InternalServerError() - .body(format!("Internal error: {}", e)), + Ok(Err(e)) => HttpResponse::InternalServerError().body(format!("Scan failed: {}", e)), + Err(e) => HttpResponse::InternalServerError().body(format!("Internal error: {}", e)), } } @@ -162,7 +161,7 @@ async fn main() -> std::io::Result<()> { HttpServer::new(move || { let cors = Cors::permissive(); - + App::new() .wrap(cors) .wrap(Governor::new(&gov_conf)) @@ -171,4 +170,4 @@ async fn main() -> std::io::Result<()> { .bind(("0.0.0.0", 10000))? .run() .await -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/analysis/ast_analysis.rs b/src/pyspector/_rust_core/src/analysis/ast_analysis.rs index 8b7c17a..572def9 100644 --- a/src/pyspector/_rust_core/src/analysis/ast_analysis.rs +++ b/src/pyspector/_rust_core/src/analysis/ast_analysis.rs @@ -1,26 +1,40 @@ use crate::ast_parser::AstNode; use crate::issues::Issue; -use crate::rules::{RuleSet, Rule}; +use crate::rules::{Rule, RuleSet}; // Main entry point for AST scanning pub fn scan_ast(ast: &AstNode, file_path: &str, content: &str, ruleset: &RuleSet) -> Vec { let mut issues = Vec::new(); - let ast_rules: Vec<&Rule> = ruleset.rules.iter() + let ast_rules: Vec<&Rule> = ruleset + .rules + .iter() .filter(|r| r.ast_match.is_some()) .collect(); - - if ast_rules.is_empty() { return issues; } + + if ast_rules.is_empty() { + return issues; + } walk_ast(ast, file_path, content, &ast_rules, &mut issues); issues } // Recursively walks the AST, checking each node 
against the rules -fn walk_ast(node: &AstNode, file_path: &str, content: &str, rules: &[&Rule], issues: &mut Vec) { +fn walk_ast( + node: &AstNode, + file_path: &str, + content: &str, + rules: &[&Rule], + issues: &mut Vec, +) { for rule in rules.iter() { if let Some(match_pattern) = &rule.ast_match { if check_node_match(node, match_pattern) { - let line_content = content.lines().nth(node.lineno.saturating_sub(1) as usize).unwrap_or("").to_string(); + let line_content = content + .lines() + .nth(node.lineno.saturating_sub(1) as usize) + .unwrap_or("") + .to_string(); issues.push(Issue::new( rule.id.clone(), rule.description.clone(), @@ -47,13 +61,18 @@ fn check_node_match(node: &AstNode, match_pattern: &str) -> bool { let (node_type_match, props_str) = if let Some(open_paren) = match_pattern.find('(') { ( &match_pattern[..open_paren], - Some(&match_pattern[open_paren + 1..match_pattern.rfind(')').unwrap_or(match_pattern.len())]) + Some( + &match_pattern + [open_paren + 1..match_pattern.rfind(')').unwrap_or(match_pattern.len())], + ), ) } else { (match_pattern, None) }; - if node.node_type != node_type_match { return false; } + if node.node_type != node_type_match { + return false; + } if let Some(props) = props_str { for prop in props.split(',') { @@ -64,12 +83,14 @@ fn check_node_match(node: &AstNode, match_pattern: &str) -> bool { } } } - + true } fn node_has_property(node: &AstNode, path: &[&str], expected_value: &str) -> bool { - if path.is_empty() { return false; } + if path.is_empty() { + return false; + } let current_part = path[0]; let remaining_path = &path[1..]; @@ -78,9 +99,11 @@ fn node_has_property(node: &AstNode, path: &[&str], expected_value: &str) -> boo if let Some(field_value) = node.fields.get(current_part).and_then(|v| v.as_ref()) { return match field_value { serde_json::Value::String(s) => s == expected_value, - serde_json::Value::Bool(b) => b.to_string().to_lowercase() == expected_value.to_lowercase(), + serde_json::Value::Bool(b) => { + 
b.to_string().to_lowercase() == expected_value.to_lowercase() + } serde_json::Value::Number(n) => n.to_string() == expected_value, - _ => false + _ => false, }; } } @@ -99,6 +122,6 @@ fn node_has_property(node: &AstNode, path: &[&str], expected_value: &str) -> boo } } } - + false -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/analysis/config_analysis.rs b/src/pyspector/_rust_core/src/analysis/config_analysis.rs index edd702a..99ab736 100644 --- a/src/pyspector/_rust_core/src/analysis/config_analysis.rs +++ b/src/pyspector/_rust_core/src/analysis/config_analysis.rs @@ -47,27 +47,29 @@ pub fn scan_file(file_path: &str, content: &str, ruleset: &RuleSet) -> Vec bool { let trimmed = line.trim(); - + // Skip obvious comments if trimmed.starts_with('#') { return true; } - + // Skip lines that are entirely string literals (docstrings) - if (trimmed.starts_with("\"\"\"") && trimmed.ends_with("\"\"\"") && trimmed.len() > 6) || - (trimmed.starts_with("'''") && trimmed.ends_with("'''") && trimmed.len() > 6) || - (trimmed.starts_with('"') && trimmed.ends_with('"') && !trimmed.contains(" = ")) || - (trimmed.starts_with('\'') && trimmed.ends_with('\'') && !trimmed.contains(" = ")) { + if (trimmed.starts_with("\"\"\"") && trimmed.ends_with("\"\"\"") && trimmed.len() > 6) + || (trimmed.starts_with("'''") && trimmed.ends_with("'''") && trimmed.len() > 6) + || (trimmed.starts_with('"') && trimmed.ends_with('"') && !trimmed.contains(" = ")) + || (trimmed.starts_with('\'') && trimmed.ends_with('\'') && !trimmed.contains(" = ")) + { return true; } - + // More sophisticated check: if the line contains quotes but no assignment/function call // it's likely a standalone string/docstring - if (trimmed.contains("\"\"\"") || trimmed.contains("'''")) && - !trimmed.contains('=') && - !trimmed.contains('(') { + if (trimmed.contains("\"\"\"") || trimmed.contains("'''")) + && !trimmed.contains('=') + && !trimmed.contains('(') + { return true; } - + false -} \ No newline 
at end of file +} diff --git a/src/pyspector/_rust_core/src/analysis/mod.rs b/src/pyspector/_rust_core/src/analysis/mod.rs index 6c8c995..060690e 100644 --- a/src/pyspector/_rust_core/src/analysis/mod.rs +++ b/src/pyspector/_rust_core/src/analysis/mod.rs @@ -20,11 +20,14 @@ pub struct AnalysisContext<'a> { } pub fn run_analysis(context: AnalysisContext) -> Vec { - println!("[*] Starting analysis with {} rules", context.ruleset.rules.len()); - + println!( + "[*] Starting analysis with {} rules", + context.ruleset.rules.len() + ); + let root_path = Path::new(&context.root_path); let mut files_to_scan: Vec = Vec::new(); - + // Add common test fixture patterns to exclusions let mut enhanced_exclusions = context.exclusions.clone(); enhanced_exclusions.extend(vec![ @@ -33,7 +36,7 @@ pub fn run_analysis(context: AnalysisContext) -> Vec { "*_test.py".to_string(), "*/test_*.py".to_string(), ]); - + for entry in WalkDir::new(root_path).into_iter().filter_map(|e| e.ok()) { let path = entry.path(); // Collect all files (not just .py) for regex scanning @@ -43,17 +46,17 @@ pub fn run_analysis(context: AnalysisContext) -> Vec { } } } - + println!("[+] Found {} files to scan", files_to_scan.len()); - + // Scan all files with regex patterns let mut issues: Vec = files_to_scan .par_iter() .flat_map(|file_path| { if let Ok(content) = fs::read_to_string(file_path) { config_analysis::scan_file(file_path, &content, &context.ruleset) - } else { - Vec::new() + } else { + Vec::new() } }) .collect(); @@ -61,33 +64,45 @@ pub fn run_analysis(context: AnalysisContext) -> Vec { println!("[+] Found {} issues from config analysis", issues.len()); // Process Python files with AST analysis - let python_issues: Vec = context.py_files + let python_issues: Vec = context + .py_files .par_iter() .flat_map(|py_file| { let mut findings = Vec::new(); - if is_excluded(Path::new(&py_file.file_path), &enhanced_exclusions) { - return findings; + if is_excluded(Path::new(&py_file.file_path), 
&enhanced_exclusions) { + return findings; } - + // Skip regex scan for Python files (already done above) - + if let Some(ast) = &py_file.ast { - let ast_findings = ast_analysis::scan_ast(ast, &py_file.file_path, &py_file.content, &context.ruleset); + let ast_findings = ast_analysis::scan_ast( + ast, + &py_file.file_path, + &py_file.content, + &context.ruleset, + ); findings.extend(ast_findings); } findings }) .collect(); - - println!("[+] {} issues from Python AST analysis", python_issues.len()); + + println!( + "[+] {} issues from Python AST analysis", + python_issues.len() + ); issues.extend(python_issues); // Build the call graph and run taint analysis let call_graph = call_graph_builder::build_call_graph(context.py_files); let taint_issues = taint_analysis::analyze_program_for_taint(&call_graph, &context.ruleset); - println!("[+] Found {} issues from taint analysis", taint_issues.len()); + println!( + "[+] Found {} issues from taint analysis", + taint_issues.len() + ); issues.extend(taint_issues); - + // Remove duplicates let mut seen = HashSet::new(); issues.retain(|issue| seen.insert(issue.get_fingerprint())); @@ -98,13 +113,16 @@ pub fn run_analysis(context: AnalysisContext) -> Vec { fn is_excluded(path: &Path, exclusions: &[String]) -> bool { let path_str = path.to_str().unwrap_or_default(); - let path_filename = path.file_name().and_then(|s| s.to_str()).unwrap_or_default(); - + let path_filename = path + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or_default(); + exclusions.iter().any(|ex| { // Handle glob patterns if ex.contains('*') { - wildmatch::WildMatch::new(ex).matches(path_str) || - wildmatch::WildMatch::new(ex).matches(path_filename) + wildmatch::WildMatch::new(ex).matches(path_str) + || wildmatch::WildMatch::new(ex).matches(path_filename) } else { // Handle simple substring matching path_str.contains(ex) || path_filename.contains(ex) diff --git a/src/pyspector/_rust_core/src/analysis/taint_analysis.rs 
b/src/pyspector/_rust_core/src/analysis/taint_analysis.rs index 0184f06..08282a8 100644 --- a/src/pyspector/_rust_core/src/analysis/taint_analysis.rs +++ b/src/pyspector/_rust_core/src/analysis/taint_analysis.rs @@ -9,8 +9,8 @@ use std::collections::{HashMap, HashSet, VecDeque}; /// Origin of a taint #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub enum TaintOrigin { - External, // From a known source (e.g. input(), request.get()) - Param(usize), // From a function parameter (index) + External, // From a known source (e.g. input(), request.get()) + Param(usize), // From a function parameter (index) } /// Per-block taint state: maps variable names to their taint origins @@ -51,44 +51,46 @@ impl TaintContext { // Main entry point for inter-procedural taint analysis pub fn analyze_program_for_taint(call_graph: &CallGraph, ruleset: &RuleSet) -> Vec { - println!("[*] Starting inter-procedural taint analysis with {} functions", call_graph.functions.len()); - + println!( + "[*] Starting inter-procedural taint analysis with {} functions", + call_graph.functions.len() + ); + let mut global_ctx = GlobalTaintContext { summaries: HashMap::new(), }; - + // Initialize summaries for all functions for func_id in call_graph.functions.keys() { - global_ctx.summaries.insert(func_id.clone(), FunctionSummary::default()); + global_ctx + .summaries + .insert(func_id.clone(), FunctionSummary::default()); } - + let mut all_issues = Vec::new(); let mut iterations = 0; - const MAX_GLOBAL_ITERATIONS: usize = 10; - + const MAX_GLOBAL_ITERATIONS: usize = 10; + loop { iterations += 1; println!("[*] Global fixed-point iteration {}", iterations); let mut summaries_changed = false; let mut current_pass_issues = Vec::new(); - + // Analyze each function for (func_id, func_node) in &call_graph.functions { let cfg = build_cfg(func_node); - + let file_path = func_id.split("::").next().unwrap_or(""); let default_content = String::new(); - let content = 
call_graph.file_contents.get(file_path).unwrap_or(&default_content); - - let (new_summary, issues) = analyze_function_taint( - &cfg, - func_node, - ruleset, - file_path, - content, - &global_ctx - ); - + let content = call_graph + .file_contents + .get(file_path) + .unwrap_or(&default_content); + + let (new_summary, issues) = + analyze_function_taint(&cfg, func_node, ruleset, file_path, content, &global_ctx); + if let Some(old_summary) = global_ctx.summaries.get(func_id) { if &new_summary != old_summary { println!("[*] Summary changed for {}", func_id); @@ -96,18 +98,21 @@ pub fn analyze_program_for_taint(call_graph: &CallGraph, ruleset: &RuleSet) -> V summaries_changed = true; } } - + // Collect issues from the latest pass // We clear the list at the start of each global iteration so we don't duplicate // But we accumulate across functions in the same pass current_pass_issues.extend(issues); } - + if !summaries_changed || iterations >= MAX_GLOBAL_ITERATIONS { if summaries_changed { println!("[!] 
Warning: Max global iterations reached without convergence"); } else { - println!("[+] Global convergence reached after {} iterations", iterations); + println!( + "[+] Global convergence reached after {} iterations", + iterations + ); } all_issues = current_pass_issues; break; @@ -138,70 +143,69 @@ fn analyze_function_taint( global_ctx: &GlobalTaintContext, ) -> (FunctionSummary, Vec) { let mut ctx = TaintContext::new(); - + // Extract parameters and initialize taint state let params = extract_function_params(func_node); let mut initial_state = TaintState::new(); - + for (idx, param_name) in params.iter().enumerate() { let mut origins = HashSet::new(); origins.insert(TaintOrigin::Param(idx)); initial_state.insert(param_name.clone(), origins); } - + // Initialize blocks for block_id in cfg.blocks.keys() { ctx.entry_states.insert(*block_id, TaintState::new()); ctx.exit_states.insert(*block_id, TaintState::new()); } - + // Set entry block state ctx.entry_states.insert(cfg.entry, initial_state); - + // Worklist algorithm let mut worklist: VecDeque = VecDeque::new(); worklist.push_back(cfg.entry); let mut in_worklist: HashSet = HashSet::new(); in_worklist.insert(cfg.entry); - + let mut iterations = 0; while let Some(block_id) = worklist.pop_front() { in_worklist.remove(&block_id); iterations += 1; - if iterations > 1000 { break; } - + if iterations > 1000 { + break; + } + let block = match cfg.blocks.get(&block_id) { Some(b) => b, None => continue, }; - + // Compute entry state let mut entry_state = if block_id == cfg.entry { - ctx.entry_states.get(&cfg.entry).cloned().unwrap_or_default() + ctx.entry_states + .get(&cfg.entry) + .cloned() + .unwrap_or_default() } else { TaintState::new() }; - + if block_id != cfg.entry { - entry_state = compute_entry_state(block, &ctx.exit_states); + entry_state = compute_entry_state(block, &ctx.exit_states); } else { // Merge back-edges for entry block let back_edge_state = compute_entry_state(block, &ctx.exit_states); merge_states(&mut 
entry_state, &back_edge_state); } - + ctx.entry_states.insert(block_id, entry_state.clone()); - + // Transfer function - let (exit_state, _) = transfer_function( - block, - entry_state, - ruleset, - file_path, - content, - global_ctx - ); - + let (exit_state, _) = + transfer_function(block, entry_state, ruleset, file_path, content, global_ctx); + // Check change let prev_exit = ctx.exit_states.get(&block_id).cloned().unwrap_or_default(); if exit_state != prev_exit { @@ -214,36 +218,34 @@ fn analyze_function_taint( } } } - + // Collect issues and compute summary from final state let mut issues = Vec::new(); let mut summary = FunctionSummary::default(); - + for block in cfg.blocks.values() { // Re-run transfer to get issues let entry_state = ctx.entry_states.get(&block.id).cloned().unwrap_or_default(); - let (exit_state, block_issues) = transfer_function( - block, - entry_state, - ruleset, - file_path, - content, - global_ctx - ); + let (exit_state, block_issues) = + transfer_function(block, entry_state, ruleset, file_path, content, global_ctx); issues.extend(block_issues); - + // Check Return statements for summary for stmt in &block.statements { if stmt.node_type == "Return" { if let Some(value) = stmt.children.get("value").and_then(|v| v.get(0)) { // Check if return value is a direct source call if value.node_type == "Call" { - let call_name = get_full_call_name(value); - if ruleset.taint_sources.iter().any(|s| call_name.contains(&s.function_call)) { - summary.returns_external_taint = true; - } + let call_name = get_full_call_name(value); + if ruleset + .taint_sources + .iter() + .any(|s| call_name.contains(&s.function_call)) + { + summary.returns_external_taint = true; + } } - + // Check taint of returned variables let names = extract_all_names(value); for name in names { @@ -251,7 +253,9 @@ fn analyze_function_taint( for origin in origins { match origin { TaintOrigin::External => summary.returns_external_taint = true, - TaintOrigin::Param(idx) => { 
summary.param_flows_to_return.insert(*idx); } + TaintOrigin::Param(idx) => { + summary.param_flows_to_return.insert(*idx); + } } } } @@ -260,7 +264,7 @@ fn analyze_function_taint( } } } - + (summary, issues) } @@ -269,19 +273,20 @@ fn compute_entry_state( exit_states: &HashMap, ) -> TaintState { let mut entry_state = TaintState::new(); - + for pred_id in &block.predecessors { if let Some(pred_exit) = exit_states.get(pred_id) { merge_states(&mut entry_state, pred_exit); } } - + entry_state } fn merge_states(target: &mut TaintState, source: &TaintState) { for (var, origins) in source { - target.entry(var.clone()) + target + .entry(var.clone()) .or_insert_with(HashSet::new) .extend(origins.iter().cloned()); } @@ -296,28 +301,31 @@ fn transfer_function( global_ctx: &GlobalTaintContext, ) -> (TaintState, Vec) { let mut issues = Vec::new(); - + for stmt in &block.statements { match stmt.node_type.as_str() { "Assign" => { if let Some(value_node) = stmt.children.get("value").and_then(|v| v.get(0)) { - let targets: Vec = stmt.children.get("targets") + let targets: Vec = stmt + .children + .get("targets") .map(|targets| { - targets.iter() + targets + .iter() .filter_map(|t| get_name_from_node(t)) .collect() }) .unwrap_or_default(); - + if value_node.node_type == "Call" { let call_name = get_full_call_name(value_node); - + // 1. Check for Taint Source let is_source = ruleset.taint_sources.iter().any(|source| { - call_name.contains(&source.function_call) || - source.function_call.contains(&call_name) + call_name.contains(&source.function_call) + || source.function_call.contains(&call_name) }); - + if is_source { for target in &targets { let mut origins = HashSet::new(); @@ -327,29 +335,31 @@ fn transfer_function( } else { // 2. 
Check for Sanitizer let is_sanitizer = ruleset.taint_sanitizers.iter().any(|san| { - call_name.contains(&san.function_call) || - san.function_call.contains(&call_name) + call_name.contains(&san.function_call) + || san.function_call.contains(&call_name) }); - + if is_sanitizer { for target in &targets { state.remove(target); } } else { // 3. Check for Inter-procedural Taint (Summaries) - + let mut new_origins = HashSet::new(); - + // Find matching summary - let summary = global_ctx.summaries.iter() + let summary = global_ctx + .summaries + .iter() .find(|(k, _)| k.ends_with(&format!("::{}", call_name))) .map(|(_, v)| v); - + if let Some(summary) = summary { if summary.returns_external_taint { new_origins.insert(TaintOrigin::External); } - + // Check flow from arguments if let Some(args) = value_node.children.get("args") { for ¶m_idx in &summary.param_flows_to_return { @@ -379,7 +389,7 @@ fn transfer_function( } } } - + if !new_origins.is_empty() { for target in &targets { state.insert(target.clone(), new_origins.clone()); @@ -396,7 +406,7 @@ fn transfer_function( new_origins.extend(origins.iter().cloned()); } } - + if !new_origins.is_empty() { for target in &targets { state.insert(target.clone(), new_origins.clone()); @@ -408,8 +418,15 @@ fn transfer_function( "Expr" => { if let Some(value) = stmt.children.get("value").and_then(|v| v.get(0)) { if value.node_type == "Call" { - check_sink_and_report(value, &state, ruleset, file_path, content, &mut issues); - + check_sink_and_report( + value, + &state, + ruleset, + file_path, + content, + &mut issues, + ); + // Sanitizer as standalone statement } } @@ -418,12 +435,19 @@ fn transfer_function( let mut call_sites = Vec::new(); find_call_sites(stmt, &mut call_sites); for call_node in call_sites { - check_sink_and_report(call_node, &state, ruleset, file_path, content, &mut issues); + check_sink_and_report( + call_node, + &state, + ruleset, + file_path, + content, + &mut issues, + ); } } } } - + (state, issues) } @@ -436,20 
+460,30 @@ fn check_sink_and_report( issues: &mut Vec, ) { let call_name = get_full_call_name(call_node); - + for sink in &ruleset.taint_sinks { if call_name.contains(&sink.function_call) || sink.function_call.contains(&call_name) { if let Some(args) = call_node.children.get("args") { if args.len() > sink.vulnerable_parameter_index { let arg = &args[sink.vulnerable_parameter_index]; let arg_names = extract_all_names(arg); - + for name in arg_names { if let Some(_origins) = state.get(&name) { // We found a tainted variable flowing to a sink - - println!("[!] VULNERABILITY: Tainted variable '{}' flows to sink '{}'", name, call_name); - report_issue(ruleset, &sink.vulnerability_id, file_path, call_node, content, issues); + + println!( + "[!] VULNERABILITY: Tainted variable '{}' flows to sink '{}'", + name, call_name + ); + report_issue( + ruleset, + &sink.vulnerability_id, + file_path, + call_node, + content, + issues, + ); break; // Report once per sink call } } @@ -476,7 +510,12 @@ fn extract_function_params(func_node: &AstNode) -> Vec { if let Some(args_node) = func_node.children.get("args").and_then(|v| v.get(0)) { if let Some(args_list) = args_node.children.get("args") { for arg in args_list { - if let Some(name) = arg.fields.get("arg").and_then(|v| v.as_ref()).and_then(|v| v.as_str()) { + if let Some(name) = arg + .fields + .get("arg") + .and_then(|v| v.as_ref()) + .and_then(|v| v.as_str()) + { params.push(name.to_string()); } } @@ -501,21 +540,29 @@ fn extract_all_names(node: &AstNode) -> Vec { // --- Helper functions --- fn find_call_sites<'a>(node: &'a AstNode, sites: &mut Vec<&'a AstNode>) { - if node.node_type == "Call" { - sites.push(node); + if node.node_type == "Call" { + sites.push(node); } - for child_list in node.children.values() { - for child in child_list { - find_call_sites(child, sites); - } + for child_list in node.children.values() { + for child in child_list { + find_call_sites(child, sites); + } } } fn get_name_from_node(node: &AstNode) -> 
Option { match node.node_type.as_str() { - "Name" => node.fields.get("id").and_then(|v| v.as_ref()).and_then(|v| v.as_str().map(String::from)), - "Attribute" => node.fields.get("attr").and_then(|v| v.as_ref()).and_then(|v| v.as_str().map(String::from)), - _ => None + "Name" => node + .fields + .get("id") + .and_then(|v| v.as_ref()) + .and_then(|v| v.as_str().map(String::from)), + "Attribute" => node + .fields + .get("attr") + .and_then(|v| v.as_ref()) + .and_then(|v| v.as_str().map(String::from)), + _ => None, } } @@ -527,15 +574,22 @@ fn get_full_call_name(call_node: &AstNode) -> String { let mut parts = Vec::new(); let mut current = func; while current.node_type == "Attribute" { - if let Some(attr) = current.fields.get("attr").and_then(|v| v.as_ref()).and_then(|v| v.as_str()) { - parts.push(attr.to_string()); + if let Some(attr) = current + .fields + .get("attr") + .and_then(|v| v.as_ref()) + .and_then(|v| v.as_str()) + { + parts.push(attr.to_string()); + } + if let Some(next_node) = current.children.get("value").and_then(|v| v.get(0)) { + current = next_node; + } else { + break; } - if let Some(next_node) = current.children.get("value").and_then(|v| v.get(0)) { - current = next_node; - } else { break; } } - if let Some(base) = get_name_from_node(current) { - parts.push(base); + if let Some(base) = get_name_from_node(current) { + parts.push(base); } parts.reverse(); return parts.join("."); @@ -546,9 +600,20 @@ fn get_full_call_name(call_node: &AstNode) -> String { String::new() } -fn report_issue(ruleset: &RuleSet, vuln_id: &str, file_path: &str, stmt: &AstNode, content: &str, issues: &mut Vec) { +fn report_issue( + ruleset: &RuleSet, + vuln_id: &str, + file_path: &str, + stmt: &AstNode, + content: &str, + issues: &mut Vec, +) { if let Some(vuln_rule) = ruleset.rules.iter().find(|r| r.id == vuln_id) { - let line_content = content.lines().nth(stmt.lineno.saturating_sub(1) as usize).unwrap_or("").to_string(); + let line_content = content + .lines() + 
.nth(stmt.lineno.saturating_sub(1) as usize) + .unwrap_or("") + .to_string(); issues.push(Issue::new( vuln_rule.id.clone(), vuln_rule.description.clone(), @@ -560,4 +625,4 @@ fn report_issue(ruleset: &RuleSet, vuln_id: &str, file_path: &str, stmt: &AstNod vuln_rule.remediation.clone(), )); } -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/ast_parser.rs b/src/pyspector/_rust_core/src/ast_parser.rs index 825c75f..20a801a 100644 --- a/src/pyspector/_rust_core/src/ast_parser.rs +++ b/src/pyspector/_rust_core/src/ast_parser.rs @@ -28,4 +28,4 @@ impl PythonFile { ast, } } -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/graph/call_graph_builder.rs b/src/pyspector/_rust_core/src/graph/call_graph_builder.rs index 312be4c..4130fee 100644 --- a/src/pyspector/_rust_core/src/graph/call_graph_builder.rs +++ b/src/pyspector/_rust_core/src/graph/call_graph_builder.rs @@ -14,18 +14,18 @@ pub struct CallGraph<'a> { // Builds a call graph from all parsed Python files. pub fn build_call_graph(py_files: &[PythonFile]) -> CallGraph { println!("[*] Building call graph from {} files", py_files.len()); - + let mut call_graph = CallGraph::default(); let mut all_funcs = HashMap::new(); // First pass: find all function definitions and store their content. 
for file in py_files { println!("[*] Processing file: {}", file.file_path); - + if let Some(ast) = &file.ast { let mut funcs_in_file = Vec::new(); find_functions(ast, &mut funcs_in_file); - + for func_node in funcs_in_file { if let Some(func_name) = get_name_from_node(func_node) { let func_id = format!("{}::{}", file.file_path, func_name); @@ -34,9 +34,11 @@ pub fn build_call_graph(py_files: &[PythonFile]) -> CallGraph { } } } - call_graph.file_contents.insert(file.file_path.clone(), file.content.clone()); + call_graph + .file_contents + .insert(file.file_path.clone(), file.content.clone()); } - + call_graph.functions = all_funcs; println!("[+] Found {} total functions", call_graph.functions.len()); @@ -45,7 +47,7 @@ pub fn build_call_graph(py_files: &[PythonFile]) -> CallGraph { let mut calls = HashSet::new(); let mut call_sites = Vec::new(); find_call_sites(func_node, &mut call_sites); - + for call_node in call_sites { let callee_name = get_full_call_name(call_node); for (potential_target_id, _) in &call_graph.functions { @@ -87,7 +89,8 @@ fn find_call_sites<'a>(node: &'a AstNode, sites: &mut Vec<&'a AstNode>) { fn get_name_from_node(node: &AstNode) -> Option { // For FunctionDef/AsyncFunctionDef nodes, the function name is in 'name' field // For Name nodes, the identifier is in 'id' field - node.fields.get("name") + node.fields + .get("name") .or_else(|| node.fields.get("id")) .and_then(|v| v.as_ref()) .and_then(|v| v.as_str().map(String::from)) @@ -101,12 +104,19 @@ fn get_full_call_name(call_node: &AstNode) -> String { let mut parts = Vec::new(); let mut current = func; while current.node_type == "Attribute" { - if let Some(attr) = current.fields.get("attr").and_then(|v| v.as_ref()).and_then(|v| v.as_str()) { + if let Some(attr) = current + .fields + .get("attr") + .and_then(|v| v.as_ref()) + .and_then(|v| v.as_str()) + { parts.push(attr.to_string()); } if let Some(next_node) = current.children.get("value").and_then(|v| v.get(0)) { current = next_node; - } 
else { break; } + } else { + break; + } } if let Some(base) = get_name_from_node(current) { parts.push(base); @@ -116,4 +126,4 @@ fn get_full_call_name(call_node: &AstNode) -> String { } } String::new() -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/graph/cfg_builder.rs b/src/pyspector/_rust_core/src/graph/cfg_builder.rs index 9b62122..a9be8cc 100644 --- a/src/pyspector/_rust_core/src/graph/cfg_builder.rs +++ b/src/pyspector/_rust_core/src/graph/cfg_builder.rs @@ -26,32 +26,43 @@ fn build_from_statements( // Create blocks for the two branches and the merge point after the if/else let if_body_block_id = cfg.add_block().id; let merge_block_id = cfg.add_block().id; - + // The 'else' block is optional - let else_body_block_id = if stmt.children.get("orelse").map_or(false, |v| !v.is_empty()) { - cfg.add_block().id - } else { - merge_block_id // If no else, the false branch goes straight to merge - }; + let else_body_block_id = + if stmt.children.get("orelse").map_or(false, |v| !v.is_empty()) { + cfg.add_block().id + } else { + merge_block_id // If no else, the false branch goes straight to merge + }; // Add edges from the current block to the branches - cfg.add_edge(current_block_id, if_body_block_id, EdgeType::Conditional(true)); - cfg.add_edge(current_block_id, else_body_block_id, EdgeType::Conditional(false)); + cfg.add_edge( + current_block_id, + if_body_block_id, + EdgeType::Conditional(true), + ); + cfg.add_edge( + current_block_id, + else_body_block_id, + EdgeType::Conditional(false), + ); // Recursively build the CFG for the 'if' body if let Some(if_body) = stmt.children.get("body") { - let final_if_block = build_from_statements(cfg, if_body, if_body_block_id, loop_exits); + let final_if_block = + build_from_statements(cfg, if_body, if_body_block_id, loop_exits); cfg.add_edge(final_if_block, merge_block_id, EdgeType::Unconditional); } // Recursively build the CFG for the 'else' body if let Some(orelse_body) = 
stmt.children.get("orelse") { if !orelse_body.is_empty() { - let final_else_block = build_from_statements(cfg, orelse_body, else_body_block_id, loop_exits); - cfg.add_edge(final_else_block, merge_block_id, EdgeType::Unconditional); + let final_else_block = + build_from_statements(cfg, orelse_body, else_body_block_id, loop_exits); + cfg.add_edge(final_else_block, merge_block_id, EdgeType::Unconditional); } } - + current_block_id = merge_block_id; } "For" | "While" => { @@ -60,19 +71,20 @@ fn build_from_statements( // Edge from current block into the loop cfg.add_edge(current_block_id, loop_body_id, EdgeType::Unconditional); - + // Add the exit point for any 'break' statements loop_exits.insert(after_loop_id); if let Some(loop_body) = stmt.children.get("body") { - let final_loop_block = build_from_statements(cfg, loop_body, loop_body_id, loop_exits); + let final_loop_block = + build_from_statements(cfg, loop_body, loop_body_id, loop_exits); // Edge from the end of the loop body back to the start cfg.add_edge(final_loop_block, loop_body_id, EdgeType::Unconditional); } loop_exits.remove(&after_loop_id); - + // Edge to exit the loop - cfg.add_edge(current_block_id, after_loop_id, EdgeType::Unconditional); - + cfg.add_edge(current_block_id, after_loop_id, EdgeType::Unconditional); + current_block_id = after_loop_id; } "Break" => { @@ -93,4 +105,4 @@ fn build_from_statements( } cfg.exits.insert(current_block_id); current_block_id -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/graph/mod.rs b/src/pyspector/_rust_core/src/graph/mod.rs index cacfd77..6a60748 100644 --- a/src/pyspector/_rust_core/src/graph/mod.rs +++ b/src/pyspector/_rust_core/src/graph/mod.rs @@ -1,3 +1,3 @@ pub mod call_graph_builder; pub mod cfg_builder; -pub mod representation; \ No newline at end of file +pub mod representation; diff --git a/src/pyspector/_rust_core/src/graph/representation.rs b/src/pyspector/_rust_core/src/graph/representation.rs index b6c417b..24aa684 100644 
--- a/src/pyspector/_rust_core/src/graph/representation.rs +++ b/src/pyspector/_rust_core/src/graph/representation.rs @@ -19,7 +19,12 @@ pub struct BasicBlock { impl BasicBlock { pub fn new(id: BlockId) -> Self { - Self { id, statements: Vec::new(), predecessors: HashSet::new(), successors: HashMap::new() } + Self { + id, + statements: Vec::new(), + predecessors: HashSet::new(), + successors: HashMap::new(), + } } } @@ -36,16 +41,20 @@ impl ControlFlowGraph { let entry_block = BasicBlock::new(0); let entry_id = entry_block.id; blocks.insert(entry_id, entry_block); - Self { blocks, entry: entry_id, exits: HashSet::new() } + Self { + blocks, + entry: entry_id, + exits: HashSet::new(), + } } - + pub fn add_block(&mut self) -> &mut BasicBlock { let new_id = self.blocks.len(); let new_block = BasicBlock::new(new_id); self.blocks.insert(new_id, new_block); self.blocks.get_mut(&new_id).unwrap() } - + pub fn add_edge(&mut self, from: BlockId, to: BlockId, edge_type: EdgeType) { if let Some(from_block) = self.blocks.get_mut(&from) { from_block.successors.insert(to, edge_type); @@ -62,4 +71,4 @@ pub struct CallGraph<'a> { pub graph: HashMap>, pub functions: HashMap, pub file_contents: HashMap, -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/issues.rs b/src/pyspector/_rust_core/src/issues.rs index f35885e..ba0ef43 100644 --- a/src/pyspector/_rust_core/src/issues.rs +++ b/src/pyspector/_rust_core/src/issues.rs @@ -1,6 +1,6 @@ use pyo3::prelude::*; use serde::Deserialize; -use sha1::{Sha1, Digest}; +use sha1::{Digest, Sha1}; #[pyclass] #[derive(Debug, Clone, Deserialize, PartialEq, Eq, Hash)] @@ -62,13 +62,16 @@ impl Issue { pub fn get_fingerprint(&self) -> String { let unique_string = format!( "{}|{}|{}|{}", - self.rule_id, self.file_path, self.line_number, self.code.trim() + self.rule_id, + self.file_path, + self.line_number, + self.code.trim() ); let mut hasher = Sha1::new(); hasher.update(unique_string.as_bytes()); let result = hasher.finalize(); 
- + format!("{:x}", result) } -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/lib.rs b/src/pyspector/_rust_core/src/lib.rs index 571ea52..561f7a0 100644 --- a/src/pyspector/_rust_core/src/lib.rs +++ b/src/pyspector/_rust_core/src/lib.rs @@ -1,21 +1,20 @@ use pyo3::prelude::*; use pyo3::types::{PyDict, PyList}; +mod analysis; mod ast_parser; mod graph; mod issues; mod rules; -mod analysis; mod supply_chain; - -use issues::{Issue, Severity}; -use rules::RuleSet; use analysis::{run_analysis, AnalysisContext}; use ast_parser::PythonFile; +use issues::{Issue, Severity}; +use rules::RuleSet; #[pymodule] -fn _rust_core(m: &Bound<'_, PyModule>) -> PyResult<()> { +fn _rust_core(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; @@ -28,8 +27,9 @@ fn _rust_core(m: &Bound<'_, PyModule>) -> PyResult<()> { config: &Bound<'_, PyDict>, python_files_data: &Bound<'_, PyList>, ) -> PyResult { - - let exclusions: Vec = config.get_item("exclude")?.map_or(Ok(Vec::new()), |v| v.extract())?; + let exclusions: Vec = config + .get_item("exclude")? 
+ .map_or(Ok(Vec::new()), |v| v.extract())?; let ruleset: RuleSet = toml::from_str(&rules_toml_str).map_err(|e| { pyo3::exceptions::PyValueError::new_err(format!("Failed to parse rules: {}", e)) @@ -58,21 +58,14 @@ fn _rust_core(m: &Bound<'_, PyModule>) -> PyResult<()> { for issue in issues { py_issues.append(Py::new(py, issue)?)?; } - + Ok(py_issues.to_object(py)) } - - #[pyfn(m)] #[pyo3(name = "scan_supply_chain")] - fn scan_supply_chain_py( - py: Python, - project_path: String, - ) -> PyResult { - let vulnerabilities = py.allow_threads(|| { - supply_chain::scan_dependencies(&project_path) - }); + fn scan_supply_chain_py(py: Python, project_path: String) -> PyResult { + let vulnerabilities = py.allow_threads(|| supply_chain::scan_dependencies(&project_path)); let py_list = PyList::empty(py); for vuln in vulnerabilities { @@ -91,4 +84,4 @@ fn _rust_core(m: &Bound<'_, PyModule>) -> PyResult<()> { } Ok(()) -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/rules.rs b/src/pyspector/_rust_core/src/rules.rs index 3d47f12..aa3d482 100644 --- a/src/pyspector/_rust_core/src/rules.rs +++ b/src/pyspector/_rust_core/src/rules.rs @@ -1,6 +1,6 @@ -use serde::Deserialize; use crate::issues::Severity; use regex::Regex; +use serde::Deserialize; #[derive(Debug, Deserialize, Clone)] pub struct Rule { @@ -19,7 +19,9 @@ pub struct Rule { pub file_pattern: Option, } -fn default_confidence() -> String { "Medium".to_string() } +fn default_confidence() -> String { + "Medium".to_string() +} #[derive(Debug, Deserialize)] pub struct TaintSourceRule { @@ -55,4 +57,4 @@ pub struct RuleSet { pub taint_sinks: Vec, #[serde(default, rename = "taint_sanitizer")] pub taint_sanitizers: Vec, -} \ No newline at end of file +} diff --git a/src/pyspector/_rust_core/src/supply_chain.rs b/src/pyspector/_rust_core/src/supply_chain.rs index c112f35..64e8ae1 100644 --- a/src/pyspector/_rust_core/src/supply_chain.rs +++ b/src/pyspector/_rust_core/src/supply_chain.rs @@ -4,56 +4,56 @@ 
use std::path::Path; use rayon::prelude::*; use serde::{Deserialize, Serialize}; -use walkdir::WalkDir; use serde_json::Value; +use walkdir::WalkDir; // Public API pub fn scan_dependencies(project_path: &str) -> Vec { let files = find_dependency_files(project_path); - + // Collect all dependencies from all files let mut all_dependencies = Vec::new(); for file in &files { all_dependencies.extend(parse_dependency_file(file)); } - + // Deduplicate: same package@version might be in multiple files. // We want to query each unique package@version only once. // Use a HashMap to keep track of which files a dependency was seen in (optional, but good for reporting) // For now, simpler approach: just unique by name+version for querying - + let mut unique_deps_map: HashMap = HashMap::new(); - + for dep in &all_dependencies { let key = format!("{}@{}", dep.name, dep.version); if !unique_deps_map.contains_key(&key) { unique_deps_map.insert(key, dep.clone()); } else { - // If we wanted to track multiple files, we'd need a different struct. - // The spec says "Deduplication: Collect all dependencies... to ensure each unique package is only queried once" - // But the report output shows "File: requirements.txt". - // If a vuln is in multiple files, we should probably report it for each, or list all files. - // However, for efficiency, we query once. - // Let's stick to: Query unique deps, then map results back? - // Or better: Just query unique deps, and user sees one instance. - // If I have requests in req.txt and pyproject.toml, reporting it once is probably fine. - // But wait, the `VulnerabilityMatch` has a `file` field. - // If I deduplicate, I lose the file info for the duplicates. - // - // OPTION 1: Query unique, then map back to all occurrences? - // That means I need to keep `all_dependencies` and join with query results. - - // Let's do that: - // 1. Get all dependencies (with their file source) - // 2. Extract unique (name, version, ecosystem) tuples - // 3. 
Query OSV for those unique tuples - // 4. For each original dependency, look up the vulnerabilities for its (name, version) + // If we wanted to track multiple files, we'd need a different struct. + // The spec says "Deduplication: Collect all dependencies... to ensure each unique package is only queried once" + // But the report output shows "File: requirements.txt". + // If a vuln is in multiple files, we should probably report it for each, or list all files. + // However, for efficiency, we query once. + // Let's stick to: Query unique deps, then map results back? + // Or better: Just query unique deps, and user sees one instance. + // If I have requests in req.txt and pyproject.toml, reporting it once is probably fine. + // But wait, the `VulnerabilityMatch` has a `file` field. + // If I deduplicate, I lose the file info for the duplicates. + // + // OPTION 1: Query unique, then map back to all occurrences? + // That means I need to keep `all_dependencies` and join with query results. + + // Let's do that: + // 1. Get all dependencies (with their file source) + // 2. Extract unique (name, version, ecosystem) tuples + // 3. Query OSV for those unique tuples + // 4. For each original dependency, look up the vulnerabilities for its (name, version) } } // Prepare unique items for querying let unique_deps: Vec<&Dependency> = unique_deps_map.values().collect(); - + // Build client once let client = reqwest::blocking::Client::builder() .timeout(std::time::Duration::from_secs(10)) @@ -69,30 +69,31 @@ pub fn scan_dependencies(project_path: &str) -> Vec { (key, matches) }) .collect(); - + let mut vulns_map: HashMap> = HashMap::new(); for (key, matches) in vuln_results { vulns_map.insert(key, matches); } - + // now we need to reconstruct the full list of matches. // But wait, the `query_osv` returns `VulnerabilityMatch` which effectively clones the Dependency info // including the file path. // If I passed a Dependency from `unique_deps`, it has *one* of the file paths. 
// So if I just use the results from `unique_deps`, I miss the others. - + // Better strategy: // 1. `files` -> `all_dependencies` (Vec) // 2. `unique_keys` -> HashSet of "name@version@ecosystem" // 3. Query OSV for each unique key -> Map> // 4. Iterate `all_dependencies`, look up in Map, create `VulnerabilityMatch` - - let unique_keys: HashSet<(String, String, String)> = all_dependencies.iter() + + let unique_keys: HashSet<(String, String, String)> = all_dependencies + .iter() .map(|d| (d.name.clone(), d.version.clone(), d.ecosystem.clone())) .collect(); - + let unique_list: Vec<(String, String, String)> = unique_keys.into_iter().collect(); - + let query_results: Vec<((String, String, String), Vec)> = unique_list .par_iter() .map(|(name, version, ecosystem)| { @@ -100,31 +101,31 @@ pub fn scan_dependencies(project_path: &str) -> Vec { ((name.clone(), version.clone(), ecosystem.clone()), vulns) }) .collect(); - + let mut vuln_lookup: HashMap<(String, String, String), Vec> = HashMap::new(); for (key, vulns) in query_results { vuln_lookup.insert(key, vulns); } - + let mut final_matches = Vec::new(); - + for dep in all_dependencies { - let key = (dep.name.clone(), dep.version.clone(), dep.ecosystem.clone()); - if let Some(vulns) = vuln_lookup.get(&key) { - for v in vulns { - final_matches.push(VulnerabilityMatch { - dependency: dep.name.clone(), - version: dep.version.clone(), - vulnerability_id: v.id.clone(), - severity: resolve_severity(v), - summary: v.summary.clone().unwrap_or_default(), - file: dep.file.clone(), - fixed_version: extract_fixed_version(v), - }); - } - } + let key = (dep.name.clone(), dep.version.clone(), dep.ecosystem.clone()); + if let Some(vulns) = vuln_lookup.get(&key) { + for v in vulns { + final_matches.push(VulnerabilityMatch { + dependency: dep.name.clone(), + version: dep.version.clone(), + vulnerability_id: v.id.clone(), + severity: resolve_severity(v), + summary: v.summary.clone().unwrap_or_default(), + file: dep.file.clone(), + 
fixed_version: extract_fixed_version(v), + }); + } + } } - + final_matches } @@ -200,15 +201,16 @@ struct OsvEvent { fn find_dependency_files(root: &str) -> Vec { let mut files = Vec::new(); let walker = WalkDir::new(root).max_depth(5); - + let root_path = Path::new(root); for entry in walker.into_iter().filter_map(|e| e.ok()) { if entry.file_type().is_file() { if let Some(name) = entry.file_name().to_str() { - if name == "requirements.txt" || - name == "pyproject.toml" || - name == "Pipfile" || - name == "Cargo.toml" { + if name == "requirements.txt" + || name == "pyproject.toml" + || name == "Pipfile" + || name == "Cargo.toml" + { let rel = entry.path().strip_prefix(root_path).unwrap_or(entry.path()); if let Some(path) = rel.to_str() { files.push(path.to_string()); @@ -248,23 +250,23 @@ fn parse_requirements_txt(filepath: &str) -> Vec { if line.is_empty() || line.starts_with('#') { continue; } - + // Strip inline comments let part = line.split(&['#', ';'][..]).next().unwrap_or("").trim(); if part.is_empty() { - continue; + continue; } // simplistic parsing for package==version, package>=version etc // Split by operators let operators = ["==", ">=", "<=", "~=", ">", "<"]; let mut found = false; - + for op in &operators { if let Some(idx) = part.find(op) { let name = part[..idx].trim().to_lowercase(); let version = part[idx + op.len()..].trim(); - + if !name.is_empty() && !version.is_empty() { deps.push(Dependency { name, @@ -277,7 +279,7 @@ fn parse_requirements_txt(filepath: &str) -> Vec { } } } - + if !found { // maybe it's just package (no version)? OSV needs version. // Ignore unpinned dependencies? The spec implies pinned versions. 
@@ -291,22 +293,24 @@ fn parse_pyproject_toml(filepath: &str) -> Vec { Ok(c) => c, Err(_) => return vec![], }; - + let toml_val: Value = match toml::from_str(&content) { Ok(v) => v, Err(_) => return vec![], }; - + let mut deps = Vec::new(); - + // Poetry: [tool.poetry.dependencies] if let Some(tool) = toml_val.get("tool") { if let Some(poetry) = tool.get("poetry") { if let Some(dependencies) = poetry.get("dependencies") { if let Some(table) = dependencies.as_object() { for (k, v) in table { - if k == "python" { continue; } - + if k == "python" { + continue; + } + let version_str = if let Some(s) = v.as_str() { s.to_string() } else if let Some(v_table) = v.as_object() { @@ -320,8 +324,11 @@ fn parse_pyproject_toml(filepath: &str) -> Vec { continue; }; - let clean_version = version_str.trim_start_matches('^').trim_start_matches('~').to_string(); - deps.push(Dependency { + let clean_version = version_str + .trim_start_matches('^') + .trim_start_matches('~') + .to_string(); + deps.push(Dependency { name: k.to_string(), version: clean_version, file: filepath.to_string(), @@ -332,19 +339,19 @@ fn parse_pyproject_toml(filepath: &str) -> Vec { } } } - + // PEP 621: [project] dependencies = [] if let Some(project) = toml_val.get("project") { if let Some(dependencies) = project.get("dependencies").and_then(|d| d.as_array()) { for dep_val in dependencies { if let Some(dep_str) = dep_val.as_str() { - // Reuse logic similar to req.txt parsing - let operators = ["==", ">=", "<=", "~=", ">", "<"]; - for op in &operators { + // Reuse logic similar to req.txt parsing + let operators = ["==", ">=", "<=", "~=", ">", "<"]; + for op in &operators { if let Some(idx) = dep_str.find(op) { let name = dep_str[..idx].trim().to_lowercase(); let version = dep_str[idx + op.len()..].trim(); - + if !name.is_empty() && !version.is_empty() { deps.push(Dependency { name, @@ -355,12 +362,12 @@ fn parse_pyproject_toml(filepath: &str) -> Vec { break; } } - } + } } } } } - + deps } @@ -374,21 +381,23 @@ 
fn parse_pipfile(filepath: &str) -> Vec { Ok(v) => v, Err(_) => return vec![], }; - + let mut deps = Vec::new(); - + if let Some(packages) = toml_val.get("packages").and_then(|p| p.as_object()) { for (k, v) in packages { let version_str = match v.as_str() { Some(s) => s, None => continue, }; - - if version_str == "*" { continue; } - + + if version_str == "*" { + continue; + } + let clean_version = version_str.trim_start_matches("=="); - - deps.push(Dependency { + + deps.push(Dependency { name: k.to_string(), version: clean_version.to_string(), file: filepath.to_string(), @@ -396,7 +405,7 @@ fn parse_pipfile(filepath: &str) -> Vec { }); } } - + deps } @@ -410,30 +419,33 @@ fn parse_cargo_toml(filepath: &str) -> Vec { Ok(v) => v, Err(_) => return vec![], }; - + let mut deps = Vec::new(); - + if let Some(dependencies) = toml_val.get("dependencies").and_then(|d| d.as_object()) { for (k, v) in dependencies { - let version_opt = if let Some(s) = v.as_str() { - Some(s.to_string()) - } else if let Some(table) = v.as_object() { - table.get("version").and_then(|x| x.as_str()).map(|s| s.to_string()) - } else { - None - }; - - if let Some(ver) = version_opt { - deps.push(Dependency { + let version_opt = if let Some(s) = v.as_str() { + Some(s.to_string()) + } else if let Some(table) = v.as_object() { + table + .get("version") + .and_then(|x| x.as_str()) + .map(|s| s.to_string()) + } else { + None + }; + + if let Some(ver) = version_opt { + deps.push(Dependency { name: k.to_string(), version: ver, file: filepath.to_string(), ecosystem: "crates.io".to_string(), }); - } + } } } - + deps } @@ -442,21 +454,27 @@ fn parse_cargo_toml(filepath: &str) -> Vec { // Kept this for reference, but we use raw_query_osv fn query_osv(client: &reqwest::blocking::Client, dep: &Dependency) -> Vec { let vulns = raw_query_osv(client, &dep.name, &dep.version, &dep.ecosystem); - - vulns.into_iter().map(|v| { - VulnerabilityMatch { - dependency: dep.name.clone(), - version: dep.version.clone(), - 
vulnerability_id: v.id.clone(), - severity: resolve_severity(&v), - summary: v.summary.clone().unwrap_or_default(), - file: dep.file.clone(), - fixed_version: extract_fixed_version(&v), - } - }).collect() + + vulns + .into_iter() + .map(|v| VulnerabilityMatch { + dependency: dep.name.clone(), + version: dep.version.clone(), + vulnerability_id: v.id.clone(), + severity: resolve_severity(&v), + summary: v.summary.clone().unwrap_or_default(), + file: dep.file.clone(), + fixed_version: extract_fixed_version(&v), + }) + .collect() } -fn raw_query_osv(client: &reqwest::blocking::Client, name: &str, version: &str, ecosystem: &str) -> Vec { +fn raw_query_osv( + client: &reqwest::blocking::Client, + name: &str, + version: &str, + ecosystem: &str, +) -> Vec { let url = "https://api.osv.dev/v1/query"; let body = serde_json::json!({ "package": { @@ -469,14 +487,14 @@ fn raw_query_osv(client: &reqwest::blocking::Client, name: &str, version: &str, match client.post(url).json(&body).send() { Ok(resp) => { if resp.status().is_success() { - match resp.json::() { - Ok(osv_resp) => osv_resp.vulns, - Err(_) => vec![], - } + match resp.json::() { + Ok(osv_resp) => osv_resp.vulns, + Err(_) => vec![], + } } else { vec![] } - }, + } Err(_) => vec![], } } @@ -487,18 +505,24 @@ fn resolve_severity(vuln: &OsvVulnerability) -> String { // Tier 1: Database Specific if let Some(db_spec) = &vuln.database_specific { if let Some(sev) = &db_spec.severity { - return sev.to_uppercase(); + return sev.to_uppercase(); } } - + // Tier 2: CVSS // Find CVSS v3 score for sev_entry in &vuln.severity { if sev_entry.type_ == "CVSS_V3" { if let Some(base_score) = compute_cvss_base_score(&sev_entry.score) { - if base_score >= 9.0 { return "CRITICAL".to_string(); } - if base_score >= 7.0 { return "HIGH".to_string(); } - if base_score >= 4.0 { return "MEDIUM".to_string(); } + if base_score >= 9.0 { + return "CRITICAL".to_string(); + } + if base_score >= 7.0 { + return "HIGH".to_string(); + } + if base_score >= 4.0 
{ + return "MEDIUM".to_string(); + } return "LOW".to_string(); } } @@ -511,7 +535,7 @@ fn compute_cvss_base_score(vector: &str) -> Option { // Example: CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H // Parse metrics let parts: Vec<&str> = vector.split('/').collect(); - + let mut av = 0.0; let mut ac = 0.0; let mut pr = 0.0; @@ -520,19 +544,19 @@ fn compute_cvss_base_score(vector: &str) -> Option { let mut c = 0.0; let mut i = 0.0; let mut a = 0.0; - + let mut scope_changed = false; // Default values if missing - strictly simplistic parser for part in parts { if part.starts_with("AV:") { - match &part[3..] { - "N" => av = 0.85, - "A" => av = 0.62, - "L" => av = 0.55, - "P" => av = 0.20, - _ => {} - } + match &part[3..] { + "N" => av = 0.85, + "A" => av = 0.62, + "L" => av = 0.55, + "P" => av = 0.20, + _ => {} + } } else if part.starts_with("AC:") { match &part[3..] { "L" => ac = 0.77, @@ -542,7 +566,7 @@ fn compute_cvss_base_score(vector: &str) -> Option { } else if part.starts_with("PR:") { // Logic handled later based on scope match &part[3..] { - "N" => pr = 0.85, + "N" => pr = 0.85, "L" => pr = 0.62, // or 0.68 "H" => pr = 0.27, // or 0.50 _ => {} @@ -574,7 +598,7 @@ fn compute_cvss_base_score(vector: &str) -> Option { _ => {} } } else if part.starts_with("A:") { - match &part[2..] { + match &part[2..] { "N" => a = 0.0, "L" => a = 0.22, "H" => a = 0.56, @@ -582,10 +606,10 @@ fn compute_cvss_base_score(vector: &str) -> Option { } } } - + // Adjust PR based on scope // We need to re-parse PR because its value depends on scope - // But since I stored the mapped values, I need to know which actual string was used. + // But since I stored the mapped values, I need to know which actual string was used. 
// Let's re-scan for PR string for part in vector.split('/') { if part.starts_with("PR:") { @@ -602,27 +626,29 @@ fn compute_cvss_base_score(vector: &str) -> Option { } let iss: f64 = 1.0 - ((1.0 - c) * (1.0 - i) * (1.0 - a)); - + let impact = if scope_changed { 7.52 * (iss - 0.029) - 3.25 * (iss - 0.02f64).powi(15) } else { 6.42 * iss }; - - if impact <= 0.0 { return Some(0.0); } - + + if impact <= 0.0 { + return Some(0.0); + } + let exploitability = 8.22 * av * ac * pr * ui; - + let mut base_score: f64 = if scope_changed { 1.08 * (impact + exploitability).min(10.0) } else { (impact + exploitability).min(10.0) }; - + // Round up to 1 decimal place // Simple roundup function: ceil(x * 10) / 10 base_score = (base_score * 10.0).ceil() / 10.0; - + Some(base_score) } diff --git a/src/pyspector/cli.py b/src/pyspector/cli.py index c8cc775..07aeca9 100644 --- a/src/pyspector/cli.py +++ b/src/pyspector/cli.py @@ -19,11 +19,14 @@ try: from pyspector._rust_core import run_scan except ImportError: - click.echo(click.style("Error: PySpector's core engine module not found.", fg="red")) + click.echo( + click.style("Error: PySpector's core engine module not found.", fg="red") + ) exit(1) import random + def get_startup_note(): """Fetches a tech joke or returns a fallback if offline.""" fallbacks = [ @@ -31,7 +34,7 @@ def get_startup_note(): "💡 There are 10 types of people: those who understand binary and those who don't.", "💡 A SQL query walks into a bar, walks up to two tables, and asks... 'Can I join you?'", "💡 Cybersecurity is the only industry where the 'bad guys' have a better R&D budget.", - "💡 Hardware: The parts of a computer system that can be kicked." 
+ "💡 Hardware: The parts of a computer system that can be kicked.", ] try: # Programming category, safe mode on, single line only @@ -41,29 +44,31 @@ def get_startup_note(): if response.status_code == 200: return f"💡 {response.json()['joke']}" except Exception: - pass + pass return random.choice(fallbacks) + _list = list _tuple = tuple _ast_AST = ast.AST _ast_iter_fields = ast.iter_fields + # --- Helper function for AST serialization --- class AstEncoder(json.JSONEncoder): def default(self, node): if isinstance(node, _ast_AST): fields = { "node_type": node.__class__.__name__, - "lineno": getattr(node, 'lineno', -1), - "col_offset": getattr(node, 'col_offset', -1), + "lineno": getattr(node, "lineno", -1), + "col_offset": getattr(node, "col_offset", -1), } # Separate fields from children nodes for clarity in Rust child_nodes = {} simple_fields = {} for field, value in _ast_iter_fields(node): # Check if it's a list of AST nodes - if type(value).__name__ == 'list': + if type(value).__name__ == "list": if value and all(isinstance(n, _ast_AST) for n in value): child_nodes[field] = value else: @@ -73,7 +78,7 @@ def default(self, node): else: # Handle non-JSON serializable types if isinstance(value, bytes): - simple_fields[field] = value.decode('utf-8', errors='replace') + simple_fields[field] = value.decode("utf-8", errors="replace") elif isinstance(value, int) and value.bit_length() > 14000: simple_fields[field] = 0 elif isinstance(value, (int, float, str, bool)) or value is None: @@ -81,13 +86,13 @@ def default(self, node): else: # Convert other types to string representation simple_fields[field] = str(value) - + fields["children"] = child_nodes fields["fields"] = simple_fields return fields elif isinstance(node, bytes): - return node.decode('utf-8', errors='replace') - elif hasattr(node, '__dict__'): + return node.decode("utf-8", errors="replace") + elif hasattr(node, "__dict__"): # Handle other objects that might not be JSON serializable return str(node) return 
super().default(node) @@ -99,64 +104,78 @@ def should_skip_file(file_path: Path) -> bool: Excludes test fixtures and other files with intentionally malformed syntax. """ path_str = str(file_path) - + # Skip test fixture directories skip_patterns = [ - '/tests/fixtures/', - '/test/fixtures/', - '/testdata/', - '/_fixtures/', - '/fixtures/', + "/tests/fixtures/", + "/test/fixtures/", + "/testdata/", + "/_fixtures/", + "/fixtures/", ] - + for pattern in skip_patterns: - if pattern in path_str.replace('\\', '/'): + if pattern in path_str.replace("\\", "/"): return True - + # Skip common test file patterns filename = file_path.name - if filename.startswith('test_') or filename.endswith('_test.py'): + if filename.startswith("test_") or filename.endswith("_test.py"): # Only skip if in a tests directory - if '/tests/' in path_str.replace('\\', '/') or '/test/' in path_str.replace('\\', '/'): + if "/tests/" in path_str.replace("\\", "/") or "/test/" in path_str.replace( + "\\", "/" + ): return True - + return False def get_python_file_asts(path: Path) -> List[Dict[str, Any]]: """Recursively finds Python files and returns their content and AST.""" results = [] - files_to_scan = list(path.glob('**/*.py')) if path.is_dir() else [path] + files_to_scan = list(path.glob("**/*.py")) if path.is_dir() else [path] # Suppress Python's SyntaxWarning during AST parsing with warnings.catch_warnings(): - warnings.filterwarnings('ignore', category=SyntaxWarning) - + warnings.filterwarnings("ignore", category=SyntaxWarning) + for py_file in files_to_scan: if py_file.is_file(): # Skip test fixtures if should_skip_file(py_file): continue - + try: - content = py_file.read_text(encoding='utf-8') + content = py_file.read_text(encoding="utf-8") parsed_ast = ast.parse(content, filename=str(py_file)) ast_json = json.dumps(parsed_ast, cls=AstEncoder) - results.append({ - "file_path": str(py_file.relative_to(path)) if path.is_dir() else py_file.name, - "content": content, - "ast_json": ast_json - }) 
+ results.append( + { + "file_path": ( + str(py_file.relative_to(path)) + if path.is_dir() + else py_file.name + ), + "content": content, + "ast_json": ast_json, + } + ) except SyntaxError as e: # Only warn about syntax errors in non-test files if not should_skip_file(py_file): - click.echo(click.style( - f"Warning: Could not parse {py_file.relative_to(path) if path.is_dir() else py_file.name}: {e.msg} ({py_file.name}, line {e.lineno})", - fg="yellow" - )) + click.echo( + click.style( + f"Warning: Could not parse {py_file.relative_to(path) if path.is_dir() else py_file.name}: {e.msg} ({py_file.name}, line {e.lineno})", + fg="yellow", + ) + ) except UnicodeDecodeError as e: - click.echo(click.style(f"Warning: Could not read {py_file}: {e}", fg="yellow")) - + click.echo( + click.style( + f"Warning: Could not read {py_file}: {e}", fg="yellow" + ) + ) + return results @@ -180,6 +199,7 @@ def _normalize_plugin_name_cli(raw_name: str) -> tuple[str, bool]: return normalised, normalised != stripped + def execute_plugins( findings: list, scan_path: Path, @@ -245,8 +265,10 @@ def execute_plugins( ) ) + # --- Main CLI Logic --- + @click.group() def cli(): """ @@ -273,13 +295,14 @@ def cli(): note = get_startup_note() click.echo(click.style(f"{note}\n", fg="bright_black", italic=True)) + def run_wizard(): click.echo("\n🧙 PySpector Scan Wizard\n") mode = click.prompt( "What do you want to scan?", type=click.Choice(["local", "repo"]), - default="local" + default="local", ) scan_path = None @@ -295,23 +318,22 @@ def run_wizard(): severity_level = click.prompt( "Minimum severity level", type=click.Choice(["LOW", "MEDIUM", "HIGH", "CRITICAL"]), - default="LOW" + default="LOW", ) report_format = click.prompt( "Report format", type=click.Choice(["console", "json", "sarif", "html"]), - default="console" + default="console", ) - supply_chain = click.confirm("Check dependencies for CVE vulnerabilities?", default=False) - + supply_chain = click.confirm( + "Check dependencies for CVE 
vulnerabilities?", default=False + ) output_file = None if report_format != "console": - output_file = Path( - click.prompt("Output file path", type=str) - ) + output_file = Path(click.prompt("Output file path", type=str)) click.echo("\n[*] Wizard completed. Starting scan...\n") @@ -326,34 +348,101 @@ def run_wizard(): } - - -@click.command(help="Scan a directory, file, or remote Git repository for vulnerabilities.") -@click.argument('path', type=click.Path(exists=True, file_okay=True, dir_okay=True, readable=True, path_type=Path), required=False) -@click.option('-u', '--url', 'repo_url', type=str, help="URL of a public GitHub/GitLab repository to clone and scan.") -@click.option('-c', '--config', 'config_path', type=click.Path(exists=True, path_type=Path), help="Path to a pyspector.toml config file.") -@click.option('-o', '--output', 'output_file', type=click.Path(path_type=Path), help="Path to write the report to.") -@click.option('-f', '--format', 'report_format', type=click.Choice(['console', 'json', 'sarif', 'html']), default='console', help="Format of the report.") -@click.option('-s', '--severity', 'severity_level', type=click.Choice(['LOW', 'MEDIUM', 'HIGH', 'CRITICAL']), default='LOW', help="Minimum severity level to report.") -@click.option('--ai', 'ai_scan', is_flag=True, default=False, help="Enable specialized scanning for AI/LLM vulnerabilities.") -@click.option('--plugin', 'plugins', multiple=True, help="Load and execute a plugin (can be specified multiple times)") -@click.option('--plugin-config', 'plugin_config_file', type=click.Path(exists=True, path_type=Path), help="Path to plugin configuration JSON file") -@click.option('--list-plugins', 'list_plugins', is_flag=True, help="List available plugins and exit") -@click.option('--supply-chain', 'supply_chain_scan', is_flag=True, default=False, help="Scan dependencies for known CVE vulnerabilities.") -@click.option('--wizard', is_flag=True, help="Interactive guided scan for first-time users") 
+@click.command( + help="Scan a directory, file, or remote Git repository for vulnerabilities." +) +@click.argument( + "path", + type=click.Path( + exists=True, file_okay=True, dir_okay=True, readable=True, path_type=Path + ), + required=False, +) +@click.option( + "-u", + "--url", + "repo_url", + type=str, + help="URL of a public GitHub/GitLab repository to clone and scan.", +) +@click.option( + "-c", + "--config", + "config_path", + type=click.Path(exists=True, path_type=Path), + help="Path to a pyspector.toml config file.", +) +@click.option( + "-o", + "--output", + "output_file", + type=click.Path(path_type=Path), + help="Path to write the report to.", +) +@click.option( + "-f", + "--format", + "report_format", + type=click.Choice(["console", "json", "sarif", "html"]), + default="console", + help="Format of the report.", +) +@click.option( + "-s", + "--severity", + "severity_level", + type=click.Choice(["LOW", "MEDIUM", "HIGH", "CRITICAL"]), + default="LOW", + help="Minimum severity level to report.", +) +@click.option( + "--ai", + "ai_scan", + is_flag=True, + default=False, + help="Enable specialized scanning for AI/LLM vulnerabilities.", +) +@click.option( + "--plugin", + "plugins", + multiple=True, + help="Load and execute a plugin (can be specified multiple times)", +) +@click.option( + "--plugin-config", + "plugin_config_file", + type=click.Path(exists=True, path_type=Path), + help="Path to plugin configuration JSON file", +) +@click.option( + "--list-plugins", + "list_plugins", + is_flag=True, + help="List available plugins and exit", +) +@click.option( + "--supply-chain", + "supply_chain_scan", + is_flag=True, + default=False, + help="Scan dependencies for known CVE vulnerabilities.", +) +@click.option( + "--wizard", is_flag=True, help="Interactive guided scan for first-time users" +) def run_scan_command( - path: Optional[Path], - repo_url: Optional[str], - config_path: Optional[Path], - output_file: Optional[Path], - report_format: str, - 
severity_level: str, + path: Optional[Path], + repo_url: Optional[str], + config_path: Optional[Path], + output_file: Optional[Path], + report_format: str, + severity_level: str, ai_scan: bool, plugins: tuple, plugin_config_file: Optional[Path], list_plugins: bool, supply_chain_scan: bool, - wizard: bool + wizard: bool, ): """The main scan command with plugin support.""" # --- Wizard Mode --- @@ -363,12 +452,14 @@ def run_scan_command( # Repo scan if params["repo_url"]: with tempfile.TemporaryDirectory() as temp_dir: - click.echo(f"[*] Cloning '{params['repo_url']}' into temporary directory...") + click.echo( + f"[*] Cloning '{params['repo_url']}' into temporary directory..." + ) subprocess.run( - ['git', 'clone', '--depth', '1', params["repo_url"], temp_dir], + ["git", "clone", "--depth", "1", params["repo_url"], temp_dir], check=True, capture_output=True, - text=True + text=True, ) _execute_scan( Path(temp_dir), @@ -379,7 +470,7 @@ def run_scan_command( params["ai_scan"], plugins=(), plugin_config={}, - supply_chain_scan=params["supply_chain_scan"] + supply_chain_scan=params["supply_chain_scan"], ) else: _execute_scan( @@ -392,7 +483,7 @@ def run_scan_command( params["ai_scan"], plugins=(), plugin_config={}, - supply_chain_scan=params["supply_chain_scan"] + supply_chain_scan=params["supply_chain_scan"], ) return @@ -401,7 +492,7 @@ def run_scan_command( plugin_manager = get_plugin_manager() available = plugin_manager.list_available_plugins() registered = plugin_manager.registry.list_plugins() - + click.echo("\n=== Available Plugins ===") if not available: click.echo("No plugins found") @@ -418,7 +509,7 @@ def run_scan_command( click.echo(f" - {plugin_name} (not registered)") click.echo() return - + if not path and not repo_url: raise click.UsageError("You must provide either a PATH or a --url to scan.") if path and repo_url: @@ -428,141 +519,214 @@ def run_scan_command( plugin_config = {} if plugin_config_file: try: - with open(plugin_config_file, 'r') as f: + 
with open(plugin_config_file, "r") as f: plugin_config = json.load(f) except (json.JSONDecodeError, IOError) as e: - click.echo(click.style(f"Warning: Could not load plugin config: {e}", fg="yellow")) + click.echo( + click.style(f"Warning: Could not load plugin config: {e}", fg="yellow") + ) if repo_url: # Handle Git URL cloning if not ("github.com" in repo_url or "gitlab.com" in repo_url): - raise click.BadParameter("URL must be a public GitHub or GitLab repository.") - + raise click.BadParameter( + "URL must be a public GitHub or GitLab repository." + ) + with tempfile.TemporaryDirectory() as temp_dir: click.echo(f"[*] Cloning '{repo_url}' into temporary directory...") try: subprocess.run( - ['git', 'clone', '--depth', '1', repo_url, temp_dir], + ["git", "clone", "--depth", "1", repo_url, temp_dir], check=True, capture_output=True, - text=True + text=True, ) scan_path = Path(temp_dir) scan_path = Path(temp_dir) - _execute_scan(scan_path, config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain_scan) + _execute_scan( + scan_path, + config_path, + output_file, + report_format, + severity_level, + ai_scan, + plugins, + plugin_config, + supply_chain_scan, + ) except subprocess.CalledProcessError as e: - click.echo(click.style(f"Error: Failed to clone repository.\n{e.stderr}", fg="red")) + click.echo( + click.style( + f"Error: Failed to clone repository.\n{e.stderr}", fg="red" + ) + ) sys.exit(1) except FileNotFoundError: - click.echo(click.style("Error: 'git' command not found. Please ensure Git is installed and in your PATH.", fg="red")) + click.echo( + click.style( + "Error: 'git' command not found. 
Please ensure Git is installed and in your PATH.", + fg="red", + ) + ) sys.exit(1) else: # Handle local path scan scan_path = path scan_path = path - _execute_scan(scan_path, config_path, output_file, report_format, severity_level, ai_scan, plugins, plugin_config, supply_chain_scan) + _execute_scan( + scan_path, + config_path, + output_file, + report_format, + severity_level, + ai_scan, + plugins, + plugin_config, + supply_chain_scan, + ) return def _execute_scan( - scan_path: Path, - config_path: Optional[Path], - output_file: Optional[Path], - report_format: str, - severity_level: str, + scan_path: Path, + config_path: Optional[Path], + output_file: Optional[Path], + report_format: str, + severity_level: str, ai_scan: bool, plugins: tuple, plugin_config: dict, - supply_chain_scan: bool = False + supply_chain_scan: bool = False, ): """Helper function to run the actual scan and reporting.""" start_time = time.time() - + config = load_config(config_path) rules_toml_str = get_default_rules(ai_scan) click.echo(f"[*] Starting PySpector scan on '{scan_path}'...") - + # --- Load Baseline --- - baseline_path = scan_path / ".pyspector_baseline.json" if scan_path.is_dir() else scan_path.parent / ".pyspector_baseline.json" + baseline_path = ( + scan_path / ".pyspector_baseline.json" + if scan_path.is_dir() + else scan_path.parent / ".pyspector_baseline.json" + ) ignored_fingerprints = set() if baseline_path.exists(): try: - with baseline_path.open('r') as f: + with baseline_path.open("r") as f: baseline_data = json.load(f) - ignored_fingerprints = set(baseline_data.get("ignored_fingerprints", [])) - click.echo(f"[*] Loaded baseline from '{baseline_path}', ignoring {len(ignored_fingerprints)} known issues.") + ignored_fingerprints = set( + baseline_data.get("ignored_fingerprints", []) + ) + click.echo( + f"[*] Loaded baseline from '{baseline_path}', ignoring {len(ignored_fingerprints)} known issues." 
+ ) except json.JSONDecodeError: - click.echo(click.style(f"Warning: Could not parse baseline file '{baseline_path}'.", fg="yellow")) - + click.echo( + click.style( + f"Warning: Could not parse baseline file '{baseline_path}'.", + fg="yellow", + ) + ) + # --- AST Generation for Python files --- python_files_data = get_python_file_asts(scan_path) click.echo(f"[*] Successfully parsed {len(python_files_data)} Python files") - + # --- Supply Chain Scanning --- if supply_chain_scan: try: from pyspector._rust_core import scan_supply_chain + click.echo("\n[*] Scanning dependencies for known vulnerabilities...") dep_vulns = scan_supply_chain(str(scan_path.resolve())) - + if dep_vulns: click.echo(f"\n{'='*60}") click.echo(f" SUPPLY CHAIN VULNERABILITIES ({len(dep_vulns)} found)") click.echo(f"{'='*60}") - + for vuln in dep_vulns: sev_color = { - 'CRITICAL': 'bright_red', - 'HIGH': 'red', - 'MEDIUM': 'yellow', - 'LOW': 'blue', - 'UNKNOWN': 'white' - }.get(vuln['severity'], 'white') - - click.echo(f"\n[{click.style(vuln['severity'], fg=sev_color)}] " - f"{vuln['dependency']} @ {vuln['version']}") + "CRITICAL": "bright_red", + "HIGH": "red", + "MEDIUM": "yellow", + "LOW": "blue", + "UNKNOWN": "white", + }.get(vuln["severity"], "white") + + click.echo( + f"\n[{click.style(vuln['severity'], fg=sev_color)}] " + f"{vuln['dependency']} @ {vuln['version']}" + ) click.echo(f" Vulnerability: {vuln['vulnerability_id']}") click.echo(f" File: {vuln['file']}") click.echo(f" Summary: {vuln['summary'][:100]}...") - if vuln.get('fixed_version'): + if vuln.get("fixed_version"): click.echo(f" Fixed in: {vuln['fixed_version']}") click.echo() else: click.echo("[+] No known vulnerabilities found in dependencies") except ImportError: - click.echo(click.style("Error: Supply chain scanner not available. Reinstall PySpector.", fg="red")) + click.echo( + click.style( + "Error: Supply chain scanner not available. 
Reinstall PySpector.", + fg="red", + ) + ) except Exception as e: click.echo(click.style(f"Error during supply chain scan: {e}", fg="red")) # --- Run Scan --- try: - raw_issues = run_scan(str(scan_path.resolve()), rules_toml_str, config, python_files_data) + raw_issues = run_scan( + str(scan_path.resolve()), rules_toml_str, config, python_files_data + ) except ValueError as e: - click.echo(click.style(f"Configuration error: {e}\n" - "Invalid configuration detected. Please verify your settings and retry.",fg = "red")) + click.echo( + click.style( + f"Configuration error: {e}\n" + "Invalid configuration detected. Please verify your settings and retry.", + fg="red", + ) + ) return - + except RuntimeError as e: - click.echo(click.style(f"Runtime error during execution: {e}\n" - "The scan engine encountered an operational error. Please retry or open an Issue, if the problem persists.", - fg="red")) + click.echo( + click.style( + f"Runtime error during execution: {e}\n" + "The scan engine encountered an operational error. 
Please retry or open an Issue, if the problem persists.", + fg="red", + ) + ) return - + except Exception as e: - click.echo(click.style(f"A critical Exception was raised during the scan process: {e}", fg="red")) + click.echo( + click.style( + f"A critical Exception was raised during the scan process: {e}", + fg="red", + ) + ) return # --- Filter by Severity and Baseline --- - severity_map = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2, 'CRITICAL': 3} + severity_map = {"LOW": 0, "MEDIUM": 1, "HIGH": 2, "CRITICAL": 3} min_severity_val = severity_map[severity_level.upper()] final_issues = [ - issue for issue in raw_issues - if (severity_map[str(issue.severity).split('.')[-1].upper()] >= min_severity_val - and issue.get_fingerprint() not in ignored_fingerprints) + issue + for issue in raw_issues + if ( + severity_map[str(issue.severity).split(".")[-1].upper()] >= min_severity_val + and issue.get_fingerprint() not in ignored_fingerprints + ) ] - + # Convert issues to dictionaries for plugins findings_dict = [ { @@ -571,24 +735,25 @@ def _execute_scan( "file": issue.file_path, "line": issue.line_number, "code": issue.code, - "severity": str(issue.severity).split('.')[-1], + "severity": str(issue.severity).split(".")[-1], "remediation": issue.remediation, - } for issue in final_issues + } + for issue in final_issues ] - + if plugins: try: execute_plugins(findings_dict, scan_path, list(plugins), plugin_config) except click.ClickException as exc: click.echo(click.style(f"[!] 
Plugin error: {exc}", fg="red")) - + # --- Generate Report --- reporter = Reporter(final_issues, report_format) output = reporter.generate() - + if output_file: try: - output_file.write_text(output, encoding='utf-8') + output_file.write_text(output, encoding="utf-8") click.echo(f"\n[+] Report saved to '{output_file}'") except IOError as e: click.echo(click.style(f"Error writing to output file: {e}", fg="red")) @@ -596,29 +761,40 @@ def _execute_scan( click.echo(output) end_time = time.time() - click.echo(f"\n[*] Scan finished in {end_time - start_time:.2f} seconds. Found {len(final_issues)} issues.") + click.echo( + f"\n[*] Scan finished in {end_time - start_time:.2f} seconds. Found {len(final_issues)} issues." + ) if len(raw_issues) > len(final_issues): - click.echo(f"[*] Ignored {len(raw_issues) - len(final_issues)} issues based on severity level or baseline.") + click.echo( + f"[*] Ignored {len(raw_issues) - len(final_issues)} issues based on severity level or baseline." + ) sys.stdout.flush() sys.stderr.flush() return @click.command(help="Start the interactive TUI to review and baseline findings.") -@click.argument('report_file', type=click.Path(exists=True, readable=True, path_type=Path)) +@click.argument( + "report_file", type=click.Path(exists=True, readable=True, path_type=Path) +) def triage_command(report_file: Path): """The TUI command for baselining.""" - if not report_file.name.endswith('.json'): - click.echo(click.style("Error: Triage mode only supports JSON report files generated by PySpector.", fg="red")) + if not report_file.name.endswith(".json"): + click.echo( + click.style( + "Error: Triage mode only supports JSON report files generated by PySpector.", + fg="red", + ) + ) return try: - with report_file.open('r', encoding='utf-8') as f: + with report_file.open("r", encoding="utf-8") as f: issues_data = json.load(f) - + # Determine baseline path relative to the report file baseline_path = report_file.parent / ".pyspector_baseline.json" - + 
run_triage_tui(issues_data.get("issues", []), baseline_path) except (json.JSONDecodeError, IOError) as e: @@ -627,6 +803,7 @@ def triage_command(report_file: Path): # --- Plugin Management Commands --- + @click.group(help="Manage PySpector plugins") def plugin(): """Plugin management commands""" @@ -639,17 +816,17 @@ def list_plugins_command(): plugin_manager = get_plugin_manager() available = plugin_manager.list_available_plugins() registered = plugin_manager.registry.list_plugins() - - click.echo("\n" + "="*60) + + click.echo("\n" + "=" * 60) click.echo("PySpector Plugins") - click.echo("="*60) - + click.echo("=" * 60) + if not available: click.echo("\nNo plugins found in plugin directory") click.echo(f"Plugin directory: {plugin_manager.plugin_dir}") else: click.echo(f"\nFound {len(available)} plugin(s):\n") - + for plugin_name in available: info = next((p for p in registered if p["name"] == plugin_name), None) @@ -668,13 +845,13 @@ def list_plugins_command(): click.echo(f" Status: {click.style('not registered', fg='red')}") click.echo() - + click.echo(f"Plugin directory: {plugin_manager.plugin_dir}") - click.echo("="*60 + "\n") + click.echo("=" * 60 + "\n") @plugin.command(help="Trust a plugin") -@click.argument('plugin_name') +@click.argument("plugin_name") def trust(plugin_name: str): """Trust a plugin""" plugin_manager = get_plugin_manager() @@ -685,7 +862,7 @@ def trust(plugin_name: str): @plugin.command(help="Show plugin information") -@click.argument('plugin_name') +@click.argument("plugin_name") def info(plugin_name: str): """Show detailed plugin information""" plugin_manager = get_plugin_manager() @@ -694,29 +871,33 @@ def info(plugin_name: str): click.echo(f"[*] Normalised plugin name to '{plugin_name}'") plugin_path = plugin_manager.plugin_dir / f"{plugin_name}.py" - + if not plugin_path.exists(): click.echo(click.style(f"Plugin '{plugin_name}' not found", fg="red")) return - + info_data = plugin_manager.registry.get_plugin_info(plugin_name) - + 
click.echo(f"\n{'='*60}") click.echo(f"Plugin: {plugin_name}") - click.echo('='*60) - + click.echo("=" * 60) + if info_data: - trusted = click.style("Yes", fg="green") if info_data.get('trusted') else click.style("No", fg="red") + trusted = ( + click.style("Yes", fg="green") + if info_data.get("trusted") + else click.style("No", fg="red") + ) click.echo(f"Trusted: {trusted}") click.echo(f"Version: {info_data.get('version', 'unknown')}") click.echo(f"Author: {info_data.get('author', 'unknown')}") click.echo(f"Category: {info_data.get('category', 'general')}") click.echo(f"Path: {info_data.get('path', 'unknown')}") - + # Show checksum current_checksum = PluginSecurity.calculate_checksum(plugin_path) - stored_checksum = info_data.get('checksum', '') - + stored_checksum = info_data.get("checksum", "") + if current_checksum == stored_checksum: click.echo(f"Checksum: {click.style('valid', fg='green')}") else: @@ -724,14 +905,14 @@ def info(plugin_name: str): else: click.echo(click.style("Not registered", fg="yellow")) click.echo(f"Path: {plugin_path}") - + click.echo(f"\n{'='*60}\n") @plugin.command(help="Install a plugin from a file") -@click.argument('plugin_file', type=click.Path(exists=True, path_type=Path)) -@click.option('--name', help="Custom name for the plugin") -@click.option('--trust', is_flag=True, help="Automatically trust the plugin") +@click.argument("plugin_file", type=click.Path(exists=True, path_type=Path)) +@click.option("--name", help="Custom name for the plugin") +@click.option("--trust", is_flag=True, help="Automatically trust the plugin") def install(plugin_file: Path, name: str, trust: bool): """Install a plugin from a file""" plugin_manager = get_plugin_manager() @@ -755,7 +936,9 @@ def install(plugin_file: Path, name: str, trust: bool): return if trust: - if not plugin_manager.trust_plugin(plugin_name, plugin_file, overwrite=overwrite_allowed): + if not plugin_manager.trust_plugin( + plugin_name, plugin_file, overwrite=overwrite_allowed + ): 
return click.echo(click.style(f"[+] Plugin stored at {target_path}", fg="green")) else: @@ -771,8 +954,8 @@ def install(plugin_file: Path, name: str, trust: bool): @plugin.command(help="Remove a plugin") -@click.argument('plugin_name') -@click.option('--force', is_flag=True, help="Force removal without confirmation") +@click.argument("plugin_name") +@click.option("--force", is_flag=True, help="Force removal without confirmation") def remove(plugin_name: str, force: bool): """Remove a plugin""" plugin_manager = get_plugin_manager() @@ -781,25 +964,25 @@ def remove(plugin_name: str, force: bool): click.echo(f"[*] Normalised plugin name to '{plugin_name}'") plugin_path = plugin_manager.plugin_dir / f"{plugin_name}.py" - + if not plugin_path.exists(): click.echo(click.style(f"Plugin '{plugin_name}' not found", fg="red")) return - + if not force: if not click.confirm(f"Remove plugin '{plugin_name}'?"): return - + try: plugin_path.unlink() - + # Remove from registry if plugin_name in plugin_manager.registry.plugins: del plugin_manager.registry.plugins[plugin_name] plugin_manager.registry.save_registry() - + click.echo(click.style(f"[+] Plugin '{plugin_name}' removed", fg="green")) - + except Exception as e: click.echo(click.style(f"Error removing plugin: {e}", fg="red")) diff --git a/src/pyspector/config.py b/src/pyspector/config.py index fac1241..742ac66 100644 --- a/src/pyspector/config.py +++ b/src/pyspector/config.py @@ -1,16 +1,23 @@ from pathlib import Path -import toml # type: ignore -import click # type: ignore +import toml # type: ignore +import click # type: ignore + try: # Python 3.9+ import importlib.resources as pkg_resources except ImportError: # Fallback for older Python versions - import importlib_resources as pkg_resources # type: ignore + import importlib_resources as pkg_resources # type: ignore DEFAULT_CONFIG = { "exclude": [ - ".venv", "venv", ".git", "__pycache__", "build", "dist", "*.egg-info", + ".venv", + "venv", + ".git", + "__pycache__", + 
"build", + "dist", + "*.egg-info", # Add test fixture exclusions "*/tests/fixtures/*", "*/test/fixtures/*", @@ -23,28 +30,45 @@ "severity": "LOW", } + def load_config(config_path: Path) -> dict: """Loads configuration from a TOML file or returns defaults.""" if config_path and config_path.exists(): try: - with config_path.open('r') as f: - user_config = toml.load(f).get('tool', {}).get('pyspector', {}) + with config_path.open("r") as f: + user_config = toml.load(f).get("tool", {}).get("pyspector", {}) config = DEFAULT_CONFIG.copy() config.update(user_config) return config except Exception as e: - click.echo(click.style(f"Warning: Could not parse config file '{config_path}'. Using defaults. Error: {e}", fg="yellow")) + click.echo( + click.style( + f"Warning: Could not parse config file '{config_path}'. Using defaults. Error: {e}", + fg="yellow", + ) + ) return DEFAULT_CONFIG + def get_default_rules(ai_scan: bool = False) -> str: """Loads the built-in TOML rules file from package resources.""" try: - base_rules = pkg_resources.files('pyspector.rules').joinpath('built-in-rules.toml').read_text(encoding='utf-8') + base_rules = ( + pkg_resources.files("pyspector.rules") + .joinpath("built-in-rules.toml") + .read_text(encoding="utf-8") + ) if ai_scan: click.echo("[*] AI scanning enabled. Loading additional AI/LLM rules.") - ai_rules = pkg_resources.files('pyspector.rules').joinpath('built-in-rules-ai.toml').read_text(encoding='utf-8') + ai_rules = ( + pkg_resources.files("pyspector.rules") + .joinpath("built-in-rules-ai.toml") + .read_text(encoding="utf-8") + ) # Combine the two rulesets return base_rules + "\n" + ai_rules return base_rules except Exception as e: - raise FileNotFoundError(f"Could not load built-in-rules.toml from package data! Error: {e}") + raise FileNotFoundError( + f"Could not load built-in-rules.toml from package data! 
Error: {e}" + ) diff --git a/src/pyspector/plugin_system.py b/src/pyspector/plugin_system.py index 00e5eb8..8c1368a 100644 --- a/src/pyspector/plugin_system.py +++ b/src/pyspector/plugin_system.py @@ -19,6 +19,7 @@ class PluginMetadata: """Metadata for a plugin""" + def __init__( self, name: str, @@ -26,7 +27,7 @@ def __init__( author: str, description: str, requires: List[str] = None, - category: str = "general" + category: str = "general", ): self.name = name self.version = version @@ -41,41 +42,38 @@ class PySpectorPlugin(ABC): Base class for all PySpector plugins. Plugins must inherit from this class and implement required methods. """ - + @property @abstractmethod def metadata(self) -> PluginMetadata: """Return plugin metadata""" pass - + @abstractmethod def initialize(self, config: Dict[str, Any]) -> bool: """ Initialize the plugin with configuration. - + Args: config: Plugin-specific configuration - + Returns: True if initialization succeeded, False otherwise """ pass - + @abstractmethod def process_findings( - self, - findings: List[Dict[str, Any]], - scan_path: Path, - **kwargs + self, findings: List[Dict[str, Any]], scan_path: Path, **kwargs ) -> Dict[str, Any]: """ Process PySpector findings. - + Args: findings: List of vulnerability findings from PySpector scan_path: Path that was scanned **kwargs: Additional arguments passed from CLI - + Returns: Dictionary with plugin results: { @@ -86,21 +84,21 @@ def process_findings( } """ pass - + def cleanup(self) -> None: """ Cleanup resources before plugin unload. Called automatically by the plugin manager. """ pass - + def validate_config(self, config: Dict[str, Any]) -> tuple[bool, str]: """ Validate plugin configuration. 
- + Args: config: Configuration to validate - + Returns: Tuple of (is_valid, error_message) """ @@ -109,26 +107,37 @@ def validate_config(self, config: Dict[str, Any]) -> tuple[bool, str]: class PluginSecurity: """Security utilities for plugin system""" - + DANGEROUS_MODULES = { - 'os.system', 'subprocess.Popen', 'eval', 'exec', - '__import__', 'compile' + "os.system", + "subprocess.Popen", + "eval", + "exec", + "__import__", + "compile", } - + ALLOWED_IMPORTS = { - 'json', 'pathlib', 'typing', 'dataclasses', 're', - 'datetime', 'collections', 'itertools', 'functools' + "json", + "pathlib", + "typing", + "dataclasses", + "re", + "datetime", + "collections", + "itertools", + "functools", } - + @staticmethod def calculate_checksum(file_path: Path) -> str: """Calculate SHA256 checksum of a plugin file""" sha256 = hashlib.sha256() - with open(file_path, 'rb') as f: - for chunk in iter(lambda: f.read(4096), b''): + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): sha256.update(chunk) return sha256.hexdigest() - + @staticmethod def validate_plugin_code(plugin_path: Path) -> tuple[bool, str]: """ @@ -214,7 +223,9 @@ def visit_Call(self, node: ast.Call) -> None: parts = simplified.split(".") if parts: root = alias_map.get(parts[0], parts[0]) - normalised = ".".join([root, *parts[1:]]) if len(parts) > 1 else root + normalised = ( + ".".join([root, *parts[1:]]) if len(parts) > 1 else root + ) normalised = normalised.replace("builtins.", "") if normalised in fatal_calls: @@ -235,7 +246,7 @@ def visit_Call(self, node: ast.Call) -> None: return True, f"Plugin uses sensitive operations: {ordered}" return True, "" - + @staticmethod def verify_checksum(plugin_path: Path, expected_checksum: str) -> bool: """Verify plugin file checksum""" @@ -267,9 +278,14 @@ def find_root(start: Path) -> Optional[Path]: start = start.resolve() for candidate in (start, *start.parents): if any((candidate / marker).exists() for marker in markers): - if (candidate / 
"src" / "pyspector").exists() or (candidate / "plugins").exists(): + if (candidate / "src" / "pyspector").exists() or ( + candidate / "plugins" + ).exists(): return candidate - if any((candidate / marker).exists() for marker in ("pyproject.toml", "setup.cfg", ".git")): + if any( + (candidate / marker).exists() + for marker in ("pyproject.toml", "setup.cfg", ".git") + ): return candidate return None @@ -302,66 +318,63 @@ def _resolve_registry_path() -> Path: class PluginRegistry: """Registry for tracking installed and trusted plugins""" - + def __init__(self, registry_path: Path): self.registry_path = registry_path self.plugins: Dict[str, Dict[str, Any]] = {} self._load_registry() - + def _load_registry(self): """Load plugin registry from disk""" if self.registry_path.exists(): try: - with open(self.registry_path, 'r') as f: + with open(self.registry_path, "r") as f: self.plugins = json.load(f) except (json.JSONDecodeError, IOError): self.plugins = {} - + def save_registry(self): """Save plugin registry to disk""" self.registry_path.parent.mkdir(parents=True, exist_ok=True) - with open(self.registry_path, 'w') as f: + with open(self.registry_path, "w") as f: json.dump(self.plugins, f, indent=2) - + def register_plugin( self, name: str, path: str, checksum: str, metadata: PluginMetadata, - trusted: bool = False + trusted: bool = False, ): """Register a plugin in the registry""" self.plugins[name] = { - 'path': path, - 'checksum': checksum, - 'version': metadata.version, - 'author': metadata.author, - 'category': metadata.category, - 'trusted': trusted, - 'enabled': True + "path": path, + "checksum": checksum, + "version": metadata.version, + "author": metadata.author, + "category": metadata.category, + "trusted": trusted, + "enabled": True, } self.save_registry() - + def is_trusted(self, name: str) -> bool: """Check if a plugin is trusted""" - return self.plugins.get(name, {}).get('trusted', False) - + return self.plugins.get(name, {}).get("trusted", False) + def 
get_plugin_info(self, name: str) -> Optional[Dict[str, Any]]: """Get plugin information from registry""" return self.plugins.get(name) - + def list_plugins(self) -> List[Dict[str, Any]]: """List all registered plugins""" - return [ - {'name': name, **info} - for name, info in self.plugins.items() - ] + return [{"name": name, **info} for name, info in self.plugins.items()] class PluginManager: """Manages plugin loading, validation, and execution""" - + def __init__(self, plugin_dir: Path, registry_path: Path): self.plugin_dir = plugin_dir self.plugin_dir.mkdir(parents=True, exist_ok=True) @@ -383,7 +396,9 @@ def _plugin_file_path(self, plugin_name: str) -> Path: """Return the on-disk path for a plugin.""" return self.plugin_dir / f"{plugin_name}.py" - def _stage_plugin(self, source_path: Path, plugin_name: str, overwrite: bool) -> Optional[Path]: + def _stage_plugin( + self, source_path: Path, plugin_name: str, overwrite: bool + ) -> Optional[Path]: """ Copy a plugin source file into the managed plugins directory. Returns the destination path on success. @@ -440,40 +455,43 @@ def _stage_plugin(self, source_path: Path, plugin_name: str, overwrite: bool) -> return destination - def install_plugin_file(self, plugin_name: str, source_path: Path, overwrite: bool = False) -> Optional[Path]: + def install_plugin_file( + self, plugin_name: str, source_path: Path, overwrite: bool = False + ) -> Optional[Path]: """ Install or update a plugin file in the managed directory without trusting it. """ normalised = self._normalize_plugin_name(plugin_name) if not normalised: - print(f"[!] Invalid plugin name '{plugin_name}'. Use letters, numbers, and underscores.") + print( + f"[!] Invalid plugin name '{plugin_name}'. Use letters, numbers, and underscores." 
+ ) return None return self._stage_plugin(source_path, normalised, overwrite) - + def discover_plugins(self) -> List[Path]: """Discover all plugin files in the plugin directory""" return list(self.plugin_dir.glob("*.py")) - + def load_plugin( - self, - plugin_name: str, - require_trusted: bool = True, - force_load: bool = False + self, plugin_name: str, require_trusted: bool = True, force_load: bool = False ) -> Optional[PySpectorPlugin]: """ Load a plugin by name. - + Args: plugin_name: Name of the plugin to load require_trusted: Only load trusted plugins force_load: Force load even if security checks fail - + Returns: Loaded plugin instance or None """ normalised = self._normalize_plugin_name(plugin_name) if not normalised: - print(f"[!] Invalid plugin name '{plugin_name}'. Plugins must use letters, numbers, or underscores.") + print( + f"[!] Invalid plugin name '{plugin_name}'. Plugins must use letters, numbers, or underscores." + ) return None # Check if already loaded @@ -486,15 +504,15 @@ def load_plugin( if not plugin_path.exists(): print(f"[!] Plugin '{normalised}' not found at {plugin_path}") return None - + # Security checks plugin_info = self.registry.get_plugin_info(normalised) - + if require_trusted and not force_load: - if not plugin_info or not plugin_info.get('trusted'): + if not plugin_info or not plugin_info.get("trusted"): print(f"[!] Plugin '{normalised}' is not trusted.") print(f"[*] Use 'pyspector plugin trust {normalised}' to trust it.") - + # Perform security scan is_safe, warning = PluginSecurity.validate_plugin_code(plugin_path) if not is_safe: @@ -502,42 +520,46 @@ def load_plugin( return None if warning: print(f"[*] Security note: {warning}") - + # Ask for confirmation - response = input(f"Do you want to trust and load '{normalised}'? [y/N]: ").strip().lower() - if response not in ['y', 'yes']: + response = ( + input(f"Do you want to trust and load '{normalised}'? 
[y/N]: ") + .strip() + .lower() + ) + if response not in ["y", "yes"]: return None - + # Verify checksum if plugin is registered if plugin_info: checksum = PluginSecurity.calculate_checksum(plugin_path) - if checksum != plugin_info.get('checksum'): + if checksum != plugin_info.get("checksum"): print(f"[!] WARNING: Plugin '{normalised}' checksum mismatch!") print("[!] File may have been modified.") if not force_load: response = input("Load anyway? [y/N]: ").strip().lower() - if response not in ['y', 'yes']: + if response not in ["y", "yes"]: return None - + # Load the plugin module try: spec = importlib.util.spec_from_file_location(normalised, plugin_path) if not spec or not spec.loader: raise ImportError(f"Cannot load plugin spec for {normalised}") - + module = importlib.util.module_from_spec(spec) sys.modules[normalised] = module spec.loader.exec_module(module) - + # Find plugin class plugin_class = self._find_plugin_class(module) if not plugin_class: print(f"[!] No valid plugin class found in '{normalised}'") return None - + # Instantiate plugin plugin_instance = plugin_class() - + # Register if not already registered if not plugin_info: checksum = PluginSecurity.calculate_checksum(plugin_path) @@ -546,92 +568,98 @@ def load_plugin( str(plugin_path), checksum, plugin_instance.metadata, - trusted=force_load + trusted=force_load, ) - + self.loaded_plugins[normalised] = plugin_instance - - print(f"[+] Loaded plugin: {plugin_instance.metadata.name} v{plugin_instance.metadata.version}") - + + print( + f"[+] Loaded plugin: {plugin_instance.metadata.name} v{plugin_instance.metadata.version}" + ) + return plugin_instance - + except Exception as e: print(f"[!] 
Error loading plugin '{normalised}': {e}") import traceback + traceback.print_exc() return None - + def _find_plugin_class(self, module) -> Optional[Type[PySpectorPlugin]]: """Find the plugin class in a module""" for name, obj in inspect.getmembers(module, inspect.isclass): - if (issubclass(obj, PySpectorPlugin) and - obj != PySpectorPlugin and - not inspect.isabstract(obj)): + if ( + issubclass(obj, PySpectorPlugin) + and obj != PySpectorPlugin + and not inspect.isabstract(obj) + ): return obj return None - + def execute_plugin( self, plugin: PySpectorPlugin, findings: List[Dict[str, Any]], scan_path: Path, plugin_config: Dict[str, Any] = None, - **kwargs + **kwargs, ) -> Dict[str, Any]: """ Execute a plugin. - + Args: plugin: Plugin instance to execute findings: PySpector findings scan_path: Path that was scanned plugin_config: Plugin-specific configuration **kwargs: Additional arguments - + Returns: Plugin execution results """ try: # Initialize plugin config = plugin_config or {} - + # Validate config is_valid, error_msg = plugin.validate_config(config) if not is_valid: return { - 'success': False, - 'message': f"Invalid plugin configuration: {error_msg}", - 'data': None + "success": False, + "message": f"Invalid plugin configuration: {error_msg}", + "data": None, } - + if not plugin.initialize(config): return { - 'success': False, - 'message': "Plugin initialization failed", - 'data': None + "success": False, + "message": "Plugin initialization failed", + "data": None, } - + print(f"[*] Executing plugin: {plugin.metadata.name}") - + # Execute plugin result = plugin.process_findings(findings, scan_path, **kwargs) - + # Cleanup plugin.cleanup() - + return result - + except Exception as e: print(f"[!] 
Plugin execution error: {e}") import traceback + traceback.print_exc() - + return { - 'success': False, - 'message': f"Plugin execution failed: {e}", - 'data': None + "success": False, + "message": f"Plugin execution failed: {e}", + "data": None, } - + def unload_plugin(self, plugin_name: str): """Unload a plugin""" normalised = self._normalize_plugin_name(plugin_name) @@ -642,20 +670,29 @@ def unload_plugin(self, plugin_name: str): del self.loaded_plugins[normalised] if normalised in sys.modules: del sys.modules[normalised] - + def list_available_plugins(self) -> List[str]: """List all available plugins""" return [p.stem for p in self.discover_plugins()] - - def trust_plugin(self, plugin_name: str, source_path: Optional[Path] = None, overwrite: bool = False) -> bool: + + def trust_plugin( + self, + plugin_name: str, + source_path: Optional[Path] = None, + overwrite: bool = False, + ) -> bool: """Mark a plugin as trusted and ensure it resides in the managed directory.""" normalised = self._normalize_plugin_name(plugin_name) if not normalised: - print(f"[!] Invalid plugin name '{plugin_name}'. Use letters, numbers, and underscores.") + print( + f"[!] Invalid plugin name '{plugin_name}'. Use letters, numbers, and underscores." + ) return False if source_path is not None: - plugin_path = self._stage_plugin(source_path, normalised, overwrite=overwrite) + plugin_path = self._stage_plugin( + source_path, normalised, overwrite=overwrite + ) if not plugin_path: return False else: @@ -663,18 +700,18 @@ def trust_plugin(self, plugin_name: str, source_path: Optional[Path] = None, ove if not plugin_path.exists(): print(f"[!] Plugin '{normalised}' not found at {plugin_path}") return False - + # Validate plugin code is_safe, warning = PluginSecurity.validate_plugin_code(plugin_path) - + if not is_safe: print(f"[!] Security warning: {warning}") response = input("Trust this plugin anyway? 
[y/N]: ").strip().lower() - if response not in ['y', 'yes']: + if response not in ["y", "yes"]: return False elif warning: print(f"[*] Security note: {warning}") - + # Calculate checksum checksum = PluginSecurity.calculate_checksum(plugin_path) @@ -682,25 +719,21 @@ def trust_plugin(self, plugin_name: str, source_path: Optional[Path] = None, ove plugin = self.load_plugin(normalised, require_trusted=False, force_load=True) if not plugin: return False - + # Update registry self.registry.register_plugin( - normalised, - str(plugin_path), - checksum, - plugin.metadata, - trusted=True + normalised, str(plugin_path), checksum, plugin.metadata, trusted=True ) - + print(f"[+] Plugin '{normalised}' is now trusted") - + return True # Example plugin for reference class ExamplePlugin(PySpectorPlugin): """Example plugin implementation""" - + @property def metadata(self) -> PluginMetadata: return PluginMetadata( @@ -708,25 +741,22 @@ def metadata(self) -> PluginMetadata: version="1.0.0", author="PySpector Team", description="Example plugin for demonstration", - category="example" + category="example", ) - + def initialize(self, config: Dict[str, Any]) -> bool: self.config = config return True - + def process_findings( - self, - findings: List[Dict[str, Any]], - scan_path: Path, - **kwargs + self, findings: List[Dict[str, Any]], scan_path: Path, **kwargs ) -> Dict[str, Any]: return { - 'success': True, - 'message': f"Processed {len(findings)} findings", - 'data': {'count': len(findings)} + "success": True, + "message": f"Processed {len(findings)} findings", + "data": {"count": len(findings)}, } - + def cleanup(self) -> None: pass diff --git a/src/pyspector/reporting.py b/src/pyspector/reporting.py index f41fde2..64bcc74 100644 --- a/src/pyspector/reporting.py +++ b/src/pyspector/reporting.py @@ -1,20 +1,33 @@ import json + # Added 'Region' to imports for better SARIF compliance -from sarif_om import SarifLog, Tool, Run, ReportingDescriptor, Result, ArtifactLocation, Location, 
PhysicalLocation, Region +from sarif_om import ( + SarifLog, + Tool, + Run, + ReportingDescriptor, + Result, + ArtifactLocation, + Location, + PhysicalLocation, + Region, +) + # Removed 'asdict' from imports as it is not needed for sarif_om from dataclasses import asdict, is_dataclass + class Reporter: def __init__(self, issues: list, report_format: str): self.issues = issues self.format = report_format def generate(self) -> str: - if self.format == 'json': + if self.format == "json": return self.to_json() - if self.format == 'sarif': + if self.format == "sarif": return self.to_sarif() - if self.format == 'html': + if self.format == "html": return self.to_html() return self.to_console() @@ -25,12 +38,12 @@ def to_console(self) -> str: output = [] # Define severity order (highest to lowest priority) - severity_order = ['CRITICAL', 'HIGH', 'MEDIUM', 'LOW'] + severity_order = ["CRITICAL", "HIGH", "MEDIUM", "LOW"] # Group issues by severity issues_by_severity = {} for issue in self.issues: - severity = str(issue.severity).split('.')[-1].upper() + severity = str(issue.severity).split(".")[-1].upper() if severity not in issues_by_severity: issues_by_severity[severity] = [] issues_by_severity[severity].append(issue) @@ -46,7 +59,9 @@ def to_console(self) -> str: # Add severity header output.append(f"\n{'='*60}") - output.append(f" {severity} ({len(sorted_issues)} issue{'s' if len(sorted_issues) != 1 else ''})") + output.append( + f" {severity} ({len(sorted_issues)} issue{'s' if len(sorted_issues) != 1 else ''})" + ) output.append(f"{'='*60}") for issue in sorted_issues: @@ -69,10 +84,11 @@ def to_json(self) -> str: "file_path": issue.file_path, "line_number": issue.line_number, "code": issue.code, - "severity": str(issue.severity).split('.')[-1], + "severity": str(issue.severity).split(".")[-1], "remediation": issue.remediation, - } for issue in self.issues - ] + } + for issue in self.issues + ], } return json.dumps(report, indent=2) @@ -80,34 +96,46 @@ def to_sarif(self) 
-> str: tool = Tool(driver=ReportingDescriptor(id="pyspector", name="PySpector")) rules = [] results = [] - + # Create a unique list of rules for the SARIF report rule_map = {} for issue in self.issues: if issue.rule_id not in rule_map: - rule_map[issue.rule_id] = ReportingDescriptor(id=issue.rule_id, name=issue.description) - + rule_map[issue.rule_id] = ReportingDescriptor( + id=issue.rule_id, name=issue.description + ) + # sarif_om expects lists, not values view tool.driver.rules = list(rule_map.values()) for issue in self.issues: # FIX: Use the Region object from sarif_om instead of a raw dict region = Region(start_line=issue.line_number) - + location = Location( physical_location=PhysicalLocation( artifact_location=ArtifactLocation(uri=issue.file_path), - region=region + region=region, + ) + ) + results.append( + Result( + rule_id=issue.rule_id, + message={"text": issue.description}, + locations=[location], ) ) - results.append(Result(rule_id=issue.rule_id, message={"text": issue.description}, locations=[location])) - + run = Run(tool=tool, results=results) - log = SarifLog(version="2.1.0", schema_uri="https://schemastore.azurewebsites.net/schemas/json/sarif-2.1.0-rtm.5.json", runs=[run]) - + log = SarifLog( + version="2.1.0", + schema_uri="https://schemastore.azurewebsites.net/schemas/json/sarif-2.1.0-rtm.5.json", + runs=[run], + ) + # FIX: Remove asdict(). Use default lambda to serialize non-dataclass objects. 
return json.dumps(log, default=lambda o: o.__dict__, indent=2) - + def to_html(self) -> str: # A simple HTML report html = f""" @@ -136,4 +164,4 @@ def to_html(self) -> str: """ html += "" - return html \ No newline at end of file + return html diff --git a/src/pyspector/triage.py b/src/pyspector/triage.py index b50e1d1..a780dd9 100644 --- a/src/pyspector/triage.py +++ b/src/pyspector/triage.py @@ -4,17 +4,19 @@ from typing import List, Dict, Any import hashlib -from textual.app import App, ComposeResult # type: ignore -from textual.widgets import Header, Footer, DataTable, Static, Label # type: ignore -from textual.containers import Vertical # type: ignore -from textual.binding import Binding # type: ignore +from textual.app import App, ComposeResult # type: ignore +from textual.widgets import Header, Footer, DataTable, Static, Label # type: ignore +from textual.containers import Vertical # type: ignore +from textual.binding import Binding # type: ignore + # Helper to create a unique, stable fingerprint for an issue def create_fingerprint(issue: Dict[str, Any]) -> str: # Use rule ID, file path relative to a potential project root, and the line content # This makes the fingerprint stable across different checkout directories unique_string = f"{issue.get('rule_id', '')}|{issue.get('file_path', '')}|{issue.get('line_number', '')}|{issue.get('code', '').strip()}" - return hashlib.sha1(unique_string.encode('utf-8')).hexdigest() + return hashlib.sha1(unique_string.encode("utf-8")).hexdigest() + class PySpectorTriage(App): """An interactive TUI for triaging PySpector findings.""" @@ -39,9 +41,11 @@ def _load_baseline_data(self): """Load baseline data without touching any UI elements.""" if self.baseline_path.exists(): try: - with self.baseline_path.open('r') as f: + with self.baseline_path.open("r") as f: data = json.load(f) - self.ignored_fingerprints = set(data.get("ignored_fingerprints", [])) + self.ignored_fingerprints = set( + data.get("ignored_fingerprints", []) + 
) self.initial_status = f"Status: Loaded baseline with {len(self.ignored_fingerprints)} ignored issues" except (json.JSONDecodeError, IOError) as e: self.initial_status = f"Status: Error loading baseline: {str(e)}" @@ -52,7 +56,10 @@ def compose(self) -> ComposeResult: yield Header() yield Vertical( Label("PySpector Triage Mode", id="title"), - Label("Navigate with arrows. 'i' = Ignore/Unignore. 's' = Save & Quit. 'q' = Quit without saving.", id="instructions"), + Label( + "Navigate with arrows. 'i' = Ignore/Unignore. 's' = Save & Quit. 'q' = Quit without saving.", + id="instructions", + ), ) yield DataTable(id="issue_table") yield Footer() @@ -62,22 +69,24 @@ def on_mount(self) -> None: """Called after all widgets are mounted and ready.""" table = self.query_one("#issue_table", DataTable) table.cursor_type = "row" - table.add_columns("Status", "Severity", "File", "Line", "Rule ID", "Description") - + table.add_columns( + "Status", "Severity", "File", "Line", "Rule ID", "Description" + ) + # Update status bar with baseline loading result status_bar = self.query_one("#status_bar", Static) status_bar.update(self.initial_status) - + # Populate the table self.update_table() - + # Ensure the table has focus so key bindings work table.focus() def on_key(self, event): """Handle key presses directly.""" print(f"KEY PRESSED: {event.key}") # Debug output - + if event.key == "i": print("I KEY DETECTED - calling toggle_ignore") self.toggle_ignore() @@ -90,7 +99,7 @@ def on_key(self, event): print("Q KEY DETECTED - calling quit") self.exit("Exited triage mode.") return True - + # Let other keys pass through (like arrow keys for navigation) return False @@ -98,40 +107,45 @@ def update_table(self): table = self.query_one("#issue_table", DataTable) current_cursor = table.cursor_row if table.row_count > 0 else 0 table.clear() - + severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3} - + # Sort issues by severity then by file path sorted_issues = sorted( self.issues, - 
key=lambda x: (severity_order.get(x.get("severity", "LOW").split('.')[-1], 4), x.get("file_path", "")) + key=lambda x: ( + severity_order.get(x.get("severity", "LOW").split(".")[-1], 4), + x.get("file_path", ""), + ), ) for issue in sorted_issues: fingerprint = create_fingerprint(issue) - status = "[Ignored]" if fingerprint in self.ignored_fingerprints else "[Active]" - + status = ( + "[Ignored]" if fingerprint in self.ignored_fingerprints else "[Active]" + ) + # Handle severity display - sev = issue.get('severity', 'N/A') - if '.' in sev: - sev = sev.split('.')[-1] - - fpath = issue.get('file_path', 'N/A') - line = str(issue.get('line_number', 'N/A')) - rule = issue.get('rule_id', 'N/A') - desc = issue.get('description', 'N/A') - + sev = issue.get("severity", "N/A") + if "." in sev: + sev = sev.split(".")[-1] + + fpath = issue.get("file_path", "N/A") + line = str(issue.get("line_number", "N/A")) + rule = issue.get("rule_id", "N/A") + desc = issue.get("description", "N/A") + # Truncate long file paths for better display if len(fpath) > 40: fpath = "..." + fpath[-37:] - + # Truncate long descriptions if len(desc) > 60: desc = desc[:57] + "..." 
- + styled_sev = f"[{self._get_severity_color(sev)}]{sev}[/]" table.add_row(status, styled_sev, fpath, line, rule, desc, key=fingerprint) - + # Restore cursor position if table.row_count > 0: if current_cursor >= table.row_count: @@ -143,37 +157,42 @@ def toggle_ignore(self): print("TOGGLE IGNORE FUNCTION CALLED!") table = self.query_one("#issue_table", DataTable) status_bar = self.query_one("#status_bar", Static) - + print(f"Table row count: {table.row_count}, cursor: {table.cursor_row}") - + if table.row_count == 0: status_bar.update("Status: No issues to toggle") return - + if table.cursor_row < 0 or table.cursor_row >= table.row_count: - status_bar.update(f"Status: Invalid cursor position: {table.cursor_row}/{table.row_count}") + status_bar.update( + f"Status: Invalid cursor position: {table.cursor_row}/{table.row_count}" + ) return - + try: # Get the sorted issues list (same as in update_table) severity_order = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3} sorted_issues = sorted( self.issues, - key=lambda x: (severity_order.get(x.get("severity", "LOW").split('.')[-1], 4), x.get("file_path", "")) + key=lambda x: ( + severity_order.get(x.get("severity", "LOW").split(".")[-1], 4), + x.get("file_path", ""), + ), ) - + # Get the current issue by cursor position current_issue = sorted_issues[table.cursor_row] fingerprint = create_fingerprint(current_issue) - + print(f"Working with fingerprint: {fingerprint}") - + # Get issue info for status message - file_path = current_issue.get('file_path', 'Unknown') - rule_id = current_issue.get('rule_id', 'Unknown') - line_num = current_issue.get('line_number', 'Unknown') + file_path = current_issue.get("file_path", "Unknown") + rule_id = current_issue.get("rule_id", "Unknown") + line_num = current_issue.get("line_number", "Unknown") issue_desc = f" ({Path(file_path).name}:{line_num} - {rule_id})" - + # Toggle the ignore status if fingerprint in self.ignored_fingerprints: self.ignored_fingerprints.remove(fingerprint) @@ 
-183,26 +202,29 @@ def toggle_ignore(self): self.ignored_fingerprints.add(fingerprint) status_bar.update(f"Status: Marked issue as IGNORED{issue_desc}") print(f"Marked as IGNORED: {fingerprint}") - + # Refresh the table self.update_table() - + except Exception as e: import traceback + error_details = traceback.format_exc() status_bar.update(f"Status: Error: {str(e)}") print(f"Toggle error: {error_details}") - + def save_and_quit(self): """Save the baseline and exit.""" try: baseline_data = { "ignored_fingerprints": sorted(list(self.ignored_fingerprints)) } - with self.baseline_path.open('w') as f: + with self.baseline_path.open("w") as f: json.dump(baseline_data, f, indent=2) - - self.exit(f"Baseline saved to '{self.baseline_path}' with {len(self.ignored_fingerprints)} ignored issues.") + + self.exit( + f"Baseline saved to '{self.baseline_path}' with {len(self.ignored_fingerprints)} ignored issues." + ) except IOError as e: status_bar = self.query_one("#status_bar", Static) status_bar.update(f"Status: Error saving baseline: {str(e)}") @@ -215,15 +237,16 @@ def _get_severity_color(self, severity: str) -> str: "LOW": "bold blue", }.get(severity.upper(), "white") + def run_triage_tui(issues_data: List[Dict[str, Any]], baseline_path: Path): """Initializes and runs the Textual Triage App.""" if not issues_data: print("No issues found in the report to triage.") return - + print(f"Starting triage mode with {len(issues_data)} issues...") print(f"Baseline file: {baseline_path}") - + try: app = PySpectorTriage(issues=issues_data, baseline_path=baseline_path) result = app.run() @@ -232,4 +255,5 @@ def run_triage_tui(issues_data: List[Dict[str, Any]], baseline_path: Path): except Exception as e: print(f"Error running triage app: {e}") import traceback - traceback.print_exc() \ No newline at end of file + + traceback.print_exc() diff --git a/tests/examples/hardcoded_anthropic_key.py b/tests/examples/hardcoded_anthropic_key.py index 65e19d8..34ad411 100644 --- 
a/tests/examples/hardcoded_anthropic_key.py +++ b/tests/examples/hardcoded_anthropic_key.py @@ -1,4 +1,5 @@ ANTHROPIC_API_KEY = "sk-ant-api03-FAKEKEYFORTESTING-ABCDEF1234567890" + def example(): pass diff --git a/tests/unit/reporting_test.py b/tests/unit/reporting_test.py index aee2a79..d4c35df 100644 --- a/tests/unit/reporting_test.py +++ b/tests/unit/reporting_test.py @@ -5,6 +5,7 @@ from types import SimpleNamespace from pyspector.reporting import Reporter + class TestReporter(unittest.TestCase): test_issue = SimpleNamespace( @@ -14,7 +15,7 @@ class TestReporter(unittest.TestCase): line_number=1, code='eval("a=5 print(a)")', severity="High", - remediation="Avoid 'eval()'. Use safer alternatives like 'ast.literal_eval' for data parsing." + remediation="Avoid 'eval()'. Use safer alternatives like 'ast.literal_eval' for data parsing.", ) def test_to_json(self): @@ -22,7 +23,7 @@ def test_to_json(self): output = reporter.to_json() output_json = json.loads(output) - + # Check issues summary self.assertEqual(output_json["summary"]["issue_count"], 1) @@ -36,7 +37,6 @@ def test_to_json(self): self.assertEqual(issue_json["severity"], self.test_issue.severity) self.assertEqual(issue_json["remediation"], self.test_issue.remediation) - def test_to_sarif(self): reporter = Reporter([self.test_issue], "sarif") output = reporter.to_sarif() @@ -45,7 +45,10 @@ def test_to_sarif(self): # Check top level SARIF fields self.assertEqual(output_json.get("version"), "2.1.0") - self.assertEqual(output_json.get("schema_uri"), "https://schemastore.azurewebsites.net/schemas/json/sarif-2.1.0-rtm.5.json") + self.assertEqual( + output_json.get("schema_uri"), + "https://schemastore.azurewebsites.net/schemas/json/sarif-2.1.0-rtm.5.json", + ) # Check runs self.assertIn("runs", output_json) @@ -68,7 +71,7 @@ def test_to_sarif(self): # Check rule id self.assertEqual(result.get("rule_id"), self.test_issue.rule_id) self.assertEqual(result.get("kind"), "fail") - + # Check description 
self.assertIn("message", result) self.assertEqual(result["message"].get("text"), self.test_issue.description) @@ -83,7 +86,6 @@ def test_to_sarif(self): artifact = physical["artifact_location"] self.assertEqual(artifact.get("uri"), self.test_issue.file_path) - def test_to_html(self): reporter = Reporter([self.test_issue], "html") output = reporter.to_html() @@ -91,7 +93,7 @@ def test_to_html(self): soup = BeautifulSoup(output, "html.parser") self.assertEqual(soup.title.string, "PySpector Scan Report") - + # Check header h1 h1 = soup.find("h1") self.assertIsNotNone(h1) @@ -120,10 +122,11 @@ def test_to_html(self): self.assertEqual(cells[1].text.strip(), str(self.test_issue.line_number)) self.assertEqual(cells[2].text.strip(), self.test_issue.severity) self.assertEqual(cells[3].text.strip(), self.test_issue.description) - + code_cell = cells[4].find("code") self.assertIsNotNone(code_cell) self.assertEqual(code_cell.text.strip(), self.test_issue.code) + if __name__ == "__main__": unittest.main()