Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 85 additions & 13 deletions strix/runtime/docker_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,30 @@ def __init__(self) -> None:
def _generate_sandbox_token(self) -> str:
return secrets.token_urlsafe(32)

def _find_available_port(self) -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("", 0))
return cast("int", s.getsockname()[1])
def _find_available_port(self, max_attempts: int = 5) -> int:
"""Find an available port with retry logic to handle TOCTOU race conditions.

The port is verified to be available at the time of check, but may be taken
by the time it's used. The caller should handle port-in-use errors and retry
container creation if needed.
"""
last_port = 0
for attempt in range(max_attempts):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind(("", 0))
port = cast("int", s.getsockname()[1])

# Avoid returning the same port if we're retrying
if port != last_port:
return port
last_port = port

if attempt < max_attempts - 1:
time.sleep(0.1)

# If we somehow keep getting the same port, just return it
return last_port

def _get_scan_id(self, agent_id: str) -> str:
try:
Expand Down Expand Up @@ -239,44 +259,96 @@ def _initialize_container(
)
caido_token = result.output.decode().strip() if result.exit_code == 0 else ""

# Security: Pass token via environment variable instead of CLI argument
# to prevent exposure in process listings (ps aux)
container.exec_run(
f"bash -c 'source /etc/profile.d/proxy.sh && cd /app && "
f"STRIX_SANDBOX_MODE=true CAIDO_API_TOKEN={caido_token} CAIDO_PORT={caido_port} "
f"poetry run python strix/runtime/tool_server.py --token {tool_server_token} "
f"TOOL_SERVER_TOKEN={tool_server_token} "
f"poetry run python strix/runtime/tool_server.py "
f"--host 0.0.0.0 --port {tool_server_port} &'",
detach=True,
user="pentester",
)

time.sleep(5)

def _validate_path_safety(self, local_path: Path, resolved_path: Path) -> bool:
"""Validate that a path is safe to copy (no symlink escapes or sensitive directories)."""
# Check if resolved path escapes the original path's parent (symlink attack)
try:
resolved_path.relative_to(local_path.parent)
except ValueError:
logger.warning(
f"Security: Path {local_path} resolves outside its parent directory "
f"(symlink escape attempt?): {resolved_path}"
)
return False

# Block sensitive system directories
sensitive_prefixes = (
"/etc",
"/var",
"/root",
"/home",
"/proc",
"/sys",
"/dev",
"/boot",
"/usr",
"/lib",
"/bin",
"/sbin",
)
resolved_str = str(resolved_path)
if any(resolved_str.startswith(prefix) for prefix in sensitive_prefixes):
# Allow if the original path explicitly targets these (user intent)
if not str(local_path).startswith(tuple(sensitive_prefixes)):
logger.warning(
f"Security: Path {local_path} resolves to sensitive location: {resolved_path}"
)
return False

return True

def _copy_local_directory_to_container(
self, container: Container, local_path: str, target_name: str | None = None
) -> None:
import tarfile
from io import BytesIO

try:
local_path_obj = Path(local_path).resolve()
if not local_path_obj.exists() or not local_path_obj.is_dir():
logger.warning(f"Local path does not exist or is not directory: {local_path_obj}")
local_path_obj = Path(local_path)
resolved_path = local_path_obj.resolve()

if not resolved_path.exists() or not resolved_path.is_dir():
logger.warning(f"Local path does not exist or is not directory: {resolved_path}")
return

# Security: Validate the resolved path is safe
if not self._validate_path_safety(local_path_obj, resolved_path):
logger.error(f"Security: Refusing to copy potentially unsafe path: {local_path}")
return

if target_name:
logger.info(
f"Copying local directory {local_path_obj} to container at "
f"Copying local directory {resolved_path} to container at "
f"/workspace/{target_name}"
)
else:
logger.info(f"Copying local directory {local_path_obj} to container")
logger.info(f"Copying local directory {resolved_path} to container")

