From cc79e071a0cf58b7d895385868b40fcb406ab6fb Mon Sep 17 00:00:00 2001 From: Robin Voetter Date: Sat, 30 Aug 2025 12:43:12 +0200 Subject: [PATCH 1/3] example: basic distributed profiling --- problems/amd_distributed/eval.py | 67 +++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 5 deletions(-) diff --git a/problems/amd_distributed/eval.py b/problems/amd_distributed/eval.py index c3d20f9..ff4febe 100644 --- a/problems/amd_distributed/eval.py +++ b/problems/amd_distributed/eval.py @@ -499,26 +499,83 @@ def run_benchmarking(logger: PopcornOutput, pool: multiprocessing.Pool, tests: l return 112 -def run_single_profile(test: TestCase) -> str: +def _run_single_profile(test: TestCase) -> str: """ Runs a single test case. Do not call directly """ from submission import custom_kernel - from torch.profiler import profile, record_function, ProfilerActivity + from torch.profiler import profile, ProfilerActivity data = generate_input(**test.args) torch.cuda.synchronize() with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof: submission_output = custom_kernel(_clone_data(data, 0)) torch.cuda.synchronize() + return prof.key_averages().table(sort_by="self_cuda_time_total", row_limit=20) -def run_profiling(logger: PopcornOutput, tests: list[TestCase]): +def _run_distributed_profile(test: TestCase, rank: int) -> profile: + """ + Runs a single profiling case. Do not call directly + """ + from submission import custom_kernel + from torch.profiler import profile, ProfilerActivity + import torch.distributed as dist + world_size = test.args["world_size"] + os.environ["MASTER_ADDR"] = "127.0.0.1" + os.environ["MASTER_PORT"] = "12356" + dist.init_process_group("nccl", init_method="env://", rank=rank, world_size=world_size, device_id=torch.device(f'cuda:{rank}')) + + try: + data = generate_input(**test.args, rank=rank) + data = _clone_data(data, rank) + torch.cuda.synchronize() + + with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof: + submission_output = custom_kernel(data) + torch.cuda.synchronize() + + return prof + + finally: + dist.destroy_process_group() + +def run_multi_gpu_profile(pool: multiprocessing.Pool, test: TestCase, world_size: int) -> str: + """ + Runs a single test in another process. + """ + rets = [] + # world_size is a mandatory argument for multi-gpu tests + for i in range(world_size): + rets.append( + pool.apply_async( + _run_distributed_profile, + args=(test, i), + ) + ) + + rets = [el.get(120) for el in rets] + + # TODO: Combine distributed profiling results? + return rets[0].key_averages().table(sort_by="self_cuda_time_total", row_limit=20) + +def run_single_profile(test: TestCase, pool: multiprocessing.Pool) -> str: + """ + Runs a single profiling activity in another process. 
+ """ + world_size = test.args.get("world_size", None) + if world_size is None: + return pool.apply(_run_single_profile, (test,)) + else: + return run_multi_gpu_profile(pool, test, world_size) + + +def run_profiling(logger: PopcornOutput, pool: multiprocessing.Pool, tests: list[TestCase]): logger.log("benchmark-count", len(tests)) for idx, test in enumerate(tests): logger.log(f"benchmark.{idx}.spec", test.spec) - report = run_single_profile(test) + report = run_single_profile(test, pool) logger.log(f"benchmark.{idx}.report", base64.b64encode(report.encode("utf-8"), b"+*").decode("utf-8")) logger.log("check", "pass") return 0 @@ -568,7 +625,7 @@ def main(): logger.log("check", "pass" if passed else "fail") elif mode == "profile": - run_profiling(logger, tests) + run_profiling(logger, pool, tests) else: # invalid mode return 2 From 53cd59eb9dcdd2444abe9f93e2750934b968c498 Mon Sep 17 00:00:00 2001 From: Robin Voetter Date: Sat, 30 Aug 2025 21:37:04 +0200 Subject: [PATCH 2/3] example: combine distributed profiler activity --- problems/amd_distributed/eval.py | 42 +++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/problems/amd_distributed/eval.py b/problems/amd_distributed/eval.py index ff4febe..c99e73f 100644 --- a/problems/amd_distributed/eval.py +++ b/problems/amd_distributed/eval.py @@ -515,7 +515,7 @@ def _run_single_profile(test: TestCase) -> str: return prof.key_averages().table(sort_by="self_cuda_time_total", row_limit=20) -def _run_distributed_profile(test: TestCase, rank: int) -> profile: +def _run_distributed_profile(test: TestCase, rank: int) -> "EventList": """ Runs a single profiling case. Do not call directly """ @@ -536,11 +536,46 @@ def _run_distributed_profile(test: TestCase, rank: int) -> profile: submission_output = custom_kernel(data) torch.cuda.synchronize() - return prof + return prof.events() finally: dist.destroy_process_group() + +def _combine_traces(traces: list["EventList"]) -> "EventList": + """ + Combine multiple event traces obtained from multiple (distributed) torch.profiler + activities. This function simply aggregates the data as like `prof.key_averages()`, + except over multiple traces. Most of this function is reimplemented + from `torch.autograd.profiler_util.EventList.key_averages()`. + """ + from torch.autograd.profiler_util import FunctionEventAvg, EventList + from collections import defaultdict + + def get_key(event) -> tuple[str, ...]: + return ( + str(event.key), + str(event.node_id), + str(event.device_type), + str(event.is_legacy), + str(event.is_user_annotation), + ) + + stats: dict[tuple[str, ...], FunctionEventAvg] = defaultdict(FunctionEventAvg) + + for events in traces: + for event in events: + stats[get_key(event)].add(event) + + avg_list = EventList(stats.values()) + for event in avg_list: + event.stack = [] + event.input_shapes = "" + event.overload_name = "" + + return avg_list + + def run_multi_gpu_profile(pool: multiprocessing.Pool, test: TestCase, world_size: int) -> str: """ Runs a single test in another process. @@ -556,9 +591,8 @@ def run_multi_gpu_profile(pool: multiprocessing.Pool, test: TestCase, world_size ) rets = [el.get(120) for el in rets] + return _combine_traces(rets).table(sort_by="self_cuda_time_total", row_limit=20) - # TODO: Combine distributed profiling results? 
- return rets[0].key_averages().table(sort_by="self_cuda_time_total", row_limit=20) def run_single_profile(test: TestCase, pool: multiprocessing.Pool) -> str: """ From e342af471eb1a801d3fd6b85bc7d8e0988a1362a Mon Sep 17 00:00:00 2001 From: Robin Voetter Date: Sat, 30 Aug 2025 22:00:17 +0200 Subject: [PATCH 3/3] example: emit nvtx markers --- problems/amd_distributed/eval.py | 36 ++++++++++++++++++++------------ 1 file changed, 23 insertions(+), 13 deletions(-) diff --git a/problems/amd_distributed/eval.py b/problems/amd_distributed/eval.py index c99e73f..597b5ff 100644 --- a/problems/amd_distributed/eval.py +++ b/problems/amd_distributed/eval.py @@ -11,6 +11,7 @@ from typing import Any, Optional import torch.cuda +from torch.cuda.nvtx import range as nvtx_range from utils import set_seed, clear_l2_cache @@ -505,13 +506,16 @@ def _run_single_profile(test: TestCase) -> str: """ from submission import custom_kernel from torch.profiler import profile, ProfilerActivity - data = generate_input(**test.args) - torch.cuda.synchronize() - with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof: - submission_output = custom_kernel(_clone_data(data, 0)) + with nvtx_range("generate input"): + data = generate_input(**test.args) torch.cuda.synchronize() + with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof: + with nvtx_range("custom_kernel"): + submission_output = custom_kernel(_clone_data(data, 0)) + torch.cuda.synchronize() + return prof.key_averages().table(sort_by="self_cuda_time_total", row_limit=20) @@ -522,19 +526,25 @@ def _run_distributed_profile(test: TestCase, rank: int) -> "EventList": from submission import custom_kernel from torch.profiler import profile, ProfilerActivity import torch.distributed as dist - world_size = test.args["world_size"] - os.environ["MASTER_ADDR"] = "127.0.0.1" - os.environ["MASTER_PORT"] = "12356" - dist.init_process_group("nccl", init_method="env://", rank=rank, world_size=world_size, device_id=torch.device(f'cuda:{rank}')) + + with nvtx_range(f"init nccl, rank {rank}"): + world_size = test.args["world_size"] + os.environ["MASTER_ADDR"] = "127.0.0.1" + os.environ["MASTER_PORT"] = "12356" + dist.init_process_group("nccl", init_method="env://", rank=rank, world_size=world_size, device_id=torch.device(f'cuda:{rank}')) try: - data = generate_input(**test.args, rank=rank) - data = _clone_data(data, rank) - torch.cuda.synchronize() + with nvtx_range(f"generate input, rank {rank}"): + data = generate_input(**test.args, rank=rank) + data = _clone_data(data, rank) + torch.cuda.synchronize() + dist.barrier() with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof: - submission_output = custom_kernel(data) - torch.cuda.synchronize() + with nvtx_range(f"custom_kernel, rank {rank}"): + submission_output = custom_kernel(data) + torch.cuda.synchronize() + dist.barrier() return prof.events()
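
A minimal, self-contained sketch of the trace aggregation introduced in patch 2, for trying the idea outside the eval harness. It profiles two small CPU-only workloads (stand-ins for the per-rank traces that _run_distributed_profile returns) and merges their event lists the same way _combine_traces does. The helper name combine_traces, the simplified two-field grouping key, and the toy matmul workloads are illustrative assumptions, not part of the patches.

# Sketch: merge profiler traces from several runs, mirroring _combine_traces()
# from patch 2. CPU-only so it runs without a GPU; the matmul workloads are
# stand-ins for the per-rank kernels profiled by _run_distributed_profile().
from collections import defaultdict

import torch
from torch.autograd.profiler_util import EventList, FunctionEventAvg
from torch.profiler import ProfilerActivity, profile


def combine_traces(traces: list[EventList]) -> EventList:
    # Aggregate events like EventList.key_averages(), but across several
    # traces instead of one. Here the grouping key is just (name, device).
    stats: dict[tuple[str, str], FunctionEventAvg] = defaultdict(FunctionEventAvg)
    for events in traces:
        for event in events:
            stats[(str(event.key), str(event.device_type))].add(event)
    combined = EventList(stats.values())
    for event in combined:
        # Clear per-call details so table() prints a compact summary,
        # as key_averages() does after aggregating.
        event.stack = []
        event.input_shapes = ""
    return combined


def profile_once(size: int) -> EventList:
    with profile(activities=[ProfilerActivity.CPU]) as prof:
        a = torch.randn(size, size)
        (a @ a).sum().item()
    return prof.events()


if __name__ == "__main__":
    traces = [profile_once(256), profile_once(512)]  # stand-ins for two ranks
    print(combine_traces(traces).table(sort_by="self_cpu_time_total", row_limit=10))

The grouping key in the patch additionally includes node_id, is_legacy, and is_user_annotation, which keeps events from different ranks and user annotations from being folded together incorrectly; the sketch drops those fields only to keep the example short.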