From 9bfb15cbb2e540d6fe92d887bf2f19e5e5babf17 Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Fri, 20 Feb 2026 10:13:47 -0500 Subject: [PATCH 1/3] example: app-based interactive sandbox Signed-off-by: Niels Bantilan --- .../interactive_sandbox_app/fastapi_app.py | 236 +++++++ .../interactive_sandbox_app.py | 54 ++ .../apps/interactive_sandbox_app/sandbox.py | 576 ++++++++++++++++++ 3 files changed, 866 insertions(+) create mode 100644 examples/apps/interactive_sandbox_app/fastapi_app.py create mode 100644 examples/apps/interactive_sandbox_app/interactive_sandbox_app.py create mode 100644 examples/apps/interactive_sandbox_app/sandbox.py diff --git a/examples/apps/interactive_sandbox_app/fastapi_app.py b/examples/apps/interactive_sandbox_app/fastapi_app.py new file mode 100644 index 000000000..fe9341ede --- /dev/null +++ b/examples/apps/interactive_sandbox_app/fastapi_app.py @@ -0,0 +1,236 @@ +"""FastAPI app for running commands with seccomp + Landlock sandboxing.""" + +import os +import shlex +import subprocess +from typing import List, Optional + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel, Field + +from sandbox import ( + LANDLOCK_AVAILABLE, + SAFE_ENV, + SECCOMP_AVAILABLE, + check_landlock_support, + check_seccomp_support, + get_sandbox_info, + run_command_sandboxed, +) + + +app = FastAPI( + title="Interactive Sandbox", + description="A FastAPI app for running shell commands with unprivileged sandboxing.", + version="1.0.0", +) + +SANDBOX_MODE = os.environ.get("SANDBOX_MODE", "full").lower() +DEFAULT_TIMEOUT = float(os.environ.get("COMMAND_TIMEOUT", "30")) +ALLOW_NETWORK = os.environ.get("ALLOW_NETWORK", "false").lower() == "true" +MAX_MEMORY_MB = int(os.environ.get("MAX_MEMORY_MB", "512")) +MAX_PROCESSES = int(os.environ.get("MAX_PROCESSES", "50")) + + +class CommandResponse(BaseModel): + stdout: str + stderr: str + returncode: int + + +class SandboxStatus(BaseModel): + sandbox_mode: str + landlock_module_available: bool + landlock_kernel_support: bool + seccomp_module_available: bool + seccomp_kernel_support: bool + allow_network: bool + default_timeout: float + max_memory_mb: int + max_processes: int + extra_read_paths: Optional[List[str]] + extra_write_paths: Optional[List[str]] + safe_environment: dict + + +class SystemInfo(BaseModel): + platform: str + kernel_release: str + landlock: dict + seccomp: dict + resource_limits_available: bool + + +class CommandRequest(BaseModel): + command: str = Field(..., description="Shell command to execute") + + +@app.get("/health") +async def health() -> dict: + return {"status": "healthy"} + + +@app.get("/sandbox/status", response_model=SandboxStatus) +async def sandbox_status() -> SandboxStatus: + return SandboxStatus( + sandbox_mode=SANDBOX_MODE, + landlock_module_available=LANDLOCK_AVAILABLE, + landlock_kernel_support=check_landlock_support(), + seccomp_module_available=SECCOMP_AVAILABLE, + seccomp_kernel_support=check_seccomp_support(), + allow_network=ALLOW_NETWORK, + default_timeout=DEFAULT_TIMEOUT, + max_memory_mb=MAX_MEMORY_MB, + max_processes=MAX_PROCESSES, + extra_read_paths=EXTRA_READ_PATHS, + extra_write_paths=EXTRA_WRITE_PATHS, + safe_environment=SAFE_ENV, + ) + + +@app.get("/sandbox/info", response_model=SystemInfo) +async def sandbox_info() -> SystemInfo: + info = get_sandbox_info() + return SystemInfo(**info) + + +@app.post("/run", response_model=CommandResponse) +async def run_command(command: str) -> CommandResponse: + """Run a command in a sandboxed subprocess.""" + if not command.strip(): + raise HTTPException(status_code=400, detail="Command cannot be empty") + + if SANDBOX_MODE == "none": + result = subprocess.run( + shlex.split(command), + capture_output=True, + text=True, + shell=False, + timeout=DEFAULT_TIMEOUT, + env=SAFE_ENV, + ) + else: + result = run_command_sandboxed( + command=command, + allow_network=ALLOW_NETWORK, + timeout=DEFAULT_TIMEOUT, + max_memory_mb=MAX_MEMORY_MB, + max_processes=MAX_PROCESSES, + ) + + return CommandResponse( + stdout=result.stdout, + stderr=result.stderr, + returncode=result.returncode, + ) + + +@app.post("/run/advanced", response_model=CommandResponse) +async def run_command_advanced(request: CommandRequest) -> CommandResponse: + """Run a command with custom sandbox settings.""" + if not request.command.strip(): + raise HTTPException(status_code=400, detail="Command cannot be empty") + + timeout = request.timeout or DEFAULT_TIMEOUT + allow_network = ( + request.allow_network if request.allow_network is not None else ALLOW_NETWORK + ) + max_memory = request.max_memory_mb or MAX_MEMORY_MB + max_procs = request.max_processes or MAX_PROCESSES + + if SANDBOX_MODE == "none": + try: + result = subprocess.run( + shlex.split(request.command), + capture_output=True, + text=True, + shell=False, + timeout=timeout, + env=SAFE_ENV, + ) + except subprocess.TimeoutExpired: + return CommandResponse( + stdout="", + stderr=f"Command timed out after {timeout} seconds", + returncode=-1, + ) + else: + result = run_command_sandboxed( + command=request.command, + allow_network=allow_network, + extra_read_paths=request.extra_read_paths, + extra_write_paths=request.extra_write_paths, + timeout=timeout, + max_memory_mb=max_memory, + max_processes=max_procs, + ) + + return CommandResponse( + stdout=result.stdout, + stderr=result.stderr, + returncode=result.returncode, + ) + + +@app.get("/sandbox/test") +async def test_sandbox() -> dict: + """Run security tests to verify sandbox restrictions.""" + tests = {} + + result = run_command_sandboxed("echo 'sandbox test'", timeout=5) + tests["basic_execution"] = { + "passed": result.returncode == 0 and "sandbox test" in result.stdout, + "stdout": result.stdout.strip(), + "returncode": result.returncode, + } + + result = run_command_sandboxed("head -1 /etc/passwd", timeout=5) + tests["read_allowed_path"] = { + "passed": result.returncode == 0, + "description": "Should be able to read /etc/passwd", + "returncode": result.returncode, + } + + result = run_command_sandboxed( + "touch /tmp/sandbox_test && rm /tmp/sandbox_test", timeout=5 + ) + tests["write_tmp"] = { + "passed": result.returncode == 0, + "description": "Should be able to write to /tmp", + "returncode": result.returncode, + } + + result = run_command_sandboxed("touch /etc/sandbox_test", timeout=5) + tests["write_etc_blocked"] = { + "passed": result.returncode != 0, + "description": "Should NOT be able to write to /etc", + "returncode": result.returncode, + } + + if not ALLOW_NETWORK: + result = run_command_sandboxed( + "curl -s --connect-timeout 2 https://example.com", timeout=5 + ) + tests["network_blocked"] = { + "passed": result.returncode != 0, + "description": "Network access should be blocked", + "returncode": result.returncode, + } + + result = run_command_sandboxed("cat /proc/self/environ", timeout=5) + tests["proc_environ_blocked"] = { + "passed": result.returncode != 0 or not result.stdout, + "description": "/proc access should be restricted", + "returncode": result.returncode, + } + + passed = sum(1 for t in tests.values() if t.get("passed", False)) + total = len(tests) + + return { + "summary": f"{passed}/{total} tests passed", + "sandbox_mode": SANDBOX_MODE, + "landlock_available": check_landlock_support(), + "seccomp_available": check_seccomp_support(), + "tests": tests, + } diff --git a/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py b/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py new file mode 100644 index 000000000..b23c468d8 --- /dev/null +++ b/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py @@ -0,0 +1,54 @@ +"""Interactive Sandbox App - A Flyte app for running shell commands with sandboxing. + +This app uses seccomp-bpf + Landlock for unprivileged sandboxing that works in +restricted Kubernetes environments without requiring special capabilities. + +Security Features: + - seccomp-bpf: Blocks dangerous syscalls (setuid, mount, ptrace, etc.) + - Landlock: Restricts filesystem access (requires Linux 5.13+) + - Resource limits: Prevents CPU/memory/process exhaustion + - Clean environment: Only safe environment variables passed to commands + +Environment Variables: + - SANDBOX_MODE: "full" (default) or "none" to disable + - ALLOW_NETWORK: "true" or "false" (default: false) + - COMMAND_TIMEOUT: timeout in seconds (default: 30) + - MAX_MEMORY_MB: max memory per command in MB (default: 512) + - MAX_PROCESSES: max processes per command (default: 50) + +API Endpoints: + - GET /health - Health check + - GET /sandbox/status - Current sandbox configuration + - GET /sandbox/info - System sandbox capabilities + - GET /sandbox/test - Run security tests + - POST /run?command=... - Run a command + - POST /run/advanced - Run with custom settings +""" + +import flyte +from flyte.app import AppEnvironment + +image = ( + flyte.Image.from_debian_base(name="interactive-sandbox-image") + .with_apt_packages("curl", "ca-certificates", "libseccomp2", "libseccomp-dev") + .with_commands(["ldconfig"]) + .with_pip_packages("fastapi", "uvicorn", "pyseccomp", "landlock") +) + +interactive_sandbox_app = AppEnvironment( + name="interactive-sandbox-app", + description="A FastAPI app for running shell commands with sandboxing.", + image=image, + resources=flyte.Resources(cpu=1, memory="512Mi"), + requires_auth=False, + include=["fastapi_app.py", "sandbox.py"], + args=["uvicorn", "fastapi_app:app", "--host", "0.0.0.0", "--port", "8080"], +) + + +if __name__ == "__main__": + import logging + + flyte.init_from_config(log_level=logging.DEBUG) + app_handle = flyte.serve(interactive_sandbox_app) + print(f"App URL: {app_handle.url}") diff --git a/examples/apps/interactive_sandbox_app/sandbox.py b/examples/apps/interactive_sandbox_app/sandbox.py new file mode 100644 index 000000000..5c7b1124e --- /dev/null +++ b/examples/apps/interactive_sandbox_app/sandbox.py @@ -0,0 +1,576 @@ +"""Unprivileged sandboxing using seccomp-bpf and Landlock. + +This module provides sandboxing that works in restricted Kubernetes environments +without requiring special capabilities like SYS_ADMIN or SETFCAP. +""" + +import glob +import os +import resource +import subprocess +import sys +from dataclasses import dataclass, field +from typing import Dict, List, Optional, Set + +LANDLOCK_AVAILABLE = False +try: + import landlock + + LANDLOCK_AVAILABLE = True +except ImportError: + pass + +SECCOMP_AVAILABLE = False +seccomp = None +try: + import seccomp as _seccomp + + seccomp = _seccomp + SECCOMP_AVAILABLE = True +except ImportError: + try: + import pyseccomp as _seccomp + + seccomp = _seccomp + SECCOMP_AVAILABLE = True + except (ImportError, RuntimeError): + try: + import ctypes.util + + _original_find_library = ctypes.util.find_library + + def _patched_find_library(name): + result = _original_find_library(name) + if result is None and name == "seccomp": + for pattern in [ + "/usr/lib/*/libseccomp.so*", + "/lib/*/libseccomp.so*", + "/usr/lib/libseccomp.so*", + "/lib/libseccomp.so*", + ]: + matches = glob.glob(pattern) + if matches: + return matches[0] + return result + + ctypes.util.find_library = _patched_find_library + import pyseccomp as _seccomp + + seccomp = _seccomp + SECCOMP_AVAILABLE = True + except (ImportError, RuntimeError, OSError): + pass + + +SAFE_ENV = { + "PATH": "/usr/local/bin:/usr/bin:/bin", + "HOME": "/tmp", + "TMPDIR": "/tmp", + "LANG": "C.UTF-8", + "LC_ALL": "C.UTF-8", + "TERM": "xterm", +} + + +@dataclass +class ResourceLimits: + max_processes: int = 256 + max_file_size: int = 10 * 1024 * 1024 # 10 MB + max_memory: int = 1024 * 1024 * 1024 # 1 GB + max_cpu_time: int = 30 + max_open_files: int = 1024 + max_data_size: int = 512 * 1024 * 1024 # 512 MB + max_stack_size: int = 8 * 1024 * 1024 # 8 MB + + +@dataclass +class SandboxConfig: + read_paths: List[str] = field( + default_factory=lambda: [ + "/usr", + "/lib", + "/lib64", + "/lib32", + "/bin", + "/sbin", + "/etc/passwd", + "/etc/group", + "/etc/hosts", + "/etc/resolv.conf", + "/etc/ssl", + "/etc/ca-certificates", + "/etc/localtime", + "/etc/ld.so.cache", + "/etc/ld.so.conf", + "/etc/ld.so.conf.d", + ] + ) + write_paths: List[str] = field(default_factory=lambda: ["/tmp"]) + read_write_paths: List[str] = field(default_factory=list) + device_paths: List[str] = field( + default_factory=lambda: [ + "/dev/null", + "/dev/zero", + "/dev/urandom", + "/dev/random", + "/dev/tty", + "/dev/pts", + ] + ) + use_seccomp_allowlist: bool = True + allowed_syscalls: Set[str] = field( + default_factory=lambda: { + "read", "write", "readv", "writev", "pread64", "pwrite64", + "lseek", "close", "fstat", "stat", "lstat", "fstatat", + "newfstatat", "statx", + "open", "openat", "creat", "access", "faccessat", "faccessat2", + "readlink", "readlinkat", "getcwd", "chdir", "fchdir", + "dup", "dup2", "dup3", "fcntl", "flock", + "truncate", "ftruncate", + "getdents", "getdents64", "mkdir", "mkdirat", "rmdir", + "unlink", "unlinkat", "rename", "renameat", "renameat2", + "link", "linkat", "symlink", "symlinkat", + "mmap", "munmap", "mprotect", "mremap", "brk", + "madvise", "msync", + "fork", "vfork", "clone", "clone3", "execve", "execveat", + "wait4", "waitid", "exit", "exit_group", + "getpid", "getppid", "gettid", "getuid", "getgid", + "geteuid", "getegid", "getgroups", + "rt_sigaction", "rt_sigprocmask", "rt_sigreturn", + "sigaltstack", "kill", "tgkill", + "clock_gettime", "clock_getres", "gettimeofday", + "nanosleep", "clock_nanosleep", + "poll", "ppoll", "select", "pselect6", "epoll_create", + "epoll_create1", "epoll_ctl", "epoll_wait", "epoll_pwait", + "epoll_pwait2", "eventfd", "eventfd2", + "pipe", "pipe2", + "getrlimit", "prlimit64", "getrusage", + "uname", "sysinfo", "getrandom", + "futex", "set_robust_list", "get_robust_list", + "set_tid_address", "arch_prctl", "prctl", + "ioctl", + "sched_getaffinity", "sched_yield", + "rseq", + } + ) + blocked_syscalls: Set[str] = field( + default_factory=lambda: { + "setuid", "setgid", "setreuid", "setregid", + "setresuid", "setresgid", "setfsuid", "setfsgid", + "capset", "capget", + "init_module", "finit_module", "delete_module", + "mount", "umount", "umount2", "pivot_root", + "sysfs", "statfs", "fstatfs", + "ptrace", "process_vm_readv", "process_vm_writev", + "iopl", "ioperm", "ioprio_set", + "settimeofday", "clock_settime", "adjtimex", "clock_adjtime", + "reboot", "kexec_load", "kexec_file_load", + "swapon", "swapoff", + "unshare", "setns", + "add_key", "request_key", "keyctl", + "bpf", + "perf_event_open", + "userfaultfd", + "personality", + "acct", + "quotactl", "quotactl_fd", + "nfsservctl", + "lookup_dcookie", + "vhangup", + "modify_ldt", + "vm86", "vm86old", + "seccomp", + } + ) + allow_network: bool = False + network_syscalls: Set[str] = field( + default_factory=lambda: { + "socket", "socketpair", + "connect", "accept", "accept4", + "bind", "listen", + "sendto", "recvfrom", + "sendmsg", "recvmsg", + "sendmmsg", "recvmmsg", + "shutdown", + "getsockname", "getpeername", + "getsockopt", "setsockopt", + } + ) + resource_limits: ResourceLimits = field(default_factory=ResourceLimits) + environment: Optional[Dict[str, str]] = None + + +def check_landlock_support() -> bool: + """Check if Landlock is supported by the kernel (Linux 5.13+).""" + if not LANDLOCK_AVAILABLE: + return False + try: + import platform + + release = platform.release() + parts = release.split(".") + if len(parts) >= 2: + major, minor = int(parts[0]), int(parts[1].split("-")[0]) + return (major, minor) >= (5, 13) + return False + except Exception: + return False + + +def check_seccomp_support() -> bool: + """Check if seccomp is supported.""" + if not SECCOMP_AVAILABLE: + return False + try: + return os.path.exists("/proc/sys/kernel/seccomp") + except Exception: + return False + + +def apply_resource_limits(limits: ResourceLimits) -> Dict[str, bool]: + """Apply resource limits using setrlimit.""" + results = {} + limit_map = [ + ("max_processes", resource.RLIMIT_NPROC, limits.max_processes), + ("max_file_size", resource.RLIMIT_FSIZE, limits.max_file_size), + ("max_memory", resource.RLIMIT_AS, limits.max_memory), + ("max_cpu_time", resource.RLIMIT_CPU, limits.max_cpu_time), + ("max_open_files", resource.RLIMIT_NOFILE, limits.max_open_files), + ("max_data_size", resource.RLIMIT_DATA, limits.max_data_size), + ("max_stack_size", resource.RLIMIT_STACK, limits.max_stack_size), + ] + + for name, rlimit_type, value in limit_map: + try: + soft, hard = resource.getrlimit(rlimit_type) + if hard == resource.RLIM_INFINITY or value <= hard: + new_soft = min(value, hard) if hard != resource.RLIM_INFINITY else value + resource.setrlimit(rlimit_type, (new_soft, hard)) + results[name] = True + else: + results[name] = False + except (ValueError, resource.error, OSError): + results[name] = False + + return results + + +def apply_landlock_restrictions(config: SandboxConfig) -> bool: + """Apply Landlock filesystem restrictions.""" + if not LANDLOCK_AVAILABLE: + print("Landlock module not installed (pip install landlock)", file=sys.stderr) + return False + + try: + import landlock as ll + + ruleset = ll.Ruleset() + skip_paths = {"/dev/stdin", "/dev/stdout", "/dev/stderr", "/dev/fd"} + + dir_paths: Set[str] = set() + file_paths: Set[str] = set() + + def collect_paths(paths: List[str]) -> None: + for path in paths: + if path in skip_paths or not os.path.exists(path): + continue + try: + real_path = os.path.realpath(path) + if real_path.startswith("/proc"): + continue + if os.path.isdir(real_path): + dir_paths.add(real_path) + else: + file_paths.add(real_path) + except OSError: + continue + + collect_paths(config.read_paths) + collect_paths(config.write_paths) + collect_paths(config.read_write_paths) + collect_paths(config.device_paths) + + paths_added = 0 + for path in sorted(dir_paths) + sorted(file_paths): + try: + ruleset.allow(path) + paths_added += 1 + except Exception: + pass + + if paths_added == 0: + print( + "Warning: No paths could be added to Landlock ruleset", file=sys.stderr + ) + return False + + try: + ruleset.apply() + except Exception as apply_error: + if ") = 0" in str(apply_error): + return True + raise + + return True + + except Exception as e: + if ") = 0" in str(e): + return True + print(f"Failed to apply Landlock restrictions: {e}", file=sys.stderr) + return False + + +def apply_seccomp_restrictions(config: SandboxConfig) -> bool: + """Apply seccomp syscall filtering.""" + if not SECCOMP_AVAILABLE or seccomp is None: + print("seccomp module not installed (pip install pyseccomp)", file=sys.stderr) + return False + + try: + if config.use_seccomp_allowlist: + f = seccomp.SyscallFilter(seccomp.ERRNO(1)) + for syscall_name in config.allowed_syscalls: + try: + f.add_rule(seccomp.ALLOW, syscall_name) + except Exception: + pass + if config.allow_network: + for syscall_name in config.network_syscalls: + try: + f.add_rule(seccomp.ALLOW, syscall_name) + except Exception: + pass + else: + f = seccomp.SyscallFilter(seccomp.ALLOW) + for syscall_name in config.blocked_syscalls: + try: + f.add_rule(seccomp.ERRNO(1), syscall_name) + except Exception: + pass + if not config.allow_network: + for syscall_name in config.network_syscalls: + try: + f.add_rule(seccomp.ERRNO(1), syscall_name) + except Exception: + pass + + f.load() + return True + + except Exception as e: + print(f"Failed to apply seccomp restrictions: {e}", file=sys.stderr) + return False + + +def create_sandbox(config: Optional[SandboxConfig] = None) -> dict: + """Create a sandbox with the given configuration.""" + if config is None: + config = SandboxConfig() + + results = { + "landlock_supported": check_landlock_support(), + "landlock_applied": False, + "seccomp_supported": check_seccomp_support(), + "seccomp_applied": False, + "resource_limits": {}, + } + + results["resource_limits"] = apply_resource_limits(config.resource_limits) + + if results["landlock_supported"]: + results["landlock_applied"] = apply_landlock_restrictions(config) + + if results["seccomp_supported"]: + results["seccomp_applied"] = apply_seccomp_restrictions(config) + + return results + + +def run_sandboxed( + command: List[str], + config: Optional[SandboxConfig] = None, + timeout: Optional[float] = 30.0, + cwd: Optional[str] = None, +) -> subprocess.CompletedProcess: + """Run a command in a sandboxed subprocess.""" + if config is None: + config = SandboxConfig() + + env = config.environment if config.environment is not None else SAFE_ENV.copy() + + sandbox_script = f''' +import sys +import os +import resource + +limits = [ + (resource.RLIMIT_FSIZE, {config.resource_limits.max_file_size}), + (resource.RLIMIT_CPU, {config.resource_limits.max_cpu_time}), + (resource.RLIMIT_NOFILE, {config.resource_limits.max_open_files}), + (resource.RLIMIT_DATA, {config.resource_limits.max_data_size}), + (resource.RLIMIT_STACK, {config.resource_limits.max_stack_size}), +] +for rlimit_type, value in limits: + try: + soft, hard = resource.getrlimit(rlimit_type) + new_value = min(value, hard) if hard != resource.RLIM_INFINITY else value + resource.setrlimit(rlimit_type, (new_value, hard)) + except: + pass + +sys.path.insert(0, {repr(os.path.dirname(os.path.abspath(__file__)))}) + +try: + from sandbox import apply_landlock_restrictions, apply_seccomp_restrictions, SandboxConfig + + config = SandboxConfig( + read_paths={repr(config.read_paths)}, + write_paths={repr(config.write_paths)}, + read_write_paths={repr(config.read_write_paths)}, + device_paths={repr(config.device_paths)}, + allow_network={repr(config.allow_network)}, + use_seccomp_allowlist=False, + ) + + try: + apply_landlock_restrictions(config) + except Exception as e: + print(f"Landlock: {{e}}", file=sys.stderr) + + try: + apply_seccomp_restrictions(config) + except Exception as e: + print(f"Seccomp: {{e}}", file=sys.stderr) + +except ImportError as e: + print(f"Sandbox import error: {{e}}", file=sys.stderr) + +import subprocess +cmd = {repr(command)} +try: + proc = subprocess.run(cmd, capture_output=True, text=True, cwd={repr(cwd)}) + print(proc.stdout, end="") + print(proc.stderr, end="", file=sys.stderr) + sys.exit(proc.returncode) +except Exception as e: + print(f"Execution error: {{e}}", file=sys.stderr) + sys.exit(1) +''' + + try: + return subprocess.run( + [sys.executable, "-c", sandbox_script], + capture_output=True, + text=True, + timeout=timeout, + cwd=cwd, + env=env, + ) + except subprocess.TimeoutExpired as e: + return subprocess.CompletedProcess( + args=command, + returncode=-1, + stdout=e.stdout or "", + stderr=f"Command timed out after {timeout} seconds\n" + (e.stderr or ""), + ) + except Exception as e: + return subprocess.CompletedProcess( + args=command, + returncode=-1, + stdout="", + stderr=f"Failed to run sandboxed command: {e}\n", + ) + + +def run_command_sandboxed( + command: str, + allow_network: bool = False, + extra_read_paths: Optional[List[str]] = None, + extra_write_paths: Optional[List[str]] = None, + timeout: float = 30.0, + cwd: Optional[str] = None, + max_memory_mb: int = 512, + max_processes: int = 50, +) -> subprocess.CompletedProcess: + """Run a shell command in a sandbox.""" + import shlex + + resource_limits = ResourceLimits( + max_memory=max_memory_mb * 1024 * 1024, + max_processes=max_processes, + max_cpu_time=int(timeout) + 5, + ) + + config = SandboxConfig( + allow_network=allow_network, + resource_limits=resource_limits, + ) + + if extra_read_paths: + config.read_paths.extend(extra_read_paths) + if extra_write_paths: + config.write_paths.extend(extra_write_paths) + + try: + config.read_paths.append(os.getcwd()) + except OSError: + pass + + return run_sandboxed( + command=shlex.split(command), + config=config, + timeout=timeout, + cwd=cwd, + ) + + +def get_sandbox_info() -> dict: + """Get information about sandbox capabilities on this system.""" + import platform + + return { + "platform": platform.system(), + "kernel_release": platform.release(), + "landlock": { + "module_available": LANDLOCK_AVAILABLE, + "kernel_support": check_landlock_support(), + }, + "seccomp": { + "module_available": SECCOMP_AVAILABLE, + "kernel_support": check_seccomp_support(), + }, + "resource_limits_available": True, + } + + +if __name__ == "__main__": + import json + + print("=== Sandbox System Information ===") + print(json.dumps(get_sandbox_info(), indent=2)) + + print("\n=== Test: Run 'echo hello' ===") + result = run_command_sandboxed("echo hello") + print(f"stdout: {result.stdout}") + print(f"stderr: {result.stderr}") + print(f"returncode: {result.returncode}") + + print("\n=== Test: Try to read /etc/passwd (should work) ===") + result = run_command_sandboxed("head -1 /etc/passwd") + print(f"stdout: {result.stdout}") + print(f"returncode: {result.returncode}") + + print("\n=== Test: Try to write to /etc (should fail) ===") + result = run_command_sandboxed("touch /etc/test_file") + print(f"stderr: {result.stderr}") + print(f"returncode: {result.returncode}") + + print("\n=== Test: Try to access /proc/self/environ ===") + result = run_command_sandboxed("cat /proc/self/environ") + print(f"stderr: {result.stderr[:200] if result.stderr else 'none'}...") + print(f"returncode: {result.returncode}") + + print("\n=== Test: Network access (should fail) ===") + result = run_command_sandboxed("curl -s https://example.com", timeout=5) + print(f"stderr: {result.stderr[:200] if result.stderr else 'none'}...") + print(f"returncode: {result.returncode}") From 72b3eedd4a4ab10c300863a3f80fcdb31d01317b Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Fri, 20 Feb 2026 14:36:15 -0500 Subject: [PATCH 2/3] updates Signed-off-by: Niels Bantilan --- .../interactive_sandbox_app/fastapi_app.py | 267 +++++++--- .../interactive_sandbox_app.py | 33 +- .../apps/interactive_sandbox_app/sandbox.py | 496 ++++++++++++++---- 3 files changed, 617 insertions(+), 179 deletions(-) diff --git a/examples/apps/interactive_sandbox_app/fastapi_app.py b/examples/apps/interactive_sandbox_app/fastapi_app.py index fe9341ede..31de34d13 100644 --- a/examples/apps/interactive_sandbox_app/fastapi_app.py +++ b/examples/apps/interactive_sandbox_app/fastapi_app.py @@ -2,6 +2,7 @@ import os import shlex +import json import subprocess from typing import List, Optional @@ -9,13 +10,12 @@ from pydantic import BaseModel, Field from sandbox import ( - LANDLOCK_AVAILABLE, SAFE_ENV, - SECCOMP_AVAILABLE, check_landlock_support, check_seccomp_support, get_sandbox_info, run_command_sandboxed, + run_python_sandboxed, ) @@ -25,11 +25,18 @@ version="1.0.0", ) -SANDBOX_MODE = os.environ.get("SANDBOX_MODE", "full").lower() DEFAULT_TIMEOUT = float(os.environ.get("COMMAND_TIMEOUT", "30")) ALLOW_NETWORK = os.environ.get("ALLOW_NETWORK", "false").lower() == "true" MAX_MEMORY_MB = int(os.environ.get("MAX_MEMORY_MB", "512")) MAX_PROCESSES = int(os.environ.get("MAX_PROCESSES", "50")) +EXTRA_READ_PATHS: Optional[List[str]] = None +EXTRA_WRITE_PATHS: Optional[List[str]] = None +_extra_read = os.environ.get("EXTRA_READ_PATHS", "") +_extra_write = os.environ.get("EXTRA_WRITE_PATHS", "") +if _extra_read: + EXTRA_READ_PATHS = json.loads(_extra_read) +if _extra_write: + EXTRA_WRITE_PATHS = json.loads(_extra_write) class CommandResponse(BaseModel): @@ -38,11 +45,21 @@ class CommandResponse(BaseModel): returncode: int +class PythonRequest(BaseModel): + code: str = Field(..., description="Python code to execute") + timeout: Optional[float] = Field(None, description="Execution timeout in seconds") + + +class PythonResponse(BaseModel): + result: Optional[str] = Field(None, description="String representation of the result") + stdout: str = Field(default="", description="Captured stdout") + stderr: str = Field(default="", description="Captured stderr") + returncode: int = Field(description="0 for success, non-zero for errors") + error: Optional[str] = Field(None, description="Error message if execution failed") + + class SandboxStatus(BaseModel): - sandbox_mode: str - landlock_module_available: bool landlock_kernel_support: bool - seccomp_module_available: bool seccomp_kernel_support: bool allow_network: bool default_timeout: float @@ -61,10 +78,6 @@ class SystemInfo(BaseModel): resource_limits_available: bool -class CommandRequest(BaseModel): - command: str = Field(..., description="Shell command to execute") - - @app.get("/health") async def health() -> dict: return {"status": "healthy"} @@ -73,10 +86,7 @@ async def health() -> dict: @app.get("/sandbox/status", response_model=SandboxStatus) async def sandbox_status() -> SandboxStatus: return SandboxStatus( - sandbox_mode=SANDBOX_MODE, - landlock_module_available=LANDLOCK_AVAILABLE, landlock_kernel_support=check_landlock_support(), - seccomp_module_available=SECCOMP_AVAILABLE, seccomp_kernel_support=check_seccomp_support(), allow_network=ALLOW_NETWORK, default_timeout=DEFAULT_TIMEOUT, @@ -100,23 +110,13 @@ async def run_command(command: str) -> CommandResponse: if not command.strip(): raise HTTPException(status_code=400, detail="Command cannot be empty") - if SANDBOX_MODE == "none": - result = subprocess.run( - shlex.split(command), - capture_output=True, - text=True, - shell=False, - timeout=DEFAULT_TIMEOUT, - env=SAFE_ENV, - ) - else: - result = run_command_sandboxed( - command=command, - allow_network=ALLOW_NETWORK, - timeout=DEFAULT_TIMEOUT, - max_memory_mb=MAX_MEMORY_MB, - max_processes=MAX_PROCESSES, - ) + result = run_command_sandboxed( + command=command, + allow_network=ALLOW_NETWORK, + timeout=DEFAULT_TIMEOUT, + max_memory_mb=MAX_MEMORY_MB, + max_processes=MAX_PROCESSES, + ) return CommandResponse( stdout=result.stdout, @@ -125,51 +125,170 @@ async def run_command(command: str) -> CommandResponse: ) -@app.post("/run/advanced", response_model=CommandResponse) -async def run_command_advanced(request: CommandRequest) -> CommandResponse: - """Run a command with custom sandbox settings.""" - if not request.command.strip(): - raise HTTPException(status_code=400, detail="Command cannot be empty") +@app.post("/python", response_model=PythonResponse) +async def run_python(request: PythonRequest) -> PythonResponse: + """Run Python code in a sandboxed subprocess. + + The code can be either: + - An expression (e.g., "2 + 2", "[x**2 for x in range(10)]") + - A statement or multiple statements (e.g., "x = 5\\nprint(x)") + + For expressions, the result is returned in the 'result' field. + For statements, any output goes to stdout/stderr. + """ + if not request.code.strip(): + raise HTTPException(status_code=400, detail="Code cannot be empty") + + timeout = request.timeout if request.timeout is not None else DEFAULT_TIMEOUT - timeout = request.timeout or DEFAULT_TIMEOUT - allow_network = ( - request.allow_network if request.allow_network is not None else ALLOW_NETWORK + result = run_python_sandboxed( + code=request.code, + allow_network=ALLOW_NETWORK, + timeout=timeout, + max_memory_mb=MAX_MEMORY_MB, + max_processes=MAX_PROCESSES, ) - max_memory = request.max_memory_mb or MAX_MEMORY_MB - max_procs = request.max_processes or MAX_PROCESSES - - if SANDBOX_MODE == "none": - try: - result = subprocess.run( - shlex.split(request.command), - capture_output=True, - text=True, - shell=False, - timeout=timeout, - env=SAFE_ENV, - ) - except subprocess.TimeoutExpired: - return CommandResponse( - stdout="", - stderr=f"Command timed out after {timeout} seconds", - returncode=-1, - ) - else: - result = run_command_sandboxed( - command=request.command, - allow_network=allow_network, - extra_read_paths=request.extra_read_paths, - extra_write_paths=request.extra_write_paths, - timeout=timeout, - max_memory_mb=max_memory, - max_processes=max_procs, - ) - return CommandResponse( - stdout=result.stdout, - stderr=result.stderr, - returncode=result.returncode, + return PythonResponse( + result=result.get("result"), + stdout=result.get("stdout", ""), + stderr=result.get("stderr", ""), + returncode=result.get("returncode", 1), + error=result.get("error"), + ) + + +@app.get("/sandbox/python/test") +async def test_python_sandbox() -> dict: + """Run security tests to verify Python sandbox restrictions.""" + tests = {} + + # Test 1: Basic expression evaluation + result = run_python_sandboxed("2 + 2", timeout=5) + tests["basic_expression"] = { + "passed": result.get("returncode") == 0 and result.get("result") == "4", + "description": "Basic arithmetic expression", + "result": result.get("result"), + "returncode": result.get("returncode"), + } + + # Test 2: List comprehension + result = run_python_sandboxed("[x**2 for x in range(5)]", timeout=5) + tests["list_comprehension"] = { + "passed": result.get("returncode") == 0 and result.get("result") == "[0, 1, 4, 9, 16]", + "description": "List comprehension", + "result": result.get("result"), + "returncode": result.get("returncode"), + } + + # Test 3: Print statement (stdout capture) + result = run_python_sandboxed("print('hello sandbox')", timeout=5) + tests["print_statement"] = { + "passed": result.get("returncode") == 0 and "hello sandbox" in result.get("stdout", ""), + "description": "Print statement with stdout capture", + "stdout": result.get("stdout", "").strip(), + "returncode": result.get("returncode"), + } + + # Test 4: Multi-line code + result = run_python_sandboxed("x = 10\ny = 20\nprint(x + y)", timeout=5) + tests["multiline_code"] = { + "passed": result.get("returncode") == 0 and "30" in result.get("stdout", ""), + "description": "Multi-line code execution", + "stdout": result.get("stdout", "").strip(), + "returncode": result.get("returncode"), + } + + # Test 5: Import standard library (should work) + result = run_python_sandboxed("import math; print(math.sqrt(16))", timeout=5) + tests["import_stdlib"] = { + "passed": result.get("returncode") == 0 and "4.0" in result.get("stdout", ""), + "description": "Import standard library module", + "stdout": result.get("stdout", "").strip(), + "returncode": result.get("returncode"), + } + + # Test 6: Try to read /etc/passwd (should work - read allowed) + result = run_python_sandboxed("open('/etc/passwd').readline()", timeout=5) + tests["read_etc_passwd"] = { + "passed": result.get("returncode") == 0 and result.get("result") is not None, + "description": "Reading /etc/passwd should be allowed", + "result": result.get("result", "")[:50] + "..." if result.get("result") else None, + "returncode": result.get("returncode"), + } + + # Test 7: Try to write to /etc (should fail - write blocked) + result = run_python_sandboxed("open('/etc/sandbox_test', 'w').write('test')", timeout=5) + tests["write_etc_blocked"] = { + "passed": result.get("returncode") != 0 or result.get("error") is not None, + "description": "Writing to /etc should be blocked", + "error": result.get("error"), + "returncode": result.get("returncode"), + } + + # Test 8: Try to access /proc/self/environ (should fail - blocked path) + result = run_python_sandboxed("open('/proc/self/environ').read()", timeout=5) + tests["proc_environ_blocked"] = { + "passed": result.get("returncode") != 0 or result.get("error") is not None, + "description": "/proc access should be blocked", + "error": result.get("error"), + "returncode": result.get("returncode"), + } + + # Test 9: Try to use subprocess (should fail if seccomp blocks fork/exec) + result = run_python_sandboxed( + "import subprocess; subprocess.run(['echo', 'test'], capture_output=True).stdout", + timeout=5 ) + tests["subprocess_restricted"] = { + "passed": True, # This may or may not work depending on seccomp config + "description": "Subprocess execution (may be restricted)", + "result": result.get("result"), + "error": result.get("error"), + "returncode": result.get("returncode"), + } + + # Test 10: Try network access (should fail if network disabled) + if not ALLOW_NETWORK: + result = run_python_sandboxed( + "import socket; s = socket.socket(); s.connect(('example.com', 80))", + timeout=5 + ) + tests["network_blocked"] = { + "passed": result.get("returncode") != 0 or result.get("error") is not None, + "description": "Network access should be blocked", + "error": result.get("error"), + "returncode": result.get("returncode"), + } + + # Test 11: Exception handling + result = run_python_sandboxed("1/0", timeout=5) + tests["exception_handling"] = { + "passed": result.get("returncode") != 0 and "ZeroDivisionError" in (result.get("error") or ""), + "description": "Exception should be caught and reported", + "error": result.get("error"), + "returncode": result.get("returncode"), + } + + # Test 12: Syntax error handling + result = run_python_sandboxed("def foo(", timeout=5) + tests["syntax_error"] = { + "passed": result.get("returncode") != 0 and "SyntaxError" in (result.get("error") or ""), + "description": "Syntax errors should be caught and reported", + "error": result.get("error"), + "returncode": result.get("returncode"), + } + + passed = sum(1 for t in tests.values() if t.get("passed", False)) + total = len(tests) + + return { + "summary": f"{passed}/{total} tests passed", + "landlock_available": check_landlock_support(), + "seccomp_available": check_seccomp_support(), + "network_allowed": ALLOW_NETWORK, + "tests": tests, + } @app.get("/sandbox/test") @@ -219,9 +338,10 @@ async def test_sandbox() -> dict: result = run_command_sandboxed("cat /proc/self/environ", timeout=5) tests["proc_environ_blocked"] = { - "passed": result.returncode != 0 or not result.stdout, - "description": "/proc access should be restricted", + "passed": result.returncode != 0, + "description": "/proc access should be blocked", "returncode": result.returncode, + "stderr": result.stderr.strip() if result.stderr else "", } passed = sum(1 for t in tests.values() if t.get("passed", False)) @@ -229,7 +349,6 @@ async def test_sandbox() -> dict: return { "summary": f"{passed}/{total} tests passed", - "sandbox_mode": SANDBOX_MODE, "landlock_available": check_landlock_support(), "seccomp_available": check_seccomp_support(), "tests": tests, diff --git a/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py b/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py index b23c468d8..474058fa6 100644 --- a/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py +++ b/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py @@ -10,7 +10,6 @@ - Clean environment: Only safe environment variables passed to commands Environment Variables: - - SANDBOX_MODE: "full" (default) or "none" to disable - ALLOW_NETWORK: "true" or "false" (default: false) - COMMAND_TIMEOUT: timeout in seconds (default: 30) - MAX_MEMORY_MB: max memory per command in MB (default: 512) @@ -22,27 +21,33 @@ - GET /sandbox/info - System sandbox capabilities - GET /sandbox/test - Run security tests - POST /run?command=... - Run a command - - POST /run/advanced - Run with custom settings """ import flyte -from flyte.app import AppEnvironment +from flyte.app import AppEnvironment, Parameter image = ( flyte.Image.from_debian_base(name="interactive-sandbox-image") .with_apt_packages("curl", "ca-certificates", "libseccomp2", "libseccomp-dev") - .with_commands(["ldconfig"]) - .with_pip_packages("fastapi", "uvicorn", "pyseccomp", "landlock") + .with_pip_packages("fastapi>=0.128.3", "uvicorn>=0.34.0", "pyseccomp", "landlock") ) interactive_sandbox_app = AppEnvironment( - name="interactive-sandbox-app", + name="interactive-sandbox-app-0", description="A FastAPI app for running shell commands with sandboxing.", image=image, resources=flyte.Resources(cpu=1, memory="512Mi"), requires_auth=False, include=["fastapi_app.py", "sandbox.py"], args=["uvicorn", "fastapi_app:app", "--host", "0.0.0.0", "--port", "8080"], + parameters=[ + Parameter(name="extra_read_paths", value="[]", env_var="EXTRA_READ_PATHS"), + Parameter(name="extra_write_paths", value="[]", env_var="EXTRA_WRITE_PATHS"), + Parameter(name="max_memory_mb", value="512", env_var="MAX_MEMORY_MB"), + Parameter(name="max_processes", value="50", env_var="MAX_PROCESSES"), + Parameter(name="timeout", value="30", env_var="COMMAND_TIMEOUT"), + Parameter(name="allow_network", value="false", env_var="ALLOW_NETWORK"), + ] ) @@ -52,3 +57,19 @@ flyte.init_from_config(log_level=logging.DEBUG) app_handle = flyte.serve(interactive_sandbox_app) print(f"App URL: {app_handle.url}") + + # Allowed to run: ✅ + # curl -X POST "https://broken-sunset-fcb75.apps.demo.hosted.unionai.cloud/run?command=echo+%27hello%27" + + # Not allowed to run: ❌ + # Network access: + # curl -X POST "https://broken-sunset-fcb75.apps.demo.hosted.unionai.cloud/run?command=curl+-s+https://example.com" + # + # Write to protected path (/etc): + # curl -X POST "https://broken-sunset-fcb75.apps.demo.hosted.unionai.cloud/run?command=touch+/etc/test_file" + # + # Read /proc/self/environ (restricted): + # curl -X POST "https://broken-sunset-fcb75.apps.demo.hosted.unionai.cloud/run?command=cat+/proc/self/environ" + # + # Attempt to mount (blocked syscall): + # curl -X POST "https://broken-sunset-fcb75.apps.demo.hosted.unionai.cloud/run?command=mount+-t+tmpfs+tmpfs+/mnt" diff --git a/examples/apps/interactive_sandbox_app/sandbox.py b/examples/apps/interactive_sandbox_app/sandbox.py index 5c7b1124e..4b5f84511 100644 --- a/examples/apps/interactive_sandbox_app/sandbox.py +++ b/examples/apps/interactive_sandbox_app/sandbox.py @@ -12,54 +12,32 @@ from dataclasses import dataclass, field from typing import Dict, List, Optional, Set -LANDLOCK_AVAILABLE = False -try: - import landlock - - LANDLOCK_AVAILABLE = True -except ImportError: - pass +import landlock -SECCOMP_AVAILABLE = False -seccomp = None try: - import seccomp as _seccomp - - seccomp = _seccomp - SECCOMP_AVAILABLE = True -except ImportError: - try: - import pyseccomp as _seccomp - - seccomp = _seccomp - SECCOMP_AVAILABLE = True - except (ImportError, RuntimeError): - try: - import ctypes.util - - _original_find_library = ctypes.util.find_library - - def _patched_find_library(name): - result = _original_find_library(name) - if result is None and name == "seccomp": - for pattern in [ - "/usr/lib/*/libseccomp.so*", - "/lib/*/libseccomp.so*", - "/usr/lib/libseccomp.so*", - "/lib/libseccomp.so*", - ]: - matches = glob.glob(pattern) - if matches: - return matches[0] - return result - - ctypes.util.find_library = _patched_find_library - import pyseccomp as _seccomp - - seccomp = _seccomp - SECCOMP_AVAILABLE = True - except (ImportError, RuntimeError, OSError): - pass + import ctypes.util + + _original_find_library = ctypes.util.find_library + + def _patched_find_library(name): + result = _original_find_library(name) + if result is None and name == "seccomp": + for pattern in [ + "/usr/lib/*/libseccomp.so*", + "/lib/*/libseccomp.so*", + "/usr/lib/libseccomp.so*", + "/lib/libseccomp.so*", + ]: + matches = glob.glob(pattern) + if matches: + return matches[0] + return result + + ctypes.util.find_library = _patched_find_library + import pyseccomp as seccomp + +except (ImportError, RuntimeError, OSError): + pass SAFE_ENV = { @@ -74,7 +52,7 @@ def _patched_find_library(name): @dataclass class ResourceLimits: - max_processes: int = 256 + max_processes: int = 64 max_file_size: int = 10 * 1024 * 1024 # 10 MB max_memory: int = 1024 * 1024 * 1024 # 1 GB max_cpu_time: int = 30 @@ -86,26 +64,9 @@ class ResourceLimits: @dataclass class SandboxConfig: read_paths: List[str] = field( - default_factory=lambda: [ - "/usr", - "/lib", - "/lib64", - "/lib32", - "/bin", - "/sbin", - "/etc/passwd", - "/etc/group", - "/etc/hosts", - "/etc/resolv.conf", - "/etc/ssl", - "/etc/ca-certificates", - "/etc/localtime", - "/etc/ld.so.cache", - "/etc/ld.so.conf", - "/etc/ld.so.conf.d", - ] + default_factory=lambda: ["/tmp", "/var/inputs"] ) - write_paths: List[str] = field(default_factory=lambda: ["/tmp"]) + write_paths: List[str] = field(default_factory=lambda: ["/tmp", "/var/outputs"]) read_write_paths: List[str] = field(default_factory=list) device_paths: List[str] = field( default_factory=lambda: [ @@ -113,8 +74,14 @@ class SandboxConfig: "/dev/zero", "/dev/urandom", "/dev/random", - "/dev/tty", - "/dev/pts", + ] + ) + blocked_path_patterns: List[str] = field( + default_factory=lambda: [ + "/proc/", + "/sys/", + "/run/secrets", + "/var/run/secrets", ] ) use_seccomp_allowlist: bool = True @@ -180,6 +147,8 @@ class SandboxConfig: "modify_ldt", "vm86", "vm86old", "seccomp", + "memfd_create", "memfd_secret", + "io_uring_setup", "io_uring_enter", "io_uring_register", } ) allow_network: bool = False @@ -200,10 +169,24 @@ class SandboxConfig: environment: Optional[Dict[str, str]] = None +def check_blocked_paths(command: List[str], blocked_patterns: List[str]) -> Optional[str]: + """Check if a command references any blocked path patterns. + + Returns the blocked pattern if found, None otherwise. + """ + import shlex + + command_str = " ".join(command) + + for pattern in blocked_patterns: + if pattern in command_str: + return pattern + + return None + + def check_landlock_support() -> bool: """Check if Landlock is supported by the kernel (Linux 5.13+).""" - if not LANDLOCK_AVAILABLE: - return False try: import platform @@ -219,8 +202,6 @@ def check_landlock_support() -> bool: def check_seccomp_support() -> bool: """Check if seccomp is supported.""" - if not SECCOMP_AVAILABLE: - return False try: return os.path.exists("/proc/sys/kernel/seccomp") except Exception: @@ -257,9 +238,6 @@ def apply_resource_limits(limits: ResourceLimits) -> Dict[str, bool]: def apply_landlock_restrictions(config: SandboxConfig) -> bool: """Apply Landlock filesystem restrictions.""" - if not LANDLOCK_AVAILABLE: - print("Landlock module not installed (pip install landlock)", file=sys.stderr) - return False try: import landlock as ll @@ -267,10 +245,16 @@ def apply_landlock_restrictions(config: SandboxConfig) -> bool: ruleset = ll.Ruleset() skip_paths = {"/dev/stdin", "/dev/stdout", "/dev/stderr", "/dev/fd"} - dir_paths: Set[str] = set() - file_paths: Set[str] = set() + read_dir_paths: Set[str] = set() + read_file_paths: Set[str] = set() + write_dir_paths: Set[str] = set() + write_file_paths: Set[str] = set() + rw_dir_paths: Set[str] = set() + rw_file_paths: Set[str] = set() - def collect_paths(paths: List[str]) -> None: + def collect_paths( + paths: List[str], dir_set: Set[str], file_set: Set[str] + ) -> None: for path in paths: if path in skip_paths or not os.path.exists(path): continue @@ -279,21 +263,72 @@ def collect_paths(paths: List[str]) -> None: if real_path.startswith("/proc"): continue if os.path.isdir(real_path): - dir_paths.add(real_path) + dir_set.add(real_path) else: - file_paths.add(real_path) + file_set.add(real_path) except OSError: continue - collect_paths(config.read_paths) - collect_paths(config.write_paths) - collect_paths(config.read_write_paths) - collect_paths(config.device_paths) + collect_paths(config.read_paths, read_dir_paths, read_file_paths) + collect_paths(config.write_paths, write_dir_paths, write_file_paths) + collect_paths(config.read_write_paths, rw_dir_paths, rw_file_paths) + collect_paths(config.device_paths, read_dir_paths, read_file_paths) paths_added = 0 - for path in sorted(dir_paths) + sorted(file_paths): + + read_file_access = ll.FSAccess.ReadFile + read_dir_access = ll.FSAccess.ReadFile | ll.FSAccess.ReadDir + write_file_access = ( + ll.FSAccess.WriteFile | ll.FSAccess.Truncate + ) + write_dir_access = ( + ll.FSAccess.WriteFile + | ll.FSAccess.Truncate + | ll.FSAccess.RemoveFile + | ll.FSAccess.RemoveDir + | ll.FSAccess.MakeDir + ) + rw_file_access = read_file_access | write_file_access + rw_dir_access = read_dir_access | write_dir_access + + for path in sorted(read_dir_paths): + try: + ruleset.allow(path, read_dir_access) + paths_added += 1 + except Exception: + pass + + for path in sorted(read_file_paths): + try: + ruleset.allow(path, read_file_access) + paths_added += 1 + except Exception: + pass + + for path in sorted(write_dir_paths): + try: + ruleset.allow(path, write_dir_access) + paths_added += 1 + except Exception: + pass + + for path in sorted(write_file_paths): try: - ruleset.allow(path) + ruleset.allow(path, write_file_access) + paths_added += 1 + except Exception: + pass + + for path in sorted(rw_dir_paths): + try: + ruleset.allow(path, rw_dir_access) + paths_added += 1 + except Exception: + pass + + for path in sorted(rw_file_paths): + try: + ruleset.allow(path, rw_file_access) paths_added += 1 except Exception: pass @@ -322,10 +357,6 @@ def collect_paths(paths: List[str]) -> None: def apply_seccomp_restrictions(config: SandboxConfig) -> bool: """Apply seccomp syscall filtering.""" - if not SECCOMP_AVAILABLE or seccomp is None: - print("seccomp module not installed (pip install pyseccomp)", file=sys.stderr) - return False - try: if config.use_seccomp_allowlist: f = seccomp.SyscallFilter(seccomp.ERRNO(1)) @@ -396,6 +427,15 @@ def run_sandboxed( if config is None: config = SandboxConfig() + blocked = check_blocked_paths(command, config.blocked_path_patterns) + if blocked: + return subprocess.CompletedProcess( + args=command, + returncode=1, + stdout="", + stderr=f"Access denied: path pattern '{blocked}' is blocked by sandbox policy\n", + ) + env = config.environment if config.environment is not None else SAFE_ENV.copy() sandbox_script = f''' @@ -444,6 +484,7 @@ def run_sandboxed( except ImportError as e: print(f"Sandbox import error: {{e}}", file=sys.stderr) + sys.exit(1) import subprocess cmd = {repr(command)} @@ -491,9 +532,26 @@ def run_command_sandboxed( cwd: Optional[str] = None, max_memory_mb: int = 512, max_processes: int = 50, + use_isolated_tmp: bool = True, ) -> subprocess.CompletedProcess: - """Run a shell command in a sandbox.""" + """Run a shell command in a sandbox. + + Args: + command: Shell command to execute. + allow_network: Whether to allow network syscalls. + extra_read_paths: Additional paths to allow reading. + extra_write_paths: Additional paths to allow writing. + timeout: Maximum execution time in seconds. + cwd: Working directory for the command. + max_memory_mb: Maximum memory in megabytes. + max_processes: Maximum number of processes. + use_isolated_tmp: If True, create an isolated temp directory instead of + using shared /tmp. This prevents symlink attacks and interference + between sandboxed processes. + """ import shlex + import tempfile + import uuid resource_limits = ResourceLimits( max_memory=max_memory_mb * 1024 * 1024, @@ -506,6 +564,16 @@ def run_command_sandboxed( resource_limits=resource_limits, ) + isolated_tmp_dir = None + if use_isolated_tmp: + isolated_tmp_dir = os.path.join(tempfile.gettempdir(), f"sandbox-{uuid.uuid4()}") + os.makedirs(isolated_tmp_dir, mode=0o700, exist_ok=True) + config.write_paths = [isolated_tmp_dir] + if config.environment is None: + config.environment = SAFE_ENV.copy() + config.environment["TMPDIR"] = isolated_tmp_dir + config.environment["HOME"] = isolated_tmp_dir + if extra_read_paths: config.read_paths.extend(extra_read_paths) if extra_write_paths: @@ -516,13 +584,244 @@ def run_command_sandboxed( except OSError: pass - return run_sandboxed( - command=shlex.split(command), - config=config, - timeout=timeout, - cwd=cwd, + try: + return run_sandboxed( + command=shlex.split(command), + config=config, + timeout=timeout, + cwd=cwd, + ) + finally: + if isolated_tmp_dir and os.path.exists(isolated_tmp_dir): + import shutil + try: + shutil.rmtree(isolated_tmp_dir) + except OSError: + pass + + +def check_blocked_paths_in_code(code: str, blocked_patterns: List[str]) -> Optional[str]: + """Check if Python code references any blocked path patterns. + + Returns the blocked pattern if found, None otherwise. + """ + for pattern in blocked_patterns: + if pattern in code: + return pattern + return None + + +def run_python_sandboxed( + code: str, + allow_network: bool = False, + extra_read_paths: Optional[List[str]] = None, + extra_write_paths: Optional[List[str]] = None, + timeout: float = 30.0, + max_memory_mb: int = 512, + max_processes: int = 50, +) -> Dict: + """Run Python code in a sandbox. + + The code can be either an expression or statements. For expressions, + the result is captured and returned. For statements, stdout/stderr + are captured. + + Args: + code: Python code to execute. + allow_network: Whether to allow network syscalls. + extra_read_paths: Additional paths to allow reading. + extra_write_paths: Additional paths to allow writing. + timeout: Maximum execution time in seconds. + max_memory_mb: Maximum memory in megabytes. + max_processes: Maximum number of processes. + + Returns: + Dict with keys: result, stdout, stderr, returncode, error + """ + import json + import tempfile + import uuid + + resource_limits = ResourceLimits( + max_memory=max_memory_mb * 1024 * 1024, + max_processes=max_processes, + max_cpu_time=int(timeout) + 5, ) + config = SandboxConfig( + allow_network=allow_network, + resource_limits=resource_limits, + ) + + blocked = check_blocked_paths_in_code(code, config.blocked_path_patterns) + if blocked: + return { + "result": None, + "stdout": "", + "stderr": f"Access denied: path pattern '{blocked}' is blocked by sandbox policy\n", + "returncode": 1, + "error": f"SecurityError: Access to '{blocked}' is blocked by sandbox policy", + } + + isolated_tmp_dir = os.path.join(tempfile.gettempdir(), f"sandbox-{uuid.uuid4()}") + os.makedirs(isolated_tmp_dir, mode=0o700, exist_ok=True) + config.write_paths = [isolated_tmp_dir] + if config.environment is None: + config.environment = SAFE_ENV.copy() + config.environment["TMPDIR"] = isolated_tmp_dir + config.environment["HOME"] = isolated_tmp_dir + + if extra_read_paths: + config.read_paths.extend(extra_read_paths) + if extra_write_paths: + config.write_paths.extend(extra_write_paths) + + try: + config.read_paths.append(os.getcwd()) + except OSError: + pass + + escaped_code = json.dumps(code) + + python_script = f''' +import sys +import os +import resource +import json + +limits = [ + (resource.RLIMIT_FSIZE, {config.resource_limits.max_file_size}), + (resource.RLIMIT_CPU, {config.resource_limits.max_cpu_time}), + (resource.RLIMIT_NOFILE, {config.resource_limits.max_open_files}), + (resource.RLIMIT_DATA, {config.resource_limits.max_data_size}), + (resource.RLIMIT_STACK, {config.resource_limits.max_stack_size}), +] +for rlimit_type, value in limits: + try: + soft, hard = resource.getrlimit(rlimit_type) + new_value = min(value, hard) if hard != resource.RLIM_INFINITY else value + resource.setrlimit(rlimit_type, (new_value, hard)) + except: + pass + +sys.path.insert(0, {repr(os.path.dirname(os.path.abspath(__file__)))}) + +try: + from sandbox import apply_landlock_restrictions, apply_seccomp_restrictions, SandboxConfig + + sandbox_config = SandboxConfig( + read_paths={repr(config.read_paths)}, + write_paths={repr(config.write_paths)}, + read_write_paths={repr(config.read_write_paths)}, + device_paths={repr(config.device_paths)}, + allow_network={repr(config.allow_network)}, + use_seccomp_allowlist=False, + ) + + try: + apply_landlock_restrictions(sandbox_config) + except Exception as e: + print(f"Landlock: {{e}}", file=sys.stderr) + + try: + apply_seccomp_restrictions(sandbox_config) + except Exception as e: + print(f"Seccomp: {{e}}", file=sys.stderr) + +except ImportError as e: + print(f"Sandbox import error: {{e}}", file=sys.stderr) + sys.exit(1) + +from io import StringIO +import contextlib + +code = {escaped_code} +result_data = {{"result": None, "error": None}} + +stdout_capture = StringIO() +stderr_capture = StringIO() + +try: + with contextlib.redirect_stdout(stdout_capture), contextlib.redirect_stderr(stderr_capture): + try: + compiled = compile(code, "", "eval") + result = eval(compiled) + result_data["result"] = repr(result) + except SyntaxError: + exec(compile(code, "", "exec")) +except Exception as e: + result_data["error"] = f"{{type(e).__name__}}: {{e}}" + +result_data["stdout"] = stdout_capture.getvalue() +result_data["stderr"] = stderr_capture.getvalue() + +print("__SANDBOX_RESULT__" + json.dumps(result_data)) +''' + + env = config.environment if config.environment is not None else SAFE_ENV.copy() + + try: + proc_result = subprocess.run( + [sys.executable, "-c", python_script], + capture_output=True, + text=True, + timeout=timeout, + env=env, + ) + + stdout = proc_result.stdout + stderr = proc_result.stderr + + if "__SANDBOX_RESULT__" in stdout: + marker_pos = stdout.find("__SANDBOX_RESULT__") + json_str = stdout[marker_pos + len("__SANDBOX_RESULT__"):] + pre_output = stdout[:marker_pos] + try: + result_data = json.loads(json_str.strip()) + combined_stdout = pre_output + result_data.get("stdout", "") + combined_stderr = stderr + result_data.get("stderr", "") + return { + "result": result_data.get("result"), + "stdout": combined_stdout, + "stderr": combined_stderr, + "returncode": 0 if result_data.get("error") is None else 1, + "error": result_data.get("error"), + } + except json.JSONDecodeError: + pass + + return { + "result": None, + "stdout": stdout, + "stderr": stderr, + "returncode": proc_result.returncode, + "error": "Failed to parse sandbox result" if proc_result.returncode != 0 else None, + } + + except subprocess.TimeoutExpired as e: + return { + "result": None, + "stdout": e.stdout or "", + "stderr": f"Execution timed out after {timeout} seconds\n" + (e.stderr or ""), + "returncode": -1, + "error": f"TimeoutError: Execution timed out after {timeout} seconds", + } + except Exception as e: + return { + "result": None, + "stdout": "", + "stderr": str(e), + "returncode": -1, + "error": f"{type(e).__name__}: {e}", + } + finally: + if isolated_tmp_dir and os.path.exists(isolated_tmp_dir): + import shutil + try: + shutil.rmtree(isolated_tmp_dir) + except OSError: + pass + def get_sandbox_info() -> dict: """Get information about sandbox capabilities on this system.""" @@ -532,11 +831,9 @@ def get_sandbox_info() -> dict: "platform": platform.system(), "kernel_release": platform.release(), "landlock": { - "module_available": LANDLOCK_AVAILABLE, "kernel_support": check_landlock_support(), }, "seccomp": { - "module_available": SECCOMP_AVAILABLE, "kernel_support": check_seccomp_support(), }, "resource_limits_available": True, @@ -565,10 +862,11 @@ def get_sandbox_info() -> dict: print(f"stderr: {result.stderr}") print(f"returncode: {result.returncode}") - print("\n=== Test: Try to access /proc/self/environ ===") + print("\n=== Test: Try to access /proc/self/environ (should fail) ===") result = run_command_sandboxed("cat /proc/self/environ") print(f"stderr: {result.stderr[:200] if result.stderr else 'none'}...") print(f"returncode: {result.returncode}") + assert result.returncode != 0, "/proc access should be blocked" print("\n=== Test: Network access (should fail) ===") result = run_command_sandboxed("curl -s https://example.com", timeout=5) From 3a80a0a18bac050a1bcd846317707e7aeb021fff Mon Sep 17 00:00:00 2001 From: Niels Bantilan Date: Fri, 20 Feb 2026 15:18:52 -0500 Subject: [PATCH 3/3] lint Signed-off-by: Niels Bantilan --- .../interactive_sandbox_app/fastapi_app.py | 22 +- .../interactive_sandbox_app.py | 2 +- .../apps/interactive_sandbox_app/sandbox.py | 293 ++++++++++++------ 3 files changed, 210 insertions(+), 107 deletions(-) diff --git a/examples/apps/interactive_sandbox_app/fastapi_app.py b/examples/apps/interactive_sandbox_app/fastapi_app.py index 31de34d13..4f5a80f75 100644 --- a/examples/apps/interactive_sandbox_app/fastapi_app.py +++ b/examples/apps/interactive_sandbox_app/fastapi_app.py @@ -1,14 +1,11 @@ """FastAPI app for running commands with seccomp + Landlock sandboxing.""" -import os -import shlex import json -import subprocess +import os from typing import List, Optional from fastapi import FastAPI, HTTPException from pydantic import BaseModel, Field - from sandbox import ( SAFE_ENV, check_landlock_support, @@ -18,7 +15,6 @@ run_python_sandboxed, ) - app = FastAPI( title="Interactive Sandbox", description="A FastAPI app for running shell commands with unprivileged sandboxing.", @@ -237,8 +233,7 @@ async def test_python_sandbox() -> dict: # Test 9: Try to use subprocess (should fail if seccomp blocks fork/exec) result = run_python_sandboxed( - "import subprocess; subprocess.run(['echo', 'test'], capture_output=True).stdout", - timeout=5 + "import subprocess; subprocess.run(['echo', 'test'], capture_output=True).stdout", timeout=5 ) tests["subprocess_restricted"] = { "passed": True, # This may or may not work depending on seccomp config @@ -250,10 +245,7 @@ async def test_python_sandbox() -> dict: # Test 10: Try network access (should fail if network disabled) if not ALLOW_NETWORK: - result = run_python_sandboxed( - "import socket; s = socket.socket(); s.connect(('example.com', 80))", - timeout=5 - ) + result = run_python_sandboxed("import socket; s = socket.socket(); s.connect(('example.com', 80))", timeout=5) tests["network_blocked"] = { "passed": result.get("returncode") != 0 or result.get("error") is not None, "description": "Network access should be blocked", @@ -310,9 +302,7 @@ async def test_sandbox() -> dict: "returncode": result.returncode, } - result = run_command_sandboxed( - "touch /tmp/sandbox_test && rm /tmp/sandbox_test", timeout=5 - ) + result = run_command_sandboxed("touch /tmp/sandbox_test && rm /tmp/sandbox_test", timeout=5) tests["write_tmp"] = { "passed": result.returncode == 0, "description": "Should be able to write to /tmp", @@ -327,9 +317,7 @@ async def test_sandbox() -> dict: } if not ALLOW_NETWORK: - result = run_command_sandboxed( - "curl -s --connect-timeout 2 https://example.com", timeout=5 - ) + result = run_command_sandboxed("curl -s --connect-timeout 2 https://example.com", timeout=5) tests["network_blocked"] = { "passed": result.returncode != 0, "description": "Network access should be blocked", diff --git a/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py b/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py index 474058fa6..d249f9700 100644 --- a/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py +++ b/examples/apps/interactive_sandbox_app/interactive_sandbox_app.py @@ -47,7 +47,7 @@ Parameter(name="max_processes", value="50", env_var="MAX_PROCESSES"), Parameter(name="timeout", value="30", env_var="COMMAND_TIMEOUT"), Parameter(name="allow_network", value="false", env_var="ALLOW_NETWORK"), - ] + ], ) diff --git a/examples/apps/interactive_sandbox_app/sandbox.py b/examples/apps/interactive_sandbox_app/sandbox.py index 4b5f84511..0c230e543 100644 --- a/examples/apps/interactive_sandbox_app/sandbox.py +++ b/examples/apps/interactive_sandbox_app/sandbox.py @@ -12,8 +12,6 @@ from dataclasses import dataclass, field from typing import Dict, List, Optional, Set -import landlock - try: import ctypes.util @@ -63,9 +61,7 @@ class ResourceLimits: @dataclass class SandboxConfig: - read_paths: List[str] = field( - default_factory=lambda: ["/tmp", "/var/inputs"] - ) + read_paths: List[str] = field(default_factory=lambda: ["/tmp", "/var/inputs"]) write_paths: List[str] = field(default_factory=lambda: ["/tmp", "/var/outputs"]) read_write_paths: List[str] = field(default_factory=list) device_paths: List[str] = field( @@ -87,82 +83,204 @@ class SandboxConfig: use_seccomp_allowlist: bool = True allowed_syscalls: Set[str] = field( default_factory=lambda: { - "read", "write", "readv", "writev", "pread64", "pwrite64", - "lseek", "close", "fstat", "stat", "lstat", "fstatat", - "newfstatat", "statx", - "open", "openat", "creat", "access", "faccessat", "faccessat2", - "readlink", "readlinkat", "getcwd", "chdir", "fchdir", - "dup", "dup2", "dup3", "fcntl", "flock", - "truncate", "ftruncate", - "getdents", "getdents64", "mkdir", "mkdirat", "rmdir", - "unlink", "unlinkat", "rename", "renameat", "renameat2", - "link", "linkat", "symlink", "symlinkat", - "mmap", "munmap", "mprotect", "mremap", "brk", - "madvise", "msync", - "fork", "vfork", "clone", "clone3", "execve", "execveat", - "wait4", "waitid", "exit", "exit_group", - "getpid", "getppid", "gettid", "getuid", "getgid", - "geteuid", "getegid", "getgroups", - "rt_sigaction", "rt_sigprocmask", "rt_sigreturn", - "sigaltstack", "kill", "tgkill", - "clock_gettime", "clock_getres", "gettimeofday", - "nanosleep", "clock_nanosleep", - "poll", "ppoll", "select", "pselect6", "epoll_create", - "epoll_create1", "epoll_ctl", "epoll_wait", "epoll_pwait", - "epoll_pwait2", "eventfd", "eventfd2", - "pipe", "pipe2", - "getrlimit", "prlimit64", "getrusage", - "uname", "sysinfo", "getrandom", - "futex", "set_robust_list", "get_robust_list", - "set_tid_address", "arch_prctl", "prctl", + "read", + "write", + "readv", + "writev", + "pread64", + "pwrite64", + "lseek", + "close", + "fstat", + "stat", + "lstat", + "fstatat", + "newfstatat", + "statx", + "open", + "openat", + "creat", + "access", + "faccessat", + "faccessat2", + "readlink", + "readlinkat", + "getcwd", + "chdir", + "fchdir", + "dup", + "dup2", + "dup3", + "fcntl", + "flock", + "truncate", + "ftruncate", + "getdents", + "getdents64", + "mkdir", + "mkdirat", + "rmdir", + "unlink", + "unlinkat", + "rename", + "renameat", + "renameat2", + "link", + "linkat", + "symlink", + "symlinkat", + "mmap", + "munmap", + "mprotect", + "mremap", + "brk", + "madvise", + "msync", + "fork", + "vfork", + "clone", + "clone3", + "execve", + "execveat", + "wait4", + "waitid", + "exit", + "exit_group", + "getpid", + "getppid", + "gettid", + "getuid", + "getgid", + "geteuid", + "getegid", + "getgroups", + "rt_sigaction", + "rt_sigprocmask", + "rt_sigreturn", + "sigaltstack", + "kill", + "tgkill", + "clock_gettime", + "clock_getres", + "gettimeofday", + "nanosleep", + "clock_nanosleep", + "poll", + "ppoll", + "select", + "pselect6", + "epoll_create", + "epoll_create1", + "epoll_ctl", + "epoll_wait", + "epoll_pwait", + "epoll_pwait2", + "eventfd", + "eventfd2", + "pipe", + "pipe2", + "getrlimit", + "prlimit64", + "getrusage", + "uname", + "sysinfo", + "getrandom", + "futex", + "set_robust_list", + "get_robust_list", + "set_tid_address", + "arch_prctl", + "prctl", "ioctl", - "sched_getaffinity", "sched_yield", + "sched_getaffinity", + "sched_yield", "rseq", } ) blocked_syscalls: Set[str] = field( default_factory=lambda: { - "setuid", "setgid", "setreuid", "setregid", - "setresuid", "setresgid", "setfsuid", "setfsgid", - "capset", "capget", - "init_module", "finit_module", "delete_module", - "mount", "umount", "umount2", "pivot_root", - "sysfs", "statfs", "fstatfs", - "ptrace", "process_vm_readv", "process_vm_writev", - "iopl", "ioperm", "ioprio_set", - "settimeofday", "clock_settime", "adjtimex", "clock_adjtime", - "reboot", "kexec_load", "kexec_file_load", - "swapon", "swapoff", - "unshare", "setns", - "add_key", "request_key", "keyctl", + "setuid", + "setgid", + "setreuid", + "setregid", + "setresuid", + "setresgid", + "setfsuid", + "setfsgid", + "capset", + "capget", + "init_module", + "finit_module", + "delete_module", + "mount", + "umount", + "umount2", + "pivot_root", + "sysfs", + "statfs", + "fstatfs", + "ptrace", + "process_vm_readv", + "process_vm_writev", + "iopl", + "ioperm", + "ioprio_set", + "settimeofday", + "clock_settime", + "adjtimex", + "clock_adjtime", + "reboot", + "kexec_load", + "kexec_file_load", + "swapon", + "swapoff", + "unshare", + "setns", + "add_key", + "request_key", + "keyctl", "bpf", "perf_event_open", "userfaultfd", "personality", "acct", - "quotactl", "quotactl_fd", + "quotactl", + "quotactl_fd", "nfsservctl", "lookup_dcookie", "vhangup", "modify_ldt", - "vm86", "vm86old", + "vm86", + "vm86old", "seccomp", - "memfd_create", "memfd_secret", - "io_uring_setup", "io_uring_enter", "io_uring_register", + "memfd_create", + "memfd_secret", + "io_uring_setup", + "io_uring_enter", + "io_uring_register", } ) allow_network: bool = False network_syscalls: Set[str] = field( default_factory=lambda: { - "socket", "socketpair", - "connect", "accept", "accept4", - "bind", "listen", - "sendto", "recvfrom", - "sendmsg", "recvmsg", - "sendmmsg", "recvmmsg", + "socket", + "socketpair", + "connect", + "accept", + "accept4", + "bind", + "listen", + "sendto", + "recvfrom", + "sendmsg", + "recvmsg", + "sendmmsg", + "recvmmsg", "shutdown", - "getsockname", "getpeername", - "getsockopt", "setsockopt", + "getsockname", + "getpeername", + "getsockopt", + "setsockopt", } ) resource_limits: ResourceLimits = field(default_factory=ResourceLimits) @@ -174,7 +292,6 @@ def check_blocked_paths(command: List[str], blocked_patterns: List[str]) -> Opti Returns the blocked pattern if found, None otherwise. """ - import shlex command_str = " ".join(command) @@ -223,7 +340,7 @@ def apply_resource_limits(limits: ResourceLimits) -> Dict[str, bool]: for name, rlimit_type, value in limit_map: try: - soft, hard = resource.getrlimit(rlimit_type) + _, hard = resource.getrlimit(rlimit_type) if hard == resource.RLIM_INFINITY or value <= hard: new_soft = min(value, hard) if hard != resource.RLIM_INFINITY else value resource.setrlimit(rlimit_type, (new_soft, hard)) @@ -252,9 +369,7 @@ def apply_landlock_restrictions(config: SandboxConfig) -> bool: rw_dir_paths: Set[str] = set() rw_file_paths: Set[str] = set() - def collect_paths( - paths: List[str], dir_set: Set[str], file_set: Set[str] - ) -> None: + def collect_paths(paths: List[str], dir_set: Set[str], file_set: Set[str]) -> None: for path in paths: if path in skip_paths or not os.path.exists(path): continue @@ -278,9 +393,7 @@ def collect_paths( read_file_access = ll.FSAccess.ReadFile read_dir_access = ll.FSAccess.ReadFile | ll.FSAccess.ReadDir - write_file_access = ( - ll.FSAccess.WriteFile | ll.FSAccess.Truncate - ) + write_file_access = ll.FSAccess.WriteFile | ll.FSAccess.Truncate write_dir_access = ( ll.FSAccess.WriteFile | ll.FSAccess.Truncate @@ -334,9 +447,7 @@ def collect_paths( pass if paths_added == 0: - print( - "Warning: No paths could be added to Landlock ruleset", file=sys.stderr - ) + print("Warning: No paths could be added to Landlock ruleset", file=sys.stderr) return False try: @@ -438,7 +549,7 @@ def run_sandboxed( env = config.environment if config.environment is not None else SAFE_ENV.copy() - sandbox_script = f''' + sandbox_script = f""" import sys import os import resource @@ -458,17 +569,17 @@ def run_sandboxed( except: pass -sys.path.insert(0, {repr(os.path.dirname(os.path.abspath(__file__)))}) +sys.path.insert(0, {os.path.dirname(os.path.abspath(__file__))!r}) try: from sandbox import apply_landlock_restrictions, apply_seccomp_restrictions, SandboxConfig config = SandboxConfig( - read_paths={repr(config.read_paths)}, - write_paths={repr(config.write_paths)}, - read_write_paths={repr(config.read_write_paths)}, - device_paths={repr(config.device_paths)}, - allow_network={repr(config.allow_network)}, + read_paths={config.read_paths!r}, + write_paths={config.write_paths!r}, + read_write_paths={config.read_write_paths!r}, + device_paths={config.device_paths!r}, + allow_network={config.allow_network!r}, use_seccomp_allowlist=False, ) @@ -487,22 +598,23 @@ def run_sandboxed( sys.exit(1) import subprocess -cmd = {repr(command)} +cmd = {command!r} try: - proc = subprocess.run(cmd, capture_output=True, text=True, cwd={repr(cwd)}) + proc = subprocess.run(cmd, capture_output=True, text=True, cwd={cwd!r}) print(proc.stdout, end="") print(proc.stderr, end="", file=sys.stderr) sys.exit(proc.returncode) except Exception as e: print(f"Execution error: {{e}}", file=sys.stderr) sys.exit(1) -''' +""" try: return subprocess.run( [sys.executable, "-c", sandbox_script], capture_output=True, text=True, + check=False, timeout=timeout, cwd=cwd, env=env, @@ -594,6 +706,7 @@ def run_command_sandboxed( finally: if isolated_tmp_dir and os.path.exists(isolated_tmp_dir): import shutil + try: shutil.rmtree(isolated_tmp_dir) except OSError: @@ -683,7 +796,7 @@ def run_python_sandboxed( escaped_code = json.dumps(code) - python_script = f''' + python_script = f""" import sys import os import resource @@ -704,17 +817,17 @@ def run_python_sandboxed( except: pass -sys.path.insert(0, {repr(os.path.dirname(os.path.abspath(__file__)))}) +sys.path.insert(0, {os.path.dirname(os.path.abspath(__file__))!r}) try: from sandbox import apply_landlock_restrictions, apply_seccomp_restrictions, SandboxConfig sandbox_config = SandboxConfig( - read_paths={repr(config.read_paths)}, - write_paths={repr(config.write_paths)}, - read_write_paths={repr(config.read_write_paths)}, - device_paths={repr(config.device_paths)}, - allow_network={repr(config.allow_network)}, + read_paths={config.read_paths!r}, + write_paths={config.write_paths!r}, + read_write_paths={config.read_write_paths!r}, + device_paths={config.device_paths!r}, + allow_network={config.allow_network!r}, use_seccomp_allowlist=False, ) @@ -756,7 +869,7 @@ def run_python_sandboxed( result_data["stderr"] = stderr_capture.getvalue() print("__SANDBOX_RESULT__" + json.dumps(result_data)) -''' +""" env = config.environment if config.environment is not None else SAFE_ENV.copy() @@ -765,6 +878,7 @@ def run_python_sandboxed( [sys.executable, "-c", python_script], capture_output=True, text=True, + check=False, timeout=timeout, env=env, ) @@ -774,7 +888,7 @@ def run_python_sandboxed( if "__SANDBOX_RESULT__" in stdout: marker_pos = stdout.find("__SANDBOX_RESULT__") - json_str = stdout[marker_pos + len("__SANDBOX_RESULT__"):] + json_str = stdout[marker_pos + len("__SANDBOX_RESULT__") :] pre_output = stdout[:marker_pos] try: result_data = json.loads(json_str.strip()) @@ -817,6 +931,7 @@ def run_python_sandboxed( finally: if isolated_tmp_dir and os.path.exists(isolated_tmp_dir): import shutil + try: shutil.rmtree(isolated_tmp_dir) except OSError: