diff --git a/problems/amd_distributed/eval.py b/problems/amd_distributed/eval.py
index c3d20f9..597b5ff 100644
--- a/problems/amd_distributed/eval.py
+++ b/problems/amd_distributed/eval.py
@@ -11,6 +11,7 @@ from typing import Any, Optional
 
 
 import torch.cuda
+from torch.cuda.nvtx import range as nvtx_range
 from utils import set_seed, clear_l2_cache
 
 
@@ -499,26 +500,126 @@ def run_benchmarking(logger: PopcornOutput, pool: multiprocessing.Pool, tests: l
     return 112
 
 
-def run_single_profile(test: TestCase) -> str:
+def _run_single_profile(test: TestCase) -> str:
     """
     Runs a single test case. Do not call directly
     """
     from submission import custom_kernel
-    from torch.profiler import profile, record_function, ProfilerActivity
-    data = generate_input(**test.args)
-    torch.cuda.synchronize()
+    from torch.profiler import profile, ProfilerActivity
 
-    with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
-        submission_output = custom_kernel(_clone_data(data, 0))
+    with nvtx_range("generate input"):
+        data = generate_input(**test.args)
         torch.cuda.synchronize()
+
+    with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
+        with nvtx_range("custom_kernel"):
+            submission_output = custom_kernel(_clone_data(data, 0))
+            torch.cuda.synchronize()
+
     return prof.key_averages().table(sort_by="self_cuda_time_total", row_limit=20)
 
 
-def run_profiling(logger: PopcornOutput, tests: list[TestCase]):
+def _run_distributed_profile(test: TestCase, rank: int) -> "EventList":
+    """
+    Runs a single profiling case. Do not call directly
+    """
+    from submission import custom_kernel
+    from torch.profiler import profile, ProfilerActivity
+    import torch.distributed as dist
+
+    with nvtx_range(f"init nccl, rank {rank}"):
+        world_size = test.args["world_size"]
+        os.environ["MASTER_ADDR"] = "127.0.0.1"
+        os.environ["MASTER_PORT"] = "12356"
+        dist.init_process_group("nccl", init_method="env://", rank=rank, world_size=world_size, device_id=torch.device(f'cuda:{rank}'))
+
+    try:
+        with nvtx_range(f"generate input, rank {rank}"):
+            data = generate_input(**test.args, rank=rank)
+            data = _clone_data(data, rank)
+            torch.cuda.synchronize()
+            dist.barrier()
+
+        with profile(activities=[ProfilerActivity.CPU, ProfilerActivity.CUDA]) as prof:
+            with nvtx_range(f"custom_kernel, rank {rank}"):
+                submission_output = custom_kernel(data)
+                torch.cuda.synchronize()
+                dist.barrier()
+
+        return prof.events()
+
+    finally:
+        dist.destroy_process_group()
+
+
+def _combine_traces(traces: list["EventList"]) -> "EventList":
+    """
+    Combine multiple event traces obtained from multiple (distributed) torch.profiler
+    activities. This function simply aggregates the data like `prof.key_averages()`,
+    except over multiple traces. Most of this function is reimplemented
+    from `torch.autograd.profiler_util.EventList.key_averages()`.
+    """
+    from torch.autograd.profiler_util import FunctionEventAvg, EventList
+    from collections import defaultdict
+
+    def get_key(event) -> tuple[str, ...]:
+        return (
+            str(event.key),
+            str(event.node_id),
+            str(event.device_type),
+            str(event.is_legacy),
+            str(event.is_user_annotation),
+        )
+
+    stats: dict[tuple[str, ...], FunctionEventAvg] = defaultdict(FunctionEventAvg)
+
+    for events in traces:
+        for event in events:
+            stats[get_key(event)].add(event)
+
+    avg_list = EventList(stats.values())
+    for event in avg_list:
+        event.stack = []
+        event.input_shapes = ""
+        event.overload_name = ""
+
+    return avg_list
+
+
+def run_multi_gpu_profile(pool: multiprocessing.Pool, test: TestCase, world_size: int) -> str:
+    """
+    Runs a single multi-GPU test, one worker process per rank.
+    """
+    rets = []
+    # world_size is a mandatory argument for multi-gpu tests
+    for i in range(world_size):
+        rets.append(
+            pool.apply_async(
+                _run_distributed_profile,
+                args=(test, i),
+            )
+        )
+
+    rets = [el.get(120) for el in rets]
+    return _combine_traces(rets).table(sort_by="self_cuda_time_total", row_limit=20)
+
+
+def run_single_profile(test: TestCase, pool: multiprocessing.Pool) -> str:
+    """
+    Runs a single profiling activity in another process.
+    """
+    world_size = test.args.get("world_size", None)
+    if world_size is None:
+        return pool.apply(_run_single_profile, (test,))
+    else:
+        return run_multi_gpu_profile(pool, test, world_size)
+
+
+def run_profiling(logger: PopcornOutput, pool: multiprocessing.Pool, tests: list[TestCase]):
     logger.log("benchmark-count", len(tests))
     for idx, test in enumerate(tests):
         logger.log(f"benchmark.{idx}.spec", test.spec)
-        report = run_single_profile(test)
+        report = run_single_profile(test, pool)
         logger.log(f"benchmark.{idx}.report", base64.b64encode(report.encode("utf-8"), b"+*").decode("utf-8"))
     logger.log("check", "pass")
     return 0
@@ -568,7 +669,7 @@ def main():
         logger.log("check", "pass" if passed else "fail")
 
     elif mode == "profile":
-        run_profiling(logger, tests)
+        run_profiling(logger, pool, tests)
     else:
         # invalid mode
         return 2
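
Reviewer note: the merge logic in `_combine_traces` can be sanity-checked without GPUs. The sketch below is not part of the patch; `profile_once` is a hypothetical helper standing in for one rank's trace, and the three-field key is a simplified version of the five-field key used above. It profiles two independent CPU-only runs and aggregates their events with `FunctionEventAvg`, the same way `EventList.key_averages()` averages a single trace.

from collections import defaultdict

import torch
from torch.autograd.profiler_util import EventList, FunctionEventAvg
from torch.profiler import ProfilerActivity, profile


def profile_once() -> EventList:
    # Hypothetical stand-in for one rank's trace: profile a small CPU workload.
    with profile(activities=[ProfilerActivity.CPU]) as prof:
        a = torch.randn(256, 256)
        (a @ a).relu_()
    return prof.events()


# Accumulate matching events from both traces into FunctionEventAvg buckets,
# mirroring what EventList.key_averages() does within a single trace.
stats = defaultdict(FunctionEventAvg)
for events in [profile_once(), profile_once()]:
    for event in events:
        key = (str(event.key), str(event.node_id), str(event.device_type))
        stats[key].add(event)

merged = EventList(stats.values())
for event in merged:
    event.stack = []  # drop stacks before rendering, as the patch does

print(merged.table(sort_by="self_cpu_time_total", row_limit=10))

Running this should print one aggregated table whose call counts are summed over both runs, which is the behaviour `run_multi_gpu_profile` relies on when it combines the per-rank traces into a single report.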