10 changes: 6 additions & 4 deletions commands.md
@@ -77,7 +77,7 @@ $ solana-conformance create-env [OPTIONS]
* `-l, --section-limit INTEGER`: Limit number of fixtures per section [default: 0]
* `-fd, --firedancer-repo PATH`: Path to firedancer repository
* `-tv, --test-vectors-repo PATH`: Path to test-vectors repository
* `--use-ng`: Use fuzz NG CLI (fuzz list/download repro) instead of API scraping [default: True]
* `--use-ng / --no-use-ng`: Use the fuzz NG API instead of web scraping [default: use-ng]
* `-d, --debug-mode`: Enables debug mode, which disables multiprocessing
* `--help`: Show this message and exit.

@@ -162,8 +162,9 @@ $ solana-conformance debug-mismatches [OPTIONS]
* `-r, --randomize-output-buffer`: Randomizes bytes in output buffer before shared library execution
* `-p, --num-processes INTEGER`: Number of processes to use [default: 4]
* `-l, --section-limit INTEGER`: Limit number of fixtures per section [default: 0]
* `--use-ng`: Use fuzz NG CLI (fuzz list/download repro) instead of API scraping [default: True]
* `--use-ng / --no-use-ng`: Use the fuzz NG API instead of web scraping [default: use-ng]
* `-d, --debug-mode`: Enables debug mode, which disables multiprocessing
* `--force-redownload`: Force re-download even if fixtures are already cached
* `--help`: Show this message and exit.

## `solana-conformance decode-protobufs`
@@ -223,8 +224,9 @@ $ solana-conformance download-repros [OPTIONS]
* `-n, --section-names TEXT`: Comma-delimited list of lineage names to download [required]
* `-l, --section-limit INTEGER`: Limit number of repros per lineage (0 = all verified) [default: 0]
* `-p, --num-processes INTEGER`: Number of parallel download processes [default: 4]
* `--use-ng`: Use fuzz NG CLI (fuzz list/download repro) instead of API scraping [default: True]
* `--use-ng / --no-use-ng`: Use the fuzz NG API instead of web scraping [default: use-ng]
* `--interactive / --no-interactive`: Prompt for authentication if needed [default: interactive]
* `--force-redownload`: Force re-download even if fixtures are already cached
* `--help`: Show this message and exit.
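
For example, re-downloading a couple of lineages while bypassing the fixture cache might look like this (the lineage names are placeholders; `--use-ng` is the default and is shown only for clarity):

```console
$ solana-conformance download-repros -n instr_lineage,syscall_lineage -l 5 --use-ng --force-redownload
```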

## `solana-conformance exec-fixtures`
@@ -330,7 +332,7 @@ $ solana-conformance list-repros [OPTIONS]

**Options**:

* `--use-ng`: Use fuzz NG API instead of web scraping [default: True]
* `--use-ng / --no-use-ng`: Use the fuzz NG API instead of web scraping [default: use-ng]
* `-l, --lineage TEXT`: Filter to specific lineage (shows all repros in that lineage)
* `--interactive / --no-interactive`: Enable interactive configuration prompts if credentials are missing [default: interactive]
* `-f, --fuzzcorp-url TEXT`: FuzzCorp URL for web scraping (used when --use-ng is not set) [default: https://api.dev.fuzzcorp.asymmetric.re/uglyweb/firedancer-io/solfuzz/bugs/]
26 changes: 24 additions & 2 deletions src/test_suite/fuzzcorp_api_client.py
@@ -238,6 +238,7 @@ def download_artifact_data(
lineage: str,
org: Optional[str] = None,
project: Optional[str] = None,
progress_callback=None,
) -> bytes:
data = {
"file_name": artifact_hash,
@@ -257,7 +258,28 @@ def download_artifact_data(
query_data = json.dumps(data)
url = f"{url}?arpc={urllib.parse.quote(query_data)}"

# Stream the response to handle large files
# Stream the response to handle large files with progress tracking
with self.client.stream("GET", url, headers=headers) as response:
response.raise_for_status()
return response.read()

# Get total size from Content-Length header (if available)
total_size = int(response.headers.get("Content-Length", 0))

# If no progress callback, just read all at once
if progress_callback is None:
return response.read()

# Download with progress tracking
downloaded = 0
chunks = []
chunk_count = 0

for chunk in response.iter_bytes(chunk_size=8192):
chunks.append(chunk)
downloaded += len(chunk)
chunk_count += 1

# Call progress callback (total_size may be 0 if no Content-Length)
progress_callback(downloaded, total_size)

return b"".join(chunks)
213 changes: 160 additions & 53 deletions src/test_suite/multiprocessing_utils.py
@@ -19,10 +19,54 @@
import tempfile
import time
import zipfile
import io
import threading
from concurrent.futures import ThreadPoolExecutor
from test_suite.fuzzcorp_auth import get_fuzzcorp_auth
from test_suite.fuzzcorp_api_client import FuzzCorpAPIClient


# The cached client is process-local (keyed by PID) to handle multiprocessing.
# Connections remain open until the process exits or client.close() is called.
_client_cache = {}
_client_lock = threading.Lock()

# File write lock for parallel extraction (prevents concurrent write conflicts)
_file_write_lock = threading.Lock()


def get_shared_fuzzcorp_client():
pid = os.getpid()

with _client_lock:
if pid not in _client_cache:
auth = get_fuzzcorp_auth(interactive=False)
if not auth:
return None

client = FuzzCorpAPIClient(
api_origin=auth.get_api_origin(),
token=auth.get_token(),
org=auth.get_organization(),
project=auth.get_project(),
http2=True,
)
_client_cache[pid] = client

return _client_cache[pid]


def close_shared_fuzzcorp_client():
pid = os.getpid()
with _client_lock:
if pid in _client_cache:
try:
_client_cache[pid].close()
except Exception:
pass # Ignore errors during cleanup
del _client_cache[pid]
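
A minimal sketch of how a worker process is expected to use this process-local cache; `fetch_repro_metadata` is a hypothetical wrapper, while `get_shared_fuzzcorp_client`, `close_shared_fuzzcorp_client`, and `get_repro_by_hash` come from this PR:

```python
from test_suite.multiprocessing_utils import (
    get_shared_fuzzcorp_client,
    close_shared_fuzzcorp_client,
)


def fetch_repro_metadata(crash_hash: str):
    """Hypothetical wrapper: reuse the process-local client across calls."""
    client = get_shared_fuzzcorp_client()  # created once per PID, then cached
    if client is None:
        return None  # no FuzzCorp credentials available non-interactively
    return client.get_repro_by_hash(crash_hash)


try:
    meta = fetch_repro_metadata("<crash-hash>")  # placeholder hash
finally:
    close_shared_fuzzcorp_client()  # closes the pooled connections for this PID
```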


def process_target(
harness_ctx: HarnessCtx, library: ctypes.CDLL, context: ContextType
) -> invoke_pb.InstrEffects | None:
@@ -444,74 +488,117 @@ def execute_fixture(test_file: Path) -> tuple[str, int, dict | None]:
)


def download_and_process(source):
def download_and_process(source, progress_queue=None):
try:
if isinstance(source, (tuple, list)) and len(source) == 2:
section_name, crash_hash = source
out_dir = globals.inputs_dir / f"{section_name}_{crash_hash}"
out_dir.mkdir(parents=True, exist_ok=True)

# Use FuzzCorp HTTP API to download repro
# Get configuration
config = get_fuzzcorp_auth(interactive=False)
if not config:
# Use shared client (connection pooling)
client = get_shared_fuzzcorp_client()
if not client:
return {
"success": False,
"repro": f"{section_name}/{crash_hash}",
"message": "Failed to download: no FuzzCorp config",
"message": "Failed to download: no FuzzCorp client",
}

# Create HTTP client
with FuzzCorpAPIClient(
api_origin=config.get_api_origin(),
token=config.get_token(),
org=config.get_organization(),
project=config.get_project(),
http2=True,
) as client:
# Get repro metadata to find artifact hashes
repro_metadata = client.get_repro_by_hash(
crash_hash,
org=config.get_organization(),
project=config.get_project(),
)

if not repro_metadata.artifact_hashes:
# Check if we already have fixtures for this repro
# Store cache markers in a persistent directory (not in inputs_dir which may be deleted)
# Use a hidden directory in the user's home directory or system temp
cache_dir = Path.home() / ".solana-conformance" / "download_cache"
cache_dir.mkdir(parents=True, exist_ok=True)

cache_marker = cache_dir / f"{section_name}_{crash_hash}.marker"
force_redownload = os.getenv("FORCE_REDOWNLOAD", "").lower() in (
"1",
"true",
"yes",
)

if cache_marker.exists() and not force_redownload:
# Check if fixture files actually exist (cache marker exists but files might be deleted)
existing_fixtures = list(globals.inputs_dir.glob("*.fix"))
fixture_count = len(existing_fixtures)

# Only use cache if actual fixture files exist
if fixture_count > 0:
return {
"success": False,
"success": True,
"repro": f"{section_name}/{crash_hash}",
"message": "Failed to process: no artifacts found",
"fixtures": fixture_count,
"artifacts": 0, # Cached, so 0 artifacts downloaded
"cached": True,
"message": f"Using cached fixtures for {section_name}/{crash_hash}",
}
# else: Cache marker exists but files are gone, re-download

# Download and extract each artifact
fix_count = 0
num_artifacts = len(repro_metadata.artifact_hashes)
for idx, artifact_hash in enumerate(repro_metadata.artifact_hashes, 1):
# Download artifact (ZIP file)
artifact_data = client.download_artifact_data(
artifact_hash, section_name
)

# Save and extract ZIP
with tempfile.NamedTemporaryFile(
suffix=".zip", delete=False
) as tmp_zip:
tmp_zip.write(artifact_data)
tmp_zip_path = tmp_zip.name

try:
with zipfile.ZipFile(tmp_zip_path) as z:
# Extract all .fix files
for member in z.namelist():
if member.endswith(".fix"):
fix_content = z.read(member)
fix_name = Path(member).name
fix_path = globals.inputs_dir / fix_name
with open(fix_path, "wb") as f:
f.write(fix_content)
fix_count += 1
finally:
os.unlink(tmp_zip_path)
# Get repro metadata to find artifact hashes
repro_metadata = client.get_repro_by_hash(crash_hash)

if not repro_metadata.artifact_hashes:
return {
"success": False,
"repro": f"{section_name}/{crash_hash}",
"message": "Failed to process: no artifacts found",
}

# Parallel artifact downloads with optional progress tracking
def download_artifact(artifact_hash):
"""Download a single artifact and return its data."""
# Create progress callback if queue is provided
progress_callback = None
if progress_queue:

def callback(downloaded, total):
# Send progress update to main process
if progress_queue:
progress_queue.put(
{
"type": "download_progress",
"downloaded": downloaded,
"total": total,
"artifact_hash": artifact_hash,
}
)

progress_callback = callback

return client.download_artifact_data(
artifact_hash, section_name, progress_callback=progress_callback
)

num_artifacts = len(repro_metadata.artifact_hashes)

# Use ThreadPoolExecutor for parallel downloads (max 5 concurrent)
with ThreadPoolExecutor(max_workers=min(5, num_artifacts)) as executor:
artifacts_data = list(
executor.map(download_artifact, repro_metadata.artifact_hashes)
)

# Parallel in-memory ZIP extraction
def extract_artifact(artifact_data):
"""Extract .fix files from a single artifact ZIP."""
extracted_count = 0
with zipfile.ZipFile(io.BytesIO(artifact_data)) as z:
for member in z.namelist():
if member.endswith(".fix"):
fix_content = z.read(member)
fix_name = Path(member).name
fix_path = globals.inputs_dir / fix_name
# Thread-safe file write (prevents conflicts)
with _file_write_lock:
with open(fix_path, "wb") as f:
f.write(fix_content)
extracted_count += 1
return extracted_count

# Extract all artifacts in parallel
with ThreadPoolExecutor(max_workers=min(5, num_artifacts)) as executor:
fix_counts = list(executor.map(extract_artifact, artifacts_data))

fix_count = sum(fix_counts)

if fix_count == 0:
return {
@@ -520,12 +607,32 @@ def download_and_process(source):
"message": "Failed to process: no .fix files found in artifacts",
}

# Create cache marker file to indicate successful download
cache_dir = Path.home() / ".solana-conformance" / "download_cache"
cache_dir.mkdir(parents=True, exist_ok=True)
cache_marker = cache_dir / f"{section_name}_{crash_hash}.marker"
try:
# Write metadata to the marker file (timestamp, fixture count, artifact count)
import time

with open(cache_marker, "w") as f:
f.write(f"downloaded_at={int(time.time())}\n")
f.write(f"fixtures={fix_count}\n")
f.write(f"artifacts={num_artifacts}\n")
except Exception as e:
# Non-fatal: just means we'll re-download next time
print(
f"[NOTICE] Failed to create cache marker for {section_name}/{crash_hash}: {e}",
flush=True,
)

# Return structured result with artifact count
return {
"success": True,
"repro": f"{section_name}/{crash_hash}",
"fixtures": fix_count,
"artifacts": num_artifacts,
"cached": False,
"message": f"Processed {section_name}/{crash_hash} successfully ({fix_count} fixture(s) from {num_artifacts} artifact(s))",
}
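
The cache markers written above live under the user's home directory rather than `inputs_dir`, so they survive environment resets; besides the `FORCE_REDOWNLOAD` environment variable checked earlier, they can also be cleared by hand. A small sketch (both helpers are hypothetical, not part of this PR; only the marker path and naming follow the code above):

```python
from pathlib import Path

# Marker location and naming follow the code above.
CACHE_DIR = Path.home() / ".solana-conformance" / "download_cache"


def clear_cached_repro(section_name: str, crash_hash: str) -> bool:
    """Hypothetical helper: drop one marker so the next run re-downloads it."""
    marker = CACHE_DIR / f"{section_name}_{crash_hash}.marker"
    if marker.exists():
        marker.unlink()
        return True
    return False


def list_cached_repros() -> list[str]:
    """Hypothetical helper: every '<section>_<hash>' currently marked as cached."""
    return sorted(p.stem for p in CACHE_DIR.glob("*.marker"))
```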
