diff --git a/.gitignore b/.gitignore index 023f552c..a801c316 100644 --- a/.gitignore +++ b/.gitignore @@ -103,3 +103,5 @@ target/ .pytest_cache/ libs/*.whl + +gcsfs/tests/perf/microbenchmarks/__run__ diff --git a/.isort.cfg b/.isort.cfg index 1eab763a..cd08a195 100644 --- a/.isort.cfg +++ b/.isort.cfg @@ -1,3 +1,3 @@ [settings] profile = black -known_third_party = aiohttp,click,decorator,fsspec,fuse,google,google_auth_oauthlib,pytest,requests,setuptools +known_third_party = aiohttp,click,conftest,decorator,fsspec,fuse,google,google_auth_oauthlib,numpy,prettytable,psutil,pytest,requests,resource_monitor,setuptools,yaml diff --git a/cloudbuild/e2e-tests-cloudbuild.yaml b/cloudbuild/e2e-tests-cloudbuild.yaml index bc1bcebc..8306e949 100644 --- a/cloudbuild/e2e-tests-cloudbuild.yaml +++ b/cloudbuild/e2e-tests-cloudbuild.yaml @@ -129,7 +129,7 @@ steps: pip install --upgrade pip > /dev/null # Install testing libraries explicitly, as they are not in setup.py - pip install pytest pytest-timeout pytest-subtests pytest-asyncio fusepy google-cloud-storage > /dev/null + pip install pytest pytest-timeout pytest-subtests pytest-asyncio fusepy google-cloud-storage psutil PyYAML > /dev/null pip install -e . > /dev/null echo '--- Preparing test environment on VM ---' diff --git a/environment_gcsfs.yaml b/environment_gcsfs.yaml index 39db5120..113fcd35 100644 --- a/environment_gcsfs.yaml +++ b/environment_gcsfs.yaml @@ -13,12 +13,17 @@ dependencies: - google-auth-oauthlib - google-cloud-core - google-cloud-storage + - numpy - grpcio - pytest + - pytest-benchmark - pytest-timeout - pytest-asyncio - pytest-subtests + - psutil + - ptable - requests - ujson + - pyyaml - pip: - git+https://github.com/fsspec/filesystem_spec diff --git a/gcsfs/extended_gcsfs.py b/gcsfs/extended_gcsfs.py index 44088d07..ae56b79e 100644 --- a/gcsfs/extended_gcsfs.py +++ b/gcsfs/extended_gcsfs.py @@ -144,6 +144,7 @@ def _open( path, mode="rb", block_size=None, + cache_type="readahead", cache_options=None, acl=None, consistency=None, @@ -163,6 +164,7 @@ def _open( path, mode, block_size=block_size or self.default_block_size, + cache_type=cache_type, cache_options=cache_options, consistency=consistency or self.consistency, metadata=metadata, diff --git a/gcsfs/tests/conftest.py b/gcsfs/tests/conftest.py index ad856125..dd7a5cc9 100644 --- a/gcsfs/tests/conftest.py +++ b/gcsfs/tests/conftest.py @@ -66,6 +66,12 @@ params = dict() +BUCKET_NAME_MAP = { + "regional": TEST_BUCKET, + "zonal": TEST_ZONAL_BUCKET, + "hns": TEST_HNS_BUCKET, +} + def stop_docker(container): cmd = shlex.split('docker ps -a -q --filter "name=%s"' % container) diff --git a/gcsfs/tests/perf/microbenchmarks/README.md b/gcsfs/tests/perf/microbenchmarks/README.md new file mode 100644 index 00000000..a494ec3f --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/README.md @@ -0,0 +1,192 @@ +# GCSFS Microbenchmarks + +## Introduction + +This document describes the microbenchmark suite for `gcsfs`. These benchmarks are designed to measure the performance of various I/O operations under different conditions. They are built using `pytest` and the `pytest-benchmark` plugin to provide detailed performance metrics for single-threaded, multi-threaded, and multi-process scenarios. + +## Prerequisites + +Before running the benchmarks, ensure you have installed the project's dependencies for performance testing. 
This can be done by running the following command from the root of the repository:
+```bash
+pip install -r gcsfs/tests/perf/microbenchmarks/requirements.txt
+```
+
+This will install `pytest`, `pytest-benchmark`, and the other necessary dependencies.
+For more information on `pytest-benchmark`, refer to its official documentation. [1]
+
+## Read Benchmarks
+
+The read benchmarks are located in `gcsfs/tests/perf/microbenchmarks/read/` and are designed to test read performance with various configurations.
+
+### Parameters
+
+The read benchmarks are defined by the `ReadBenchmarkParameters` class in `read/parameters.py`. Key parameters include:
+
+* `name`: The name of the benchmark configuration.
+* `num_files`: The number of files to use; this is always `num_processes` x `num_threads`.
+* `pattern`: Read pattern, either sequential (`seq`) or random (`rand`).
+* `num_threads`: Number of threads for multi-threaded tests.
+* `num_processes`: Number of processes for multi-process tests.
+* `block_size_bytes`: The block size for gcsfs file buffering. Defaults to `16MB`.
+* `chunk_size_bytes`: The size of each read operation. Defaults to `16MB`.
+* `file_size_bytes`: The total size of each file.
+* `rounds`: The total number of pytest-benchmark rounds for each parameterized test. Defaults to `10`.
+
+To ensure that the results are stable and not skewed by outliers, each benchmark is run for a set number of rounds.
+By default this is 10 rounds, but it can be configured via the `rounds` parameter if needed. This helps provide a more accurate and reliable performance profile.
+
+### Configurations
+
+The base configurations in `read/configs.yaml` define the sequential (`read_seq`) and random (`read_rand`) scenarios along with their multi-process variants. These are then expanded into a full suite of test cases by generating variations for parallelism, file sizes, and bucket types.
+
+The benchmarks are split into three main test functions based on the execution model:
+
+* `test_read_single_threaded`: Measures baseline performance of read operations.
+* `test_read_multi_threaded`: Measures performance with multiple threads.
+* `test_read_multi_process`: Measures performance using multiple processes, each with its own set of threads.
+
+### Running Benchmarks with `pytest`
+
+You can use `pytest` to run the benchmarks directly.
+The `GCSFS_BENCHMARK_FILTER` environment variable is useful for filtering tests by name.
+
+**Examples:**
+
+Run all read benchmarks:
+```bash
+pytest gcsfs/tests/perf/microbenchmarks/read/
+```
+
+Run specific benchmark configurations by setting the `GCSFS_BENCHMARK_FILTER` environment variable, which expects a comma-separated list of configuration names.
+This is useful for targeting specific configurations defined in `read/configs.yaml`.
+
+For example, to run only the multi-process sequential and random reads, set:
+```bash
+export GCSFS_BENCHMARK_FILTER="read_seq_multi_process, read_rand_multi_process"
+pytest gcsfs/tests/perf/microbenchmarks/read/
+```
+
+## Function-level Fixture: `gcsfs_benchmark_read_write`
+
+A function-level `pytest` fixture named `gcsfs_benchmark_read_write` (defined in `conftest.py`) is used to set up and tear down the environment for the benchmarks.
+
+### Setup and Teardown
+
+* **Setup**: Before a benchmark function runs, this fixture creates the specified number of files with the configured size in a temporary directory within the test bucket. It uses `os.urandom()` to write data in chunks to avoid high memory usage.
+* **Teardown**: After the benchmark completes, the fixture recursively deletes the temporary directory and all the files created during the setup phase.
+
+Here is how the fixture is used in a test:
+
+```python
+@pytest.mark.parametrize(
+    "gcsfs_benchmark_read_write",
+    single_threaded_cases,
+    indirect=True,
+    ids=lambda p: p.name,
+)
+def test_read_single_threaded(benchmark, gcsfs_benchmark_read_write):
+    gcs, file_paths, params = gcsfs_benchmark_read_write
+    # ... benchmark logic ...
+```
+
+### Environment Variables
+
+To run the benchmarks, you need to configure your environment.
+The orchestrator script (`run.py`) sets these for you, but if you are running `pytest` directly, you will need to export them.
+
+* `GCSFS_TEST_BUCKET`: The name of a regional GCS bucket.
+* `GCSFS_ZONAL_TEST_BUCKET`: The name of a zonal GCS bucket.
+* `GCSFS_HNS_TEST_BUCKET`: The name of an HNS-enabled GCS bucket.
+
+You must also set the following environment variables to ensure that the benchmarks run against the live GCS API and that experimental features are enabled:
+
+```bash
+export STORAGE_EMULATOR_HOST="https://storage.googleapis.com"
+export GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT="true"
+```
+
+## Orchestrator Script (`run.py`)
+
+An orchestrator script, `run.py`, is provided to simplify running the benchmark suite. It wraps `pytest`, sets up the necessary environment variables, and generates a summary report.
+
+### Parameters
+
+The script accepts several command-line arguments:
+
+* `--group`: The benchmark group to run (e.g., `read`).
+* `--config`: The name of a specific benchmark configuration to run (e.g., `read_seq`).
+* `--regional-bucket`: Name of the regional GCS bucket.
+* `--zonal-bucket`: Name of the zonal GCS bucket.
+* `--hns-bucket`: Name of the HNS GCS bucket.
+* `--log`: Set to `true` to enable `pytest` console logging.
+* `--log-level`: Sets the log level (e.g., `INFO`, `DEBUG`).
+
+**Important Notes:**
+* You must provide at least one bucket name (`--regional-bucket`, `--zonal-bucket`, or `--hns-bucket`).
+
+Run the script with `--help` to see all available options:
+```bash
+python gcsfs/tests/perf/microbenchmarks/run.py --help
+```
+
+### Examples
+
+Here are some examples of how to use the orchestrator script from the root of the `gcsfs` repository:
+
+Run all available benchmarks against a regional bucket with default settings. This is the simplest way to trigger all tests across all groups (e.g., read, write):
+```bash
+python gcsfs/tests/perf/microbenchmarks/run.py --regional-bucket your-regional-bucket
+```
+
+Run only the `read` group benchmarks against a regional bucket, using the file sizes defined in `read/configs.yaml`:
+```bash
+python gcsfs/tests/perf/microbenchmarks/run.py --group read --regional-bucket your-regional-bucket
+```
+
+Run only the sequential read benchmark configuration (`read_seq`):
+```bash
+python gcsfs/tests/perf/microbenchmarks/run.py \
+    --group read \
+    --config "read_seq" \
+    --regional-bucket your-regional-bucket
+```
+
+Run all read benchmarks against both a regional and a zonal bucket:
+```bash
+python gcsfs/tests/perf/microbenchmarks/run.py \
+    --group read \
+    --regional-bucket your-regional-bucket \
+    --zonal-bucket your-zonal-bucket
+```
+
+### Script Output
+
+The script creates a timestamped directory in `gcsfs/tests/perf/microbenchmarks/__run__/` containing the JSON and CSV results, and it prints a summary table to the console.
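+
+For example, a run started on 1 January 2025 at 12:00:00 would produce a layout like the one below (the directory name follows `run.py`'s `%d%m%Y-%H%M%S` timestamp format, so it differs for every run):
+
+```
+gcsfs/tests/perf/microbenchmarks/__run__/
+└── 01012025-120000/
+    ├── results.json   # raw pytest-benchmark output
+    └── results.csv    # summarized report used for the console table
+```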
+#### JSON File (`results.json`)
+
+The `results.json` file will contain a structured representation of the benchmark results.
+The exact content can vary depending on the pytest-benchmark version and the tests run, but it typically includes:
+* machine_info: Details about the system where the benchmarks were run (e.g., Python version, OS, CPU).
+* benchmarks: A list of individual benchmark results, each containing:
+  * name: The name of the benchmark test.
+  * stats: Performance statistics like min, max, mean, stddev, rounds, iterations, ops (operations per second), q1, q3 (quartiles).
+  * options: Configuration options used for the benchmark (e.g., min_rounds, max_time).
+  * extra_info: Any additional information associated with the benchmark.
+
+#### CSV File (`results.csv`)
+
+The CSV file provides a detailed performance profile of gcsfs operations, allowing for analysis of how factors like threading, process parallelism, and access patterns affect I/O throughput.
+This file is a summarized view of the results in the JSON file. For each test run, it records detailed performance statistics, including:
+* Minimum, maximum, mean, and median execution times in seconds.
+* Standard deviation and percentile values (p90, p95, p99) for timing.
+* The maximum throughput achieved, measured in megabytes per second (MB/s).
+* The maximum CPU and memory usage during the test.
+
+#### Summary Table
+
+The script also prints a summary table like the one below for a quick glance at the results.
+
+| Bucket Type | Group | Pattern | Files | Threads | Processes | File Size (MB) | Chunk Size (MB) | Block Size (MB) | Min Latency (s) | Mean Latency (s) | Max Throughput (MB/s) | Max CPU (%) | Max Memory (MB) |
+| :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- | :--- |
+| regional | read | seq | 1 | 1 | 1 | 128.00 | 16.00 | 16.00 | 0.6391 | 0.7953 | 200.2678 | 0.26 | 507 |
+| regional | read | rand | 1 | 1 | 1 | 128.00 | 16.00 | 16.00 | 0.6537 | 0.7843 | 195.8066 | 5.6 | 510 |
diff --git a/gcsfs/tests/perf/microbenchmarks/conftest.py b/gcsfs/tests/perf/microbenchmarks/conftest.py
new file mode 100644
index 00000000..d5ded283
--- /dev/null
+++ b/gcsfs/tests/perf/microbenchmarks/conftest.py
@@ -0,0 +1,180 @@
+import logging
+import multiprocessing
+import os
+import statistics
+import time
+import uuid
+from typing import Any, List
+
+import pytest
+from resource_monitor import ResourceMonitor
+
+MB = 1024 * 1024
+
+try:
+    # This import is used to check if the pytest-benchmark plugin is installed.
+    import pytest_benchmark  # noqa: F401
+
+    benchmark_plugin_installed = True
+except ImportError:
+    benchmark_plugin_installed = False
+
+
+def _write_file(gcs, path, file_size, chunk_size):
+    chunks_to_write = file_size // chunk_size
+    remainder = file_size % chunk_size
+    with gcs.open(path, "wb") as f:
+        for _ in range(chunks_to_write):
+            f.write(os.urandom(chunk_size))
+        if remainder > 0:
+            f.write(os.urandom(remainder))
+
+
+def _prepare_files(gcs, file_paths, file_size):
+    chunk_size = min(100 * MB, file_size)
+    args = [(gcs, path, file_size, chunk_size) for path in file_paths]
+    ctx = multiprocessing.get_context("spawn")
+    with ctx.Pool(16) as pool:
+        pool.starmap(_write_file, args)
+
+
+@pytest.fixture
+def monitor():
+    """
+    Provides the ResourceMonitor class.
+    Usage: with monitor() as m: ...
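+
+    Illustrative example (``run_workload`` is just a placeholder for the
+    code being measured):
+
+        with monitor() as m:
+            run_workload()
+        print(m.max_cpu, m.max_mem, m.throughput_mb_s)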
+ """ + return ResourceMonitor + + +@pytest.fixture +def gcsfs_benchmark_read_write(extended_gcs_factory, request): + """ + A fixture that creates temporary files for a benchmark run and cleans + them up afterward. + + It uses the `BenchmarkParameters` object from the test's parametrization + to determine how many files to create and of what size. + """ + params = request.param + gcs = extended_gcs_factory(block_size=params.block_size_bytes) + + prefix = f"{params.bucket_name}/benchmark-files-{uuid.uuid4()}" + file_paths = [f"{prefix}/file_{i}" for i in range(params.num_files)] + + logging.info( + f"Setting up benchmark '{params.name}': creating {params.num_files} file(s) " + f"of size {params.file_size_bytes / MB:.2f} MB each." + ) + + start_time = time.perf_counter() + # Create all files in parallel, 16 at a time + _prepare_files(gcs, file_paths, params.file_size_bytes) + + duration_ms = (time.perf_counter() - start_time) * 1000 + logging.info( + f"Benchmark '{params.name}' setup created {params.num_files} files in {duration_ms:.2f} ms." + ) + + yield gcs, file_paths, params + + # --- Teardown --- + logging.info(f"Tearing down benchmark '{params.name}': deleting files.") + try: + for path in file_paths: + gcs.rm(path) + except Exception as e: + logging.error(f"Failed to clean up benchmark files: {e}") + + +if benchmark_plugin_installed: + + def pytest_benchmark_generate_json(config, benchmarks, machine_info, commit_info): + """ + Hook to post-process benchmark results before generating the JSON report. + """ + for bench in benchmarks: + if "timings" in bench.get("extra_info", {}): + bench.stats.data = bench.extra_info["timings"] + bench.stats.min = bench.extra_info["min_time"] + bench.stats.max = bench.extra_info["max_time"] + bench.stats.mean = bench.extra_info["mean_time"] + bench.stats.median = bench.extra_info["median_time"] + bench.stats.stddev = bench.extra_info["stddev_time"] + bench.stats.rounds = bench.extra_info["rounds"] + + del bench.extra_info["timings"] + del bench.extra_info["min_time"] + del bench.extra_info["max_time"] + del bench.extra_info["mean_time"] + del bench.extra_info["median_time"] + del bench.extra_info["stddev_time"] + + +def publish_benchmark_extra_info( + benchmark: Any, params: Any, benchmark_group: str +) -> None: + """ + Helper function to publish benchmark parameters to the extra_info property. + """ + benchmark.extra_info["num_files"] = params.num_files + benchmark.extra_info["file_size"] = params.file_size_bytes + benchmark.extra_info["chunk_size"] = params.chunk_size_bytes + benchmark.extra_info["block_size"] = params.block_size_bytes + benchmark.extra_info["pattern"] = params.pattern + benchmark.extra_info["threads"] = params.num_threads + benchmark.extra_info["rounds"] = params.rounds + benchmark.extra_info["bucket_name"] = params.bucket_name + benchmark.extra_info["bucket_type"] = params.bucket_type + benchmark.extra_info["processes"] = params.num_processes + benchmark.group = benchmark_group + + +def publish_resource_metrics(benchmark: Any, monitor: ResourceMonitor) -> None: + """ + Helper function to publish resource monitor results to the extra_info property. 
+ """ + benchmark.extra_info.update( + { + "cpu_max_global": f"{monitor.max_cpu:.2f}", + "mem_max": f"{monitor.max_mem:.2f}", + "net_throughput_mb_s": f"{monitor.throughput_mb_s:.2f}", + "vcpus": monitor.vcpus, + } + ) + + +def publish_multi_process_benchmark_extra_info( + benchmark: Any, round_durations_s: List[float], params: Any +) -> None: + """ + Calculate statistics for multi-process benchmarks and publish them + to extra_info. + """ + if not round_durations_s: + return + + min_time = min(round_durations_s) + max_time = max(round_durations_s) + mean_time = statistics.mean(round_durations_s) + median_time = statistics.median(round_durations_s) + stddev_time = ( + statistics.stdev(round_durations_s) if len(round_durations_s) > 1 else 0.0 + ) + + # Build the results table as a single multi-line string to log it cleanly. + results_table = ( + f"\n{'-' * 90}\n" + f"{'Name (time in s)':<50s} {'Min':>8s} {'Max':>8s} {'Mean':>8s} {'Rounds':>8s}\n" + f"{'-' * 90}\n" + f"{params.name:<50s} {min_time:>8.4f} {max_time:>8.4f} {mean_time:>8.4f} {params.rounds:>8d}\n" + f"{'-' * 90}" + ) + logging.info(f"Multi-process benchmark results:{results_table}") + + benchmark.extra_info["timings"] = round_durations_s + benchmark.extra_info["min_time"] = min_time + benchmark.extra_info["max_time"] = max_time + benchmark.extra_info["mean_time"] = mean_time + benchmark.extra_info["median_time"] = median_time + benchmark.extra_info["stddev_time"] = stddev_time diff --git a/gcsfs/tests/perf/microbenchmarks/read/configs.py b/gcsfs/tests/perf/microbenchmarks/read/configs.py new file mode 100644 index 00000000..f49986f2 --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/read/configs.py @@ -0,0 +1,78 @@ +import itertools +import logging +import os + +import yaml + +from gcsfs.tests.conftest import BUCKET_NAME_MAP +from gcsfs.tests.perf.microbenchmarks.conftest import MB +from gcsfs.tests.perf.microbenchmarks.read.parameters import ReadBenchmarkParameters +from gcsfs.tests.settings import BENCHMARK_FILTER + + +def _generate_benchmark_cases(): + """ + Generates benchmark cases by reading the configuration from configs.yaml. 
+ """ + config_path = os.path.join(os.path.dirname(__file__), "configs.yaml") + with open(config_path, "r") as f: + config = yaml.safe_load(f) + + common_config = config["common"] + scenarios = config["scenarios"] + + if BENCHMARK_FILTER: + filter_names = [name.strip().lower() for name in BENCHMARK_FILTER.split(",")] + scenarios = [s for s in scenarios if s["name"].lower() in filter_names] + all_cases = [] + + for scenario in scenarios: + procs_list = scenario.get("processes", [1]) + threads_list = scenario.get("threads", [1]) + file_sizes_mb = common_config.get("file_sizes_mb", [128]) + block_sizes_mb = common_config.get("block_sizes_mb", [16]) + bucket_types = common_config.get("bucket_types", ["regional"]) + + param_combinations = itertools.product( + procs_list, threads_list, file_sizes_mb, block_sizes_mb, bucket_types + ) + + for procs, threads, size_mb, block_size_mb, bucket_type in param_combinations: + bucket_name = BUCKET_NAME_MAP.get(bucket_type) + if not bucket_name: + continue + + name = ( + f"{scenario['name']}_{procs}procs_{threads}threads_" + f"{size_mb}MB_file_{block_size_mb}MB_block_{bucket_type}" + ) + + params = ReadBenchmarkParameters( + name=name, + pattern=scenario["pattern"], + bucket_name=bucket_name, + bucket_type=bucket_type, + num_threads=threads, + num_processes=procs, + num_files=threads * procs, + rounds=common_config.get("rounds", 10), + block_size_bytes=block_size_mb * MB, + chunk_size_bytes=block_size_mb + * MB, # Always keep chunk size same as block size for effective readahead caching + file_size_bytes=size_mb * MB, + ) + all_cases.append(params) + + return all_cases + + +def get_read_benchmark_cases(): + """ + Returns a list of ReadBenchmarkParameters, optionally filtered by + the GCSFS_BENCHMARK_FILTER environment variable. + """ + all_cases = _generate_benchmark_cases() + logging.info( + f"Benchmark cases to be triggered: {', '.join([case.name for case in all_cases])}" + ) + return all_cases diff --git a/gcsfs/tests/perf/microbenchmarks/read/configs.yaml b/gcsfs/tests/perf/microbenchmarks/read/configs.yaml new file mode 100644 index 00000000..a061a186 --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/read/configs.yaml @@ -0,0 +1,30 @@ +common: + bucket_types: + - "regional" + - "zonal" + file_sizes_mb: + - 1024 # 1GB + block_sizes_mb: + - 64 + rounds: 10 + +scenarios: + - name: "read_seq" + pattern: "seq" + threads: [1,4,8] + processes: [1] + + - name: "read_seq_multi_process" + pattern: "seq" + threads: [1,2] + processes: [16] + + - name: "read_rand" + pattern: "rand" + threads: [1,4,8] + processes: [1] + + - name: "read_rand_multi_process" + pattern: "rand" + threads: [1,2] + processes: [16] diff --git a/gcsfs/tests/perf/microbenchmarks/read/parameters.py b/gcsfs/tests/perf/microbenchmarks/read/parameters.py new file mode 100644 index 00000000..bb674508 --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/read/parameters.py @@ -0,0 +1,43 @@ +from dataclasses import dataclass + +from gcsfs.tests.perf.microbenchmarks.conftest import MB + + +@dataclass +class ReadBenchmarkParameters: + """ + Defines the parameters for a read benchmark test cases. + """ + + # The name of config + name: str + + # Read pattern: "seq" for sequential, "rand" for random. + pattern: str + + # The name of the GCS bucket to use for the benchmark. + bucket_name: str = "" + + # The type of the bucket, e.g., "regional", "zonal", "hns". + bucket_type: str = "" + + # Number of threads for multi-threaded tests, default to 1. 
+ num_threads: int = 1 + + # Number of processes for multi-process tests, default to 1. + num_processes: int = 1 + + # Number of files to create for the benchmark. + num_files: int = 1 + + # Number of rounds for the benchmark, default to 10. + rounds: int = 10 + + # The block size for gcsfs file buffering default to 16MB. + block_size_bytes: int = 16 * MB + + # The size of each read or write operation in bytes default to 16MB. + chunk_size_bytes: int = 16 * MB + + # Size of each file in bytes + file_size_bytes: int = 128 * MB diff --git a/gcsfs/tests/perf/microbenchmarks/read/test_read.py b/gcsfs/tests/perf/microbenchmarks/read/test_read.py new file mode 100644 index 00000000..10b71eeb --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/read/test_read.py @@ -0,0 +1,234 @@ +import logging +import multiprocessing +import random +import time +from concurrent.futures import ThreadPoolExecutor + +import pytest + +from gcsfs.tests.perf.microbenchmarks.conftest import ( + publish_benchmark_extra_info, + publish_multi_process_benchmark_extra_info, + publish_resource_metrics, +) +from gcsfs.tests.perf.microbenchmarks.read.configs import get_read_benchmark_cases +from gcsfs.tests.settings import BENCHMARK_SKIP_TESTS + +pytestmark = pytest.mark.skipif( + BENCHMARK_SKIP_TESTS, + reason="""Skipping benchmark tests. +Set GCSFS_BENCHMARK_SKIP_TESTS=false to run them, +or use the orchestrator script at gcsfs/tests/perf/microbenchmarks/run.py""", +) + +BENCHMARK_GROUP = "read" + + +def _read_op_seq(gcs, path, chunk_size): + start_time = time.perf_counter() + with gcs.open(path, "rb") as f: + while f.read(chunk_size): + pass + duration_ms = (time.perf_counter() - start_time) * 1000 + logging.info(f"SEQ_READ : {path} - {duration_ms:.2f} ms.") + + +def _read_op_rand(gcs, path, chunk_size, offsets): + start_time = time.perf_counter() + # Random benchmarks should not prefetch + with gcs.open(path, "rb", cache_type="none") as f: + for offset in offsets: + f.seek(offset) + f.read(chunk_size) + duration_ms = (time.perf_counter() - start_time) * 1000 + logging.info(f"RAND_READ : {path} - {duration_ms:.2f} ms.") + + +def _random_read_worker(gcs, path, chunk_size, offsets): + """A worker that reads a file from random offsets.""" + local_offsets = list(offsets) + random.shuffle(local_offsets) + _read_op_rand(gcs, path, chunk_size, local_offsets) + + +all_benchmark_cases = get_read_benchmark_cases() + +single_threaded_cases = [ + p for p in all_benchmark_cases if p.num_threads == 1 and p.num_processes == 1 +] +multi_threaded_cases = [ + p for p in all_benchmark_cases if p.num_threads > 1 and p.num_processes == 1 +] +multi_process_cases = [p for p in all_benchmark_cases if p.num_processes > 1] + + +@pytest.mark.parametrize( + "gcsfs_benchmark_read_write", + single_threaded_cases, + indirect=True, + ids=lambda p: p.name, +) +def test_read_single_threaded(benchmark, gcsfs_benchmark_read_write, monitor): + gcs, file_paths, params = gcsfs_benchmark_read_write + + publish_benchmark_extra_info(benchmark, params, BENCHMARK_GROUP) + path = file_paths[0] + + op = None + op_args = None + if params.pattern == "seq": + op = _read_op_seq + op_args = (gcs, path, params.chunk_size_bytes) + elif params.pattern == "rand": + offsets = list(range(0, params.file_size_bytes, params.chunk_size_bytes)) + op = _random_read_worker + op_args = (gcs, path, params.chunk_size_bytes, offsets) + + with monitor() as m: + benchmark.pedantic(op, rounds=params.rounds, args=op_args) + + publish_resource_metrics(benchmark, m) + + 
+@pytest.mark.parametrize( + "gcsfs_benchmark_read_write", + multi_threaded_cases, + indirect=True, + ids=lambda p: p.name, +) +def test_read_multi_threaded(benchmark, gcsfs_benchmark_read_write, monitor): + gcs, file_paths, params = gcsfs_benchmark_read_write + + publish_benchmark_extra_info(benchmark, params, BENCHMARK_GROUP) + + def run_benchmark(): + logging.info("Multi-threaded benchmark: Starting benchmark round.") + with ThreadPoolExecutor(max_workers=params.num_threads) as executor: + if params.pattern == "seq": + # Each thread reads one full file sequentially. + futures = [ + executor.submit(_read_op_seq, gcs, path, params.chunk_size_bytes) + for path in file_paths + ] + list(futures) # Wait for completion + + elif params.pattern == "rand": + + offsets = list( + range(0, params.file_size_bytes, params.chunk_size_bytes) + ) + + if params.num_files == 1: + # All threads read the same file randomly. + paths_to_read = [file_paths[0]] * params.num_threads + else: + # Each thread reads a different file randomly. + paths_to_read = file_paths + + futures = [ + executor.submit( + _random_read_worker, gcs, path, params.chunk_size_bytes, offsets + ) + for path in paths_to_read + ] + list(futures) # Wait for completion + + with monitor() as m: + benchmark.pedantic(run_benchmark, rounds=params.rounds) + + publish_resource_metrics(benchmark, m) + + +def _process_worker( + gcs, + file_paths, + chunk_size, + num_threads, + pattern, + file_size_bytes, + process_durations_shared, + index, +): + """A worker function for each process to read a list of files.""" + start_time = time.perf_counter() + with ThreadPoolExecutor(max_workers=num_threads) as executor: + if pattern == "seq": + futures = [ + executor.submit(_read_op_seq, gcs, path, chunk_size) + for path in file_paths + ] + elif pattern == "rand": + offsets = list(range(0, file_size_bytes, chunk_size)) + + futures = [ + executor.submit(_random_read_worker, gcs, path, chunk_size, offsets) + for path in file_paths + ] + + # Wait for all threads in the process to complete + list(futures) + duration_s = time.perf_counter() - start_time + process_durations_shared[index] = duration_s + + +@pytest.mark.parametrize( + "gcsfs_benchmark_read_write", + multi_process_cases, + indirect=True, + ids=lambda p: p.name, +) +def test_read_multi_process(benchmark, gcsfs_benchmark_read_write, request, monitor): + gcs, file_paths, params = gcsfs_benchmark_read_write + publish_benchmark_extra_info(benchmark, params, BENCHMARK_GROUP) + + if multiprocessing.get_start_method(allow_none=True) != "spawn": + multiprocessing.set_start_method("spawn", force=True) + + process_durations_shared = multiprocessing.Array("d", params.num_processes) + files_per_process = params.num_files // params.num_processes + threads_per_process = params.num_threads + + round_durations_s = [] + with monitor() as m: + for _ in range(params.rounds): + logging.info("Multi-process benchmark: Starting benchmark round.") + processes = [] + + for i in range(params.num_processes): + if params.num_files > 1: + start_index = i * files_per_process + end_index = start_index + files_per_process + process_files = file_paths[start_index:end_index] + else: # num_files == 1 + # Each process will have its threads read from the same single file + process_files = [file_paths[0]] * threads_per_process + + p = multiprocessing.Process( + target=_process_worker, + args=( + gcs, + process_files, + params.chunk_size_bytes, + threads_per_process, + params.pattern, + params.file_size_bytes, + process_durations_shared, + i, 
+ ), + ) + processes.append(p) + p.start() + + for p in processes: + p.join() + + # The round duration is the time of the slowest process + round_durations_s.append(max(process_durations_shared[:])) + + publish_multi_process_benchmark_extra_info(benchmark, round_durations_s, params) + publish_resource_metrics(benchmark, m) + + # If --benchmark-json is passed, add a dummy benchmark run to generate a + # report entry that can be updated via the hook with timings. + if request.config.getoption("benchmark_json"): + benchmark.pedantic(lambda: None, rounds=1, iterations=1, warmup_rounds=0) diff --git a/gcsfs/tests/perf/microbenchmarks/requirements.txt b/gcsfs/tests/perf/microbenchmarks/requirements.txt new file mode 100644 index 00000000..c45213ac --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/requirements.txt @@ -0,0 +1,7 @@ +numpy +psutil +ptable +pytest +pytest-benchmark +pytest-timeout +PyYAML diff --git a/gcsfs/tests/perf/microbenchmarks/resource_monitor.py b/gcsfs/tests/perf/microbenchmarks/resource_monitor.py new file mode 100644 index 00000000..317a80eb --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/resource_monitor.py @@ -0,0 +1,86 @@ +import threading +import time + +import psutil + + +class ResourceMonitor: + def __init__(self): + self.interval = 1.0 + + self.vcpus = psutil.cpu_count() or 1 + self.max_cpu = 0.0 + self.max_mem = 0.0 + + # Network and Time tracking + self.start_time = 0.0 + self.duration = 0.0 + self.start_net = None + self.net_sent_mb = 0.0 + self.net_recv_mb = 0.0 + + self._stop_event = threading.Event() + self._thread = None + + def __enter__(self): + self.start_net = psutil.net_io_counters() + self.start_time = time.perf_counter() + self.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() + self.duration = time.perf_counter() - self.start_time + end_net = psutil.net_io_counters() + + self.net_sent_mb = (end_net.bytes_sent - self.start_net.bytes_sent) / ( + 1024 * 1024 + ) + self.net_recv_mb = (end_net.bytes_recv - self.start_net.bytes_recv) / ( + 1024 * 1024 + ) + + def _monitor(self): + psutil.cpu_percent(interval=None) + current_process = psutil.Process() + while not self._stop_event.is_set(): + try: + # CPU and Memory tracking for current process tree + total_cpu = current_process.cpu_percent(interval=None) + current_mem = current_process.memory_info().rss + for child in current_process.children(recursive=True): + try: + total_cpu += child.cpu_percent(interval=None) + current_mem += child.memory_info().rss + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + + # Normalize CPU by number of vcpus + global_cpu = total_cpu / self.vcpus + + mem = current_mem + + if global_cpu > self.max_cpu: + self.max_cpu = global_cpu + if mem > self.max_mem: + self.max_mem = mem + except psutil.NoSuchProcess: + pass + + time.sleep(self.interval) + + def start(self): + self._thread = threading.Thread(target=self._monitor, daemon=True) + self._thread.start() + + def stop(self): + self._stop_event.set() + if self._thread: + self._thread.join() + + @property + def throughput_mb_s(self): + """Calculates combined network throughput.""" + if self.duration <= 0: + return 0.0 + return (self.net_sent_mb + self.net_recv_mb) / self.duration diff --git a/gcsfs/tests/perf/microbenchmarks/run.py b/gcsfs/tests/perf/microbenchmarks/run.py new file mode 100644 index 00000000..cfb54048 --- /dev/null +++ b/gcsfs/tests/perf/microbenchmarks/run.py @@ -0,0 +1,333 @@ +import argparse +import csv +import json +import logging +import os +import 
subprocess +import sys +from datetime import datetime + +import numpy as np +from conftest import MB +from prettytable import PrettyTable + + +def _setup_environment(args): + """ + Validate command-line arguments and configure environment variables. + + This function checks for required arguments (like bucket names) and sets + up the necessary environment variables for the benchmark execution. + + Args: + args (argparse.Namespace): The parsed command-line arguments. + + """ + # Validate that at least one bucket is provided + if not any([args.regional_bucket, args.zonal_bucket, args.hns_bucket]): + logging.error( + "At least one of --regional-bucket, --zonal-bucket, or --hns-bucket must be provided." + ) + sys.exit(1) + + # Set environment variables for buckets + os.environ["GCSFS_TEST_BUCKET"] = ( + args.regional_bucket if args.regional_bucket else "" + ) + os.environ["GCSFS_ZONAL_TEST_BUCKET"] = ( + args.zonal_bucket if args.zonal_bucket else "" + ) + os.environ["GCSFS_HNS_TEST_BUCKET"] = args.hns_bucket if args.hns_bucket else "" + os.environ["GCSFS_EXPERIMENTAL_ZB_HNS_SUPPORT"] = "true" + os.environ["STORAGE_EMULATOR_HOST"] = "https://storage.googleapis.com" + os.environ["GCSFS_BENCHMARK_SKIP_TESTS"] = "false" + + if args.config: + os.environ["GCSFS_BENCHMARK_FILTER"] = ",".join(args.config) + + +def _run_benchmarks(results_dir, args): + """Execute the benchmark suite using pytest. + + This function constructs and runs a pytest command to execute the benchmarks. + It captures the output in a JSON file and handles logging and test filtering + based on the provided arguments. + + Args: + results_dir (str): The directory where benchmark results will be saved. + args (argparse.Namespace): The parsed command-line arguments. + + Returns: + str: The path to the generated JSON results file. + """ + logging.info(f"Starting benchmark run for group: {args.group}") + + base_path = os.path.dirname(__file__) + if args.group: + benchmark_path = os.path.join(base_path, args.group) + if not os.path.isdir(benchmark_path): + logging.error(f"Benchmark group directory not found: {benchmark_path}") + sys.exit(1) + else: + benchmark_path = base_path + + json_output_path = os.path.join(results_dir, "results.json") + + pytest_command = [ + sys.executable, + "-m", + "pytest", + benchmark_path, + f"--benchmark-json={json_output_path}", + ] + + if args.log: + pytest_command.extend( + [ + "-o", + f"log_cli={args.log}", + "-o", + f"log_cli_level={args.log_level.upper()}", + ] + ) + + logging.info(f"Executing command: {' '.join(pytest_command)}") + + try: + env = os.environ.copy() + subprocess.run(pytest_command, check=True, env=env, text=True) + logging.info(f"Benchmark run completed. Results saved to {json_output_path}") + except subprocess.CalledProcessError as e: + logging.error( + f"Benchmark run completed with error: {e}, results saved to {json_output_path}" + ) + except FileNotFoundError: + logging.error( + "pytest not found. Please ensure it is installed in your environment." + ) + sys.exit(1) + + return json_output_path + + +def _process_benchmark_result(bench, headers, extra_info_headers, stats_headers): + """ + Process a single benchmark result and prepare it for CSV reporting. + + This function extracts relevant statistics and metadata from a benchmark + run, calculates derived metrics like percentiles and throughput, and + formats it as a dictionary. + + Args: + bench (dict): The dictionary for a single benchmark from the JSON output. + headers (list): The list of all header names for the CSV. 
+ extra_info_headers (list): Headers from the 'extra_info' section. + stats_headers (list): Headers from the 'stats' section. + + """ + row = {h: "" for h in headers} + row["name"] = bench["name"] + row["group"] = bench.get("group", "") + + # Populate extra_info and stats + for key in extra_info_headers: + row[key] = bench["extra_info"].get(key) + for key in stats_headers: + row[key] = bench["stats"].get(key) + + # Calculate percentiles + timings = bench["stats"].get("data") + if timings: + row["p90"] = np.percentile(timings, 90) + row["p95"] = np.percentile(timings, 95) + row["p99"] = np.percentile(timings, 99) + + # Calculate max throughput + file_size = bench["extra_info"].get("file_size", 0) + num_files = bench["extra_info"].get("num_files", 1) + total_bytes = file_size * num_files + + min_time = bench["stats"].get("min") + if min_time and min_time > 0: + row["max_throughput_mb_s"] = (total_bytes / min_time) / MB + else: + row["max_throughput_mb_s"] = "0.0" + + return row + + +def _generate_report(json_path, results_dir): + """Generate a CSV summary report from the pytest-benchmark JSON output. + + Args: + json_path (str): The path to the JSON file containing benchmark results. + results_dir (str): The directory where the CSV report will be saved. + + Returns: + str: The path to the generated CSV report file. + + """ + logging.info(f"Generating CSV report from {json_path}") + + with open(json_path, "r") as f: + data = json.load(f) + + report_path = os.path.join(results_dir, "results.csv") + + # Dynamically get headers from the first benchmark's extra_info and stats + first_benchmark = data["benchmarks"][0] + extra_info_headers = sorted(first_benchmark["extra_info"].keys()) + stats_headers = ["min", "max", "mean", "median", "stddev"] + custom_headers = ["p90", "p95", "p99", "max_throughput_mb_s"] + + headers = ["name", "group"] + extra_info_headers + stats_headers + custom_headers + + with open(report_path, "w", newline="") as f: + writer = csv.writer(f) + writer.writerow(headers) + + for bench in data["benchmarks"]: + row = _process_benchmark_result( + bench, headers, extra_info_headers, stats_headers + ) + writer.writerow([row[h] for h in headers]) + + logging.info(f"CSV report generated at {report_path}") + + return report_path + + +def _create_table_row(row): + """ + Format a dictionary of benchmark results into a list for table display. + + Args: + row (dict): A dictionary representing a single row from the CSV report. + + Returns: + list: A list of formatted values ready for printing in a table. + + """ + return [ + row.get("bucket_type", ""), + row.get("group", ""), + row.get("pattern", ""), + row.get("num_files", ""), + row.get("threads", ""), + row.get("processes", ""), + f"{float(row.get('file_size', 0)) / MB:.2f}", + f"{float(row.get('chunk_size', 0)) / MB:.2f}", + f"{float(row.get('block_size', 0)) / MB:.2f}", + f"{float(row.get('min', 0)):.4f}", + f"{float(row.get('mean', 0)):.4f}", + float(row.get("max_throughput_mb_s", 0)), + f"{float(row.get('cpu_max_global', 0)):.2f}", + f"{float(row.get('mem_max', 0)) / MB:.2f}", + ] + + +def _print_csv_to_shell(report_path): + """Read a CSV report and print it to the console as a formatted table. + + Args: + report_path (str): The path to the CSV report file. 
+ + """ + try: + with open(report_path, "r") as f: + reader = csv.DictReader(f) + rows = list(reader) + + if not rows: + logging.info("No data to display.") + return + + # Define the headers for the output table + display_headers = [ + "Bucket Type", + "Group", + "Pattern", + "Files", + "Threads", + "Processes", + "File Size (MB)", + "Chunk Size (MB)", + "Block Size (MB)", + "Min Latency (s)", + "Mean Latency (s)", + "Max Throughput(MB/s)", + "Max CPU (%)", + "Max Memory (MB)", + ] + table = PrettyTable() + table.field_names = display_headers + + for row in rows: + table.add_row(_create_table_row(row)) + print(table) + except FileNotFoundError: + logging.error(f"Report file not found at: {report_path}") + + +def main(): + """ + Parse command-line arguments and orchestrate the benchmark execution. + + This is the main entry point of the script. It sets up the environment, + runs the benchmarks, generates reports, and prints a summary to the console. + + """ + parser = argparse.ArgumentParser(description="Run GCSFS performance benchmarks.") + parser.add_argument( + "--group", + help="The benchmark group to run (e.g., 'read'). Runs all if not specified.", + ) + parser.add_argument( + "--config", + nargs="+", + help="The name(s) of the benchmark configuration(s) to run(e.g., --config read_seq_1thread,read_rand_1thread).", + ) + parser.add_argument( + "--regional-bucket", + help="Name of the regional GCS bucket to use for benchmarks.", + ) + parser.add_argument( + "--zonal-bucket", + help="Name of the zonal GCS bucket to use for benchmarks.", + ) + parser.add_argument( + "--hns-bucket", + help="Name of the HNS GCS bucket to use for benchmarks.", + ) + parser.add_argument( + "--log", + default="false", + help="Enable pytest console logging (log_cli=true).", + ) + parser.add_argument( + "--log-level", + default="DEBUG", + help="Set pytest console logging level (e.g., DEBUG, INFO, WARNING). 
Only effective if --log is enabled.", + ) + args = parser.parse_args() + + _setup_environment(args) + + # Create results directory + timestamp = datetime.now().strftime("%d%m%Y-%H%M%S") + results_dir = os.path.join(os.path.dirname(__file__), "__run__", timestamp) + os.makedirs(results_dir, exist_ok=True) + + # Run benchmarks and generate report + json_result_path = _run_benchmarks(results_dir, args) + if json_result_path: + csv_report_path = _generate_report(json_result_path, results_dir) + if csv_report_path: + _print_csv_to_shell(csv_report_path) + + +if __name__ == "__main__": + logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" + ) + main() diff --git a/gcsfs/tests/settings.py b/gcsfs/tests/settings.py index 8220a04e..79b15e2b 100644 --- a/gcsfs/tests/settings.py +++ b/gcsfs/tests/settings.py @@ -4,9 +4,19 @@ TEST_VERSIONED_BUCKET = os.getenv("GCSFS_TEST_VERSIONED_BUCKET", "gcsfs_test_versioned") TEST_HNS_BUCKET = os.getenv("GCSFS_HNS_TEST_BUCKET", "gcsfs_hns_test") TEST_ZONAL_BUCKET = os.getenv("GCSFS_ZONAL_TEST_BUCKET", "gcsfs_zonal_test") +TEST_HNS_BUCKET = os.getenv("GCSFS_HNS_TEST_BUCKET", "gcsfs_hns_test") TEST_PROJECT = os.getenv("GCSFS_TEST_PROJECT", "project") TEST_REQUESTER_PAYS_BUCKET = f"{TEST_BUCKET}_req_pay" TEST_KMS_KEY = os.getenv( "GCSFS_TEST_KMS_KEY", f"projects/{TEST_PROJECT}/locations/us/keyRings/gcsfs_test/cryptKeys/gcsfs_test_key", ) + +# ============================================================================= +# Performance Benchmark Settings +# ============================================================================= +BENCHMARK_FILTER = os.environ.get("GCSFS_BENCHMARK_FILTER", "") +BENCHMARK_SKIP_TESTS = os.environ.get("GCSFS_BENCHMARK_SKIP_TESTS", "true").lower() in ( + "true", + "1", +) diff --git a/setup.cfg b/setup.cfg index b3dff84e..c43933a7 100644 --- a/setup.cfg +++ b/setup.cfg @@ -32,6 +32,6 @@ max-line-length = 120 [tool:pytest] addopts = - --color=yes --timeout=600 + --color=yes --timeout=1800 log_cli = false log_cli_level = DEBUG