tar_buffer = BytesIO()
with tarfile.open(fileobj=tar_buffer, mode="w") as tar:
for item in local_path_obj.rglob("*"):
for item in resolved_path.rglob("*"):
# Security: Skip symlinks to prevent symlink attacks within the directory
if item.is_symlink():
logger.debug(f"Skipping symlink: {item}")
continue
if item.is_file():
rel_path = item.relative_to(local_path_obj)
rel_path = item.relative_to(resolved_path)
arcname = Path(target_name) / rel_path if target_name else rel_path
tar.add(item, arcname=arcname)
tar.add(item, arcname=str(arcname))

tar_buffer.seek(0)
container.put_archive("/workspace", tar_buffer.getvalue())
Expand Down
28 changes: 24 additions & 4 deletions strix/runtime/tool_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,23 @@
raise RuntimeError("Tool server should only run in sandbox mode (STRIX_SANDBOX_MODE=true)")

parser = argparse.ArgumentParser(description="Start Strix tool server")
parser.add_argument("--token", required=True, help="Authentication token")
parser.add_argument(
"--token",
required=False,
help="Authentication token (prefer TOOL_SERVER_TOKEN env var for security)",
)
parser.add_argument("--host", default="0.0.0.0", help="Host to bind to") # nosec
parser.add_argument("--port", type=int, required=True, help="Port to bind to")

args = parser.parse_args()
EXPECTED_TOKEN = args.token

# Security: Prefer environment variable over CLI argument to avoid token exposure in process list
EXPECTED_TOKEN = os.getenv("TOOL_SERVER_TOKEN") or args.token
if not EXPECTED_TOKEN:
raise RuntimeError(
"Authentication token required. Set TOOL_SERVER_TOKEN environment variable "
"or use --token argument (env var preferred for security)."
)

app = FastAPI()
security = HTTPBearer()
Expand Down Expand Up @@ -154,12 +165,21 @@ async def register_agent(


@app.get("/health")
async def health_check() -> dict[str, Any]:
async def health_check() -> dict[str, str]:
"""Public health check - returns minimal information for liveness probes."""
return {"status": "healthy"}


@app.get("/health/detailed")
async def health_check_detailed(
credentials: HTTPAuthorizationCredentials = security_dependency,
) -> dict[str, Any]:
"""Authenticated detailed health check - returns internal state for debugging."""
verify_token(credentials)
return {
"status": "healthy",
"sandbox_mode": str(SANDBOX_MODE),
"environment": "sandbox" if SANDBOX_MODE else "main",
"auth_configured": "true" if EXPECTED_TOKEN else "false",
"active_agents": len(agent_processes),
"agents": list(agent_processes.keys()),
}
Expand Down
22 changes: 20 additions & 2 deletions strix/tools/proxy/proxy_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import base64
import logging
import os
import re
import time
Expand All @@ -16,6 +17,23 @@
from collections.abc import Callable


logger = logging.getLogger(__name__)

# Security: TLS verification is disabled by default for penetration testing
# to allow intercepting HTTPS traffic. Set STRIX_VERIFY_TLS=true to enable.
VERIFY_TLS = os.getenv("STRIX_VERIFY_TLS", "false").lower() == "true"

if not VERIFY_TLS:
# Suppress urllib3 InsecureRequestWarning when TLS verification is disabled
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger.debug(
"TLS verification disabled for proxy requests (expected for penetration testing). "
"Set STRIX_VERIFY_TLS=true to enable."
)


class ProxyManager:
def __init__(self, auth_token: str | None = None):
host = "127.0.0.1"
Expand Down Expand Up @@ -246,7 +264,7 @@ def send_simple_request(
data=body or None,
proxies=self.proxies,
timeout=timeout,
verify=False,
verify=VERIFY_TLS,
)
response_time = int((time.time() - start_time) * 1000)

Expand Down Expand Up @@ -383,7 +401,7 @@ def _send_modified_request(
data=request_data["body"] or None,
proxies=self.proxies,
timeout=30,
verify=False,
verify=VERIFY_TLS,
)
response_time = int((time.time() - start_time) * 1000)

Expand Down
2 changes: 1 addition & 1 deletion tests/runtime/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
"""Tests for strix.runtime module."""
# Tests for runtime module
Loading