From 0e2b07e2e58e5877e942b5387226fe6786e39612 Mon Sep 17 00:00:00 2001 From: Charles Moyes Date: Thu, 23 Oct 2025 21:52:28 +0000 Subject: [PATCH 1/2] Download speed improvements --- commands.md | 10 +- src/test_suite/fuzzcorp_api_client.py | 26 +- src/test_suite/multiprocessing_utils.py | 240 +++++++++++++++---- src/test_suite/test_suite.py | 305 ++++++++++++++++++++---- 4 files changed, 480 insertions(+), 101 deletions(-) diff --git a/commands.md b/commands.md index 9f7fd0a..b814454 100644 --- a/commands.md +++ b/commands.md @@ -77,7 +77,7 @@ $ solana-conformance create-env [OPTIONS] * `-l, --section-limit INTEGER`: Limit number of fixture per section [default: 0] * `-fd, --firedancer-repo PATH`: Path to firedancer repository * `-tv, --test-vectors-repo PATH`: Path to test-vectors repository -* `--use-ng`: Use fuzz NG CLI (fuzz list/download repro) instead of API scraping [default: True] +* `--use-ng / --no-use-ng`: Use fuzz NG API (default: enabled) [default: use-ng] * `-d, --debug-mode`: Enables debug mode, which disables multiprocessing * `--help`: Show this message and exit. @@ -162,8 +162,9 @@ $ solana-conformance debug-mismatches [OPTIONS] * `-r, --randomize-output-buffer`: Randomizes bytes in output buffer before shared library execution * `-p, --num-processes INTEGER`: Number of processes to use [default: 4] * `-l, --section-limit INTEGER`: Limit number of fixture per section [default: 0] -* `--use-ng`: Use fuzz NG CLI (fuzz list/download repro) instead of API scraping [default: True] +* `--use-ng / --no-use-ng`: Use fuzz NG API (default: enabled) [default: use-ng] * `-d, --debug-mode`: Enables debug mode, which disables multiprocessing +* `--force-redownload`: Force re-download even if fixtures are already cached * `--help`: Show this message and exit. ## `solana-conformance decode-protobufs` @@ -223,8 +224,9 @@ $ solana-conformance download-repros [OPTIONS] * `-n, --section-names TEXT`: Comma-delimited list of lineage names to download [required] * `-l, --section-limit INTEGER`: Limit number of repros per lineage (0 = all verified) [default: 0] * `-p, --num-processes INTEGER`: Number of parallel download processes [default: 4] -* `--use-ng`: Use fuzz NG CLI (fuzz list/download repro) instead of API scraping [default: True] +* `--use-ng / --no-use-ng`: Use fuzz NG API (default: enabled) [default: use-ng] * `--interactive / --no-interactive`: Prompt for authentication if needed [default: interactive] +* `--force-redownload`: Force re-download even if fixtures are already cached * `--help`: Show this message and exit. 
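**Example** (illustrative only; the lineage names and worker count are placeholders, the flags are the options listed above):

```console
$ solana-conformance download-repros -n <lineage_a>,<lineage_b> -p 8 --force-redownload
```

This would download the repros for both lineages with 8 parallel download processes, bypassing the fixture cache; omitting `--force-redownload` reuses any previously cached fixtures.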
## `solana-conformance exec-fixtures` @@ -330,7 +332,7 @@ $ solana-conformance list-repros [OPTIONS] **Options**: -* `--use-ng`: Use fuzz NG API instead of web scraping [default: True] +* `--use-ng / --no-use-ng`: Use fuzz NG API (default: enabled) [default: use-ng] * `-l, --lineage TEXT`: Filter to specific lineage (shows all repros in that lineage) * `--interactive / --no-interactive`: Enable interactive configuration prompts if credentials are missing [default: interactive] * `-f, --fuzzcorp-url TEXT`: FuzzCorp URL for web scraping (used when --use-ng is not set) [default: https://api.dev.fuzzcorp.asymmetric.re/uglyweb/firedancer-io/solfuzz/bugs/] diff --git a/src/test_suite/fuzzcorp_api_client.py b/src/test_suite/fuzzcorp_api_client.py index bf83657..cf9fafc 100644 --- a/src/test_suite/fuzzcorp_api_client.py +++ b/src/test_suite/fuzzcorp_api_client.py @@ -238,6 +238,7 @@ def download_artifact_data( lineage: str, org: Optional[str] = None, project: Optional[str] = None, + progress_callback=None, ) -> bytes: data = { "file_name": artifact_hash, @@ -257,7 +258,28 @@ def download_artifact_data( query_data = json.dumps(data) url = f"{url}?arpc={urllib.parse.quote(query_data)}" - # Stream the response to handle large files + # Stream the response to handle large files with progress tracking with self.client.stream("GET", url, headers=headers) as response: response.raise_for_status() - return response.read() + + # Get total size from Content-Length header (if available) + total_size = int(response.headers.get("Content-Length", 0)) + + # If no progress callback, just read all at once + if progress_callback is None: + return response.read() + + # Download with progress tracking + downloaded = 0 + chunks = [] + chunk_count = 0 + + for chunk in response.iter_bytes(chunk_size=8192): + chunks.append(chunk) + downloaded += len(chunk) + chunk_count += 1 + + # Call progress callback (total_size may be 0 if no Content-Length) + progress_callback(downloaded, total_size) + + return b"".join(chunks) diff --git a/src/test_suite/multiprocessing_utils.py b/src/test_suite/multiprocessing_utils.py index 91bd0b5..c0f7143 100644 --- a/src/test_suite/multiprocessing_utils.py +++ b/src/test_suite/multiprocessing_utils.py @@ -19,10 +19,54 @@ import tempfile import time import zipfile +import io +import threading +from concurrent.futures import ThreadPoolExecutor from test_suite.fuzzcorp_auth import get_fuzzcorp_auth from test_suite.fuzzcorp_api_client import FuzzCorpAPIClient +# The cached client is process-local (keyed by PID) to handle multiprocessing. +# Connections remain open until the process exits or client.close() is called. 
+_client_cache = {} +_client_lock = threading.Lock() + +# File write lock for parallel extraction (prevents concurrent write conflicts) +_file_write_lock = threading.Lock() + + +def get_shared_fuzzcorp_client(): + pid = os.getpid() + + with _client_lock: + if pid not in _client_cache: + auth = get_fuzzcorp_auth(interactive=False) + if not auth: + return None + + client = FuzzCorpAPIClient( + api_origin=auth.get_api_origin(), + token=auth.get_token(), + org=auth.get_organization(), + project=auth.get_project(), + http2=True, + ) + _client_cache[pid] = client + + return _client_cache[pid] + + +def close_shared_fuzzcorp_client(): + pid = os.getpid() + with _client_lock: + if pid in _client_cache: + try: + _client_cache[pid].close() + except Exception: + pass # Ignore errors during cleanup + del _client_cache[pid] + + def process_target( harness_ctx: HarnessCtx, library: ctypes.CDLL, context: ContextType ) -> invoke_pb.InstrEffects | None: @@ -444,74 +488,148 @@ def execute_fixture(test_file: Path) -> tuple[str, int, dict | None]: ) -def download_and_process(source): +def download_and_process(source, progress_queue=None): try: if isinstance(source, (tuple, list)) and len(source) == 2: section_name, crash_hash = source out_dir = globals.inputs_dir / f"{section_name}_{crash_hash}" out_dir.mkdir(parents=True, exist_ok=True) - # Use FuzzCorp HTTP API to download repro - # Get configuration - config = get_fuzzcorp_auth(interactive=False) - if not config: + # Use shared client (connection pooling) + client = get_shared_fuzzcorp_client() + if not client: return { "success": False, "repro": f"{section_name}/{crash_hash}", - "message": "Failed to download: no FuzzCorp config", + "message": "Failed to download: no FuzzCorp client", } - # Create HTTP client - with FuzzCorpAPIClient( - api_origin=config.get_api_origin(), - token=config.get_token(), - org=config.get_organization(), - project=config.get_project(), - http2=True, - ) as client: - # Get repro metadata to find artifact hashes - repro_metadata = client.get_repro_by_hash( - crash_hash, - org=config.get_organization(), - project=config.get_project(), - ) + # Check if we already have fixtures for this repro + # Store cache markers in a persistent directory (not in inputs_dir which may be deleted) + # Use a hidden directory in the user's home directory or system temp + cache_dir = Path.home() / ".solana-conformance" / "download_cache" + cache_dir.mkdir(parents=True, exist_ok=True) + + cache_marker = cache_dir / f"{section_name}_{crash_hash}.marker" + force_redownload = os.getenv("FORCE_REDOWNLOAD", "").lower() in ( + "1", + "true", + "yes", + ) + + if cache_marker.exists() and not force_redownload: + # Check if fixture files actually exist (cache marker exists but files might be deleted) + existing_fixtures = list(globals.inputs_dir.glob("*.fix")) + fixture_count = len(existing_fixtures) + + # Only use cache if actual fixture files exist + if fixture_count > 0: + debug_progress = os.getenv( + "DEBUG_DOWNLOAD_PROGRESS", "" + ).lower() in ("1", "true", "yes") + if debug_progress: + print( + f"[DEBUG] Worker PID={os.getpid()} skipping {section_name}/{crash_hash} (already downloaded, {fixture_count} fixtures cached)", + flush=True, + ) - if not repro_metadata.artifact_hashes: return { - "success": False, + "success": True, "repro": f"{section_name}/{crash_hash}", - "message": "Failed to process: no artifacts found", + "fixtures": fixture_count, + "artifacts": 0, # Cached, so 0 artifacts downloaded + "cached": True, + "message": f"Using cached fixtures 
for {section_name}/{crash_hash}", } + # else: Cache marker exists but files are gone, re-download - # Download and extract each artifact - fix_count = 0 - num_artifacts = len(repro_metadata.artifact_hashes) - for idx, artifact_hash in enumerate(repro_metadata.artifact_hashes, 1): - # Download artifact (ZIP file) - artifact_data = client.download_artifact_data( - artifact_hash, section_name - ) - - # Save and extract ZIP - with tempfile.NamedTemporaryFile( - suffix=".zip", delete=False - ) as tmp_zip: - tmp_zip.write(artifact_data) - tmp_zip_path = tmp_zip.name - - try: - with zipfile.ZipFile(tmp_zip_path) as z: - # Extract all .fix files - for member in z.namelist(): - if member.endswith(".fix"): - fix_content = z.read(member) - fix_name = Path(member).name - fix_path = globals.inputs_dir / fix_name - with open(fix_path, "wb") as f: - f.write(fix_content) - fix_count += 1 - finally: - os.unlink(tmp_zip_path) + # Get repro metadata to find artifact hashes + repro_metadata = client.get_repro_by_hash(crash_hash) + + if not repro_metadata.artifact_hashes: + return { + "success": False, + "repro": f"{section_name}/{crash_hash}", + "message": "Failed to process: no artifacts found", + } + + # Parallel artifact downloads with optional progress tracking + debug_progress = os.getenv("DEBUG_DOWNLOAD_PROGRESS", "").lower() in ( + "1", + "true", + "yes", + ) + + if debug_progress: + print( + f"[DEBUG] Worker PID={os.getpid()} starting download for {section_name}/{crash_hash}", + flush=True, + ) + + def download_artifact(artifact_hash): + """Download a single artifact and return its data.""" + # Create progress callback if queue is provided + progress_callback = None + if progress_queue or debug_progress: + + def callback(downloaded, total): + # Debug logging if enabled + if debug_progress: + if total > 0: + pct = downloaded / total * 100 + progress_str = ( + f"{downloaded}/{total} bytes ({pct:.1f}%)" + ) + else: + progress_str = f"{downloaded} bytes (total unknown)" + + # Send progress update to main process + if progress_queue: + progress_queue.put( + { + "type": "download_progress", + "downloaded": downloaded, + "total": total, + "artifact_hash": artifact_hash, + } + ) + + progress_callback = callback + + return client.download_artifact_data( + artifact_hash, section_name, progress_callback=progress_callback + ) + + num_artifacts = len(repro_metadata.artifact_hashes) + + # Use ThreadPoolExecutor for parallel downloads (max 5 concurrent) + with ThreadPoolExecutor(max_workers=min(5, num_artifacts)) as executor: + artifacts_data = list( + executor.map(download_artifact, repro_metadata.artifact_hashes) + ) + + # Parallel in-memory ZIP extraction + def extract_artifact(artifact_data): + """Extract .fix files from a single artifact ZIP.""" + extracted_count = 0 + with zipfile.ZipFile(io.BytesIO(artifact_data)) as z: + for member in z.namelist(): + if member.endswith(".fix"): + fix_content = z.read(member) + fix_name = Path(member).name + fix_path = globals.inputs_dir / fix_name + # Thread-safe file write (prevents conflicts) + with _file_write_lock: + with open(fix_path, "wb") as f: + f.write(fix_content) + extracted_count += 1 + return extracted_count + + # Extract all artifacts in parallel + with ThreadPoolExecutor(max_workers=min(5, num_artifacts)) as executor: + fix_counts = list(executor.map(extract_artifact, artifacts_data)) + + fix_count = sum(fix_counts) if fix_count == 0: return { @@ -520,12 +638,30 @@ def download_and_process(source): "message": "Failed to process: no .fix files found in 
artifacts", } + # Create cache marker file to indicate successful download + cache_dir = Path.home() / ".solana-conformance" / "download_cache" + cache_dir.mkdir(parents=True, exist_ok=True) + cache_marker = cache_dir / f"{section_name}_{crash_hash}.marker" + try: + # Write metadata to marker file (size, fixture count, timestamp) + import time + + with open(cache_marker, "w") as f: + f.write(f"downloaded_at={int(time.time())}\n") + f.write(f"fixtures={fix_count}\n") + f.write(f"artifacts={num_artifacts}\n") + except Exception as e: + # Non-fatal: just means we'll re-download next time + if debug_progress: + print(f"[DEBUG] Failed to create cache marker: {e}", flush=True) + # Return structured result with artifact count return { "success": True, "repro": f"{section_name}/{crash_hash}", "fixtures": fix_count, "artifacts": num_artifacts, + "cached": False, "message": f"Processed {section_name}/{crash_hash} successfully ({fix_count} fixture(s) from {num_artifacts} artifact(s))", } diff --git a/src/test_suite/test_suite.py b/src/test_suite/test_suite.py index 97e1aa2..868b843 100644 --- a/src/test_suite/test_suite.py +++ b/src/test_suite/test_suite.py @@ -324,8 +324,14 @@ def create_fixtures( for file_path in input.rglob("*.fix"): if file_path.is_file(): test_cases.append(file_path) + num_test_cases = len(test_cases) + # Early exit if no test cases found + if num_test_cases == 0: + print(f"\n[NOTICE] No test cases found in {input}") + return True + globals.default_harness_ctx = HARNESS_MAP[default_harness_ctx] # Generate the test cases in parallel from files on disk @@ -518,6 +524,11 @@ def run_tests( num_test_cases = len(test_cases) + # Early exit if no test cases found + if num_test_cases == 0: + print(f"\n[NOTICE] No test cases found in {input}") + return True + # Process the test results in parallel print("Running tests...") test_case_results = [] @@ -733,8 +744,8 @@ def configure_fuzzcorp( def list_repros( use_ng: bool = typer.Option( True, - "--use-ng", - help="Use fuzz NG API instead of web scraping", + "--use-ng/--no-use-ng", + help="Use fuzz NG API (default: enabled)", ), lineage: str = typer.Option( None, @@ -985,22 +996,44 @@ def download_repros( ), use_ng: bool = typer.Option( True, - "--use-ng", - help="Use fuzz NG CLI (fuzz list/download repro) instead of API scraping", + "--use-ng/--no-use-ng", + help="Use fuzz NG API (default: enabled)", ), interactive: bool = typer.Option( True, "--interactive/--no-interactive", help="Prompt for authentication if needed", ), + force_redownload: bool = typer.Option( + False, + "--force-redownload", + help="Force re-download even if fixtures are already cached", + ), ): """Download repros from FuzzCorp NG API.""" - # Create output directories + # Set force redownload environment variable for worker processes + if force_redownload: + os.environ["FORCE_REDOWNLOAD"] = "1" + else: + os.environ.pop("FORCE_REDOWNLOAD", None) + + # Create output directories (preserve inputs for caching) if output_dir.exists(): - shutil.rmtree(output_dir) - output_dir.mkdir(parents=True, exist_ok=True) + for item in output_dir.iterdir(): + if item.name != "inputs": # Preserve inputs for caching + if item.is_dir(): + shutil.rmtree(item) + else: + item.unlink() + else: + output_dir.mkdir(parents=True, exist_ok=True) inputs_dir = output_dir / "inputs" + + # Only delete inputs_dir if force redownload is set + if force_redownload and inputs_dir.exists(): + shutil.rmtree(inputs_dir) + inputs_dir.mkdir(parents=True, exist_ok=True) # Set globals for download_and_process @@ 
-1062,51 +1095,194 @@ def fetch_repros(client): # Download in parallel with progress bar if num_processes > 1: + from multiprocessing import Manager + import functools + + # Debug mode for progress tracking + debug_progress = os.getenv("DEBUG_DOWNLOAD_PROGRESS", "").lower() in ( + "1", + "true", + "yes", + ) + + if debug_progress: + print( + f"[DEBUG] Main process: debug logging enabled, starting {len(download_list)} downloads with {num_processes} workers", + flush=True, + ) + + # Create a shared queue for progress updates + manager = Manager() + progress_queue = manager.Queue() + + # Wrap download_and_process to include progress_queue + download_with_progress = functools.partial( + download_and_process, progress_queue=progress_queue + ) + with Pool(processes=num_processes) as pool: results = [] total_artifacts = 0 total_fixtures = 0 + total_bytes_downloaded = 0 + total_bytes_expected = 0 + total_cached = 0 + + # Track per-artifact progress + artifact_progress = {} # artifact_hash -> (downloaded, total) + with tqdm.tqdm( total=len(download_list), desc="Downloading", unit="repro" ) as pbar: - for result in pool.imap_unordered( - download_and_process, download_list - ): - results.append(result) - # Handle structured results - if isinstance(result, dict): - if result.get("success"): - artifacts = result.get("artifacts", 0) - fixtures = result.get("fixtures", 0) - total_artifacts += artifacts - total_fixtures += fixtures - pbar.set_postfix( - { + # Start async downloads + async_result = pool.imap_unordered( + download_with_progress, download_list + ) + + completed = 0 + while completed < len(download_list): + # Check for progress updates (non-blocking) + while not progress_queue.empty(): + try: + update = progress_queue.get_nowait() + if update["type"] == "download_progress": + artifact_hash = update["artifact_hash"] + downloaded = update["downloaded"] + total = update["total"] + + # Update artifact tracking + old_downloaded = artifact_progress.get( + artifact_hash, (0, 0) + )[0] + artifact_progress[artifact_hash] = ( + downloaded, + total, + ) + + # Update totals + delta = downloaded - old_downloaded + total_bytes_downloaded += delta + if total > 0: + old_total = artifact_progress.get( + artifact_hash, (0, 0) + )[1] + if old_total == 0: + total_bytes_expected += total + + # Debug logging if enabled + if debug_progress: + if total > 0: + pct = downloaded / total * 100 + artifact_str = ( + f"{downloaded}/{total} ({pct:.1f}%)" + ) + else: + artifact_str = ( + f"{downloaded} bytes (total unknown)" + ) + + if total_bytes_expected > 0: + global_str = f"{total_bytes_downloaded / 1024 / 1024:.1f}/{total_bytes_expected / 1024 / 1024:.1f}MB" + else: + global_str = f"{total_bytes_downloaded / 1024 / 1024:.1f}MB (total unknown)" + + # Update progress bar with byte-level granularity + postfix = { "artifacts": total_artifacts, "fixtures": total_fixtures, } - ) + if total_bytes_expected > 0: + postfix["data"] = ( + f"{total_bytes_downloaded / 1024 / 1024:.1f}/{total_bytes_expected / 1024 / 1024:.1f}MB" + ) + pbar.set_postfix(postfix, refresh=True) + except: + break + + # Check for completed downloads (timeout to allow progress updates) + try: + result = async_result.__next__() + completed += 1 + results.append(result) + + # Handle structured results + if isinstance(result, dict): + if result.get("success"): + artifacts = result.get("artifacts", 0) + fixtures = result.get("fixtures", 0) + cached = result.get("cached", False) + total_artifacts += artifacts + total_fixtures += fixtures + if cached: + 
total_cached += 1 + + postfix = { + "artifacts": total_artifacts, + "fixtures": total_fixtures, + } + if total_cached > 0: + postfix["cached"] = total_cached + pbar.set_postfix(postfix) + else: + pbar.write( + f" [WARNING] {result['repro']}: {result['message']}" + ) else: - pbar.write( - f" [WARNING] {result['repro']}: {result['message']}" - ) - else: - # Legacy string result - if result.startswith("Error") or result.startswith( - "Failed" - ): - pbar.write(f" [WARNING] {result}") - pbar.update(1) + # Legacy string result + if result.startswith("Error") or result.startswith( + "Failed" + ): + pbar.write(f" [WARNING] {result}") + pbar.update(1) + except StopIteration: + break else: - # Sequential download with progress bar + # Sequential download with progress bar and granular progress results = [] total_artifacts = 0 total_fixtures = 0 + total_bytes_downloaded = 0 + total_bytes_expected = 0 + artifact_progress = {} # artifact_hash -> (downloaded, total) + + # Create in-process progress callback + def progress_callback_factory(pbar_ref): + """Create a progress callback that updates the tqdm bar.""" + + def callback(downloaded, total, artifact_hash): + nonlocal total_bytes_downloaded, total_bytes_expected, artifact_progress + + # Update artifact tracking + old_downloaded = artifact_progress.get(artifact_hash, (0, 0))[0] + artifact_progress[artifact_hash] = (downloaded, total) + + # Update totals + total_bytes_downloaded += downloaded - old_downloaded + if total > 0: + old_total = artifact_progress.get(artifact_hash, (0, 0))[1] + if old_total == 0: + total_bytes_expected += total + + # Update progress bar + postfix = { + "artifacts": total_artifacts, + "fixtures": total_fixtures, + } + if total_bytes_expected > 0: + postfix["data"] = ( + f"{total_bytes_downloaded / 1024 / 1024:.1f}/{total_bytes_expected / 1024 / 1024:.1f}MB" + ) + pbar_ref.set_postfix(postfix, refresh=True) + + return callback + with tqdm.tqdm( total=len(download_list), desc="Downloading", unit="repro" ) as pbar: for item in download_list: - result = download_and_process(item) + # NOTE: Sequential mode doesn't use progress_queue + # Instead we rely on the progress_callback in the API client + result = download_and_process(item, progress_queue=None) results.append(result) # Handle structured results if isinstance(result, dict): @@ -1232,8 +1408,8 @@ def debug_mismatches( ), use_ng: bool = typer.Option( True, - "--use-ng", - help="Use fuzz NG CLI (fuzz list/download repro) instead of API scraping", + "--use-ng/--no-use-ng", + help="Use fuzz NG API (default: enabled)", ), debug_mode: bool = typer.Option( False, @@ -1241,19 +1417,40 @@ def debug_mismatches( "-d", help="Enables debug mode, which disables multiprocessing", ), + force_redownload: bool = typer.Option( + False, + "--force-redownload", + help="Force re-download even if fixtures are already cached", + ), ): initialize_process_output_buffers(randomize_output_buffer=randomize_output_buffer) + # Set force redownload environment variable for worker processes + if force_redownload: + os.environ["FORCE_REDOWNLOAD"] = "1" + else: + os.environ.pop("FORCE_REDOWNLOAD", None) + globals.output_dir = output_dir + # Only delete output_dir if it exists (but preserve inputs for caching) + # We'll clean up other subdirectories but keep inputs_dir for cache reuse if globals.output_dir.exists(): - shutil.rmtree(globals.output_dir) - globals.output_dir.mkdir(parents=True, exist_ok=True) + for item in globals.output_dir.iterdir(): + if item.name != "inputs": # Preserve inputs for caching + 
if item.is_dir(): + shutil.rmtree(item) + else: + item.unlink() + else: + globals.output_dir.mkdir(parents=True, exist_ok=True) globals.inputs_dir = globals.output_dir / "inputs" - if globals.inputs_dir.exists(): + # Only delete inputs_dir if force redownload is set + if force_redownload and globals.inputs_dir.exists(): shutil.rmtree(globals.inputs_dir) + globals.inputs_dir.mkdir(parents=True, exist_ok=True) fuzzcorp_cookie = os.getenv("FUZZCORP_COOKIE") @@ -1354,6 +1551,10 @@ def fetch_repros(client): continue custom_data_urls.append(custom_url) + if not custom_data_urls: + print("\n[NOTICE] No repros to download") + return True + ld_preload = os.environ.pop("LD_PRELOAD", None) num_test_cases = len(custom_data_urls) @@ -1369,8 +1570,23 @@ def fetch_repros(client): ) # Print download results summary - successful_downloads = [r for r in results if r and "successfully" in r] - failed_downloads = [r for r in results if r and ("Failed" in r or "Error" in r)] + successful_downloads = [] + failed_downloads = [] + for r in results: + if not r: + continue + if isinstance(r, dict): + if r.get("success"): + successful_downloads.append(r) + else: + failed_downloads.append(r) + else: + # Legacy string results + if "successfully" in r: + successful_downloads.append(r) + elif "Failed" in r or "Error" in r: + failed_downloads.append(r) + print( f"\nDownload summary: {len(successful_downloads)} succeeded, {len(failed_downloads)} failed" ) @@ -1378,7 +1594,10 @@ def fetch_repros(client): if failed_downloads: print(f"\n[WARNING] Failed downloads:") for failure in failed_downloads: - print(f" - {failure}") + if isinstance(failure, dict): + print(f" - {failure['repro']}: {failure['message']}") + else: + print(f" - {failure}") if ld_preload is not None: os.environ["LD_PRELOAD"] = ld_preload @@ -2094,8 +2313,8 @@ def create_env( ), use_ng: bool = typer.Option( True, - "--use-ng", - help="Use fuzz NG CLI (fuzz list/download repro) instead of API scraping", + "--use-ng/--no-use-ng", + help="Use fuzz NG API (default: enabled)", ), debug_mode: bool = typer.Option( False, From d94c2eb91fbb73d6e54b06b4452993394a9f9232 Mon Sep 17 00:00:00 2001 From: Charles Moyes Date: Thu, 23 Oct 2025 21:56:42 +0000 Subject: [PATCH 2/2] Remove debug mode --- src/test_suite/multiprocessing_utils.py | 39 ++++--------------------- src/test_suite/test_suite.py | 35 ++-------------------- 2 files changed, 7 insertions(+), 67 deletions(-) diff --git a/src/test_suite/multiprocessing_utils.py b/src/test_suite/multiprocessing_utils.py index c0f7143..c8523de 100644 --- a/src/test_suite/multiprocessing_utils.py +++ b/src/test_suite/multiprocessing_utils.py @@ -524,15 +524,6 @@ def download_and_process(source, progress_queue=None): # Only use cache if actual fixture files exist if fixture_count > 0: - debug_progress = os.getenv( - "DEBUG_DOWNLOAD_PROGRESS", "" - ).lower() in ("1", "true", "yes") - if debug_progress: - print( - f"[DEBUG] Worker PID={os.getpid()} skipping {section_name}/{crash_hash} (already downloaded, {fixture_count} fixtures cached)", - flush=True, - ) - return { "success": True, "repro": f"{section_name}/{crash_hash}", @@ -554,35 +545,13 @@ def download_and_process(source, progress_queue=None): } # Parallel artifact downloads with optional progress tracking - debug_progress = os.getenv("DEBUG_DOWNLOAD_PROGRESS", "").lower() in ( - "1", - "true", - "yes", - ) - - if debug_progress: - print( - f"[DEBUG] Worker PID={os.getpid()} starting download for {section_name}/{crash_hash}", - flush=True, - ) - def 
download_artifact(artifact_hash): """Download a single artifact and return its data.""" # Create progress callback if queue is provided progress_callback = None - if progress_queue or debug_progress: + if progress_queue: def callback(downloaded, total): - # Debug logging if enabled - if debug_progress: - if total > 0: - pct = downloaded / total * 100 - progress_str = ( - f"{downloaded}/{total} bytes ({pct:.1f}%)" - ) - else: - progress_str = f"{downloaded} bytes (total unknown)" - # Send progress update to main process if progress_queue: progress_queue.put( @@ -652,8 +621,10 @@ def extract_artifact(artifact_data): f.write(f"artifacts={num_artifacts}\n") except Exception as e: # Non-fatal: just means we'll re-download next time - if debug_progress: - print(f"[DEBUG] Failed to create cache marker: {e}", flush=True) + print( + f"[NOTICE] Failed to create cache marker for {section_name}/{crash_hash}: {e}", + flush=True, + ) # Return structured result with artifact count return { diff --git a/src/test_suite/test_suite.py b/src/test_suite/test_suite.py index 868b843..34b2287 100644 --- a/src/test_suite/test_suite.py +++ b/src/test_suite/test_suite.py @@ -46,6 +46,8 @@ import httpx from test_suite.fuzzcorp_auth import get_fuzzcorp_auth, FuzzCorpAuth from test_suite.fuzzcorp_utils import fuzzcorp_api_call +from multiprocessing import Manager +import functools """ Harness options: @@ -1095,22 +1097,6 @@ def fetch_repros(client): # Download in parallel with progress bar if num_processes > 1: - from multiprocessing import Manager - import functools - - # Debug mode for progress tracking - debug_progress = os.getenv("DEBUG_DOWNLOAD_PROGRESS", "").lower() in ( - "1", - "true", - "yes", - ) - - if debug_progress: - print( - f"[DEBUG] Main process: debug logging enabled, starting {len(download_list)} downloads with {num_processes} workers", - flush=True, - ) - # Create a shared queue for progress updates manager = Manager() progress_queue = manager.Queue() @@ -1169,23 +1155,6 @@ def fetch_repros(client): if old_total == 0: total_bytes_expected += total - # Debug logging if enabled - if debug_progress: - if total > 0: - pct = downloaded / total * 100 - artifact_str = ( - f"{downloaded}/{total} ({pct:.1f}%)" - ) - else: - artifact_str = ( - f"{downloaded} bytes (total unknown)" - ) - - if total_bytes_expected > 0: - global_str = f"{total_bytes_downloaded / 1024 / 1024:.1f}/{total_bytes_expected / 1024 / 1024:.1f}MB" - else: - global_str = f"{total_bytes_downloaded / 1024 / 1024:.1f}MB (total unknown)" - # Update progress bar with byte-level granularity postfix = { "artifacts": total_artifacts,