diff --git a/pipit/trace.py b/pipit/trace.py
index 2b4f111a..5d5cbe08 100644
--- a/pipit/trace.py
+++ b/pipit/trace.py
@@ -861,3 +861,175 @@ def detect_pattern(
             patterns.append(match_original)
 
         return patterns
+
+    # TODO: unit tests - compare with hta tool
+    def comm_comp_breakdown(self):
+        """Break down GPU time per rank into communication, computation,
+        overlapped communication-computation, and other (idle/memory) time.
+        Returns a dict keyed by rank (plus an "Average" entry)."""
+
+        def kernel_type(name):
+            # Classify a GPU kernel by name: NCCL kernels are communication,
+            # Memcpy/Memset are memory operations, everything else is computation.
+            if "ncclKernel" in name:
+                return "comm"
+            elif "Memcpy" in name or "Memset" in name:
+                return "mem"
+            else:
+                return "comp"
+
+        def merge_intervals(intervals):
+            # Merge overlapping (start, end) intervals so that time covered by
+            # concurrent kernels of the same type is only counted once.
+            merged_intervals = [intervals.pop(0)]
+
+            while len(intervals) > 0:
+                top_start, top_end = merged_intervals[-1]
+                curr_start, curr_end = intervals.pop(0)
+
+                if curr_end >= top_start and curr_start <= top_end:
+                    merged_intervals[-1] = (
+                        min([curr_start, top_start]),
+                        max([curr_end, top_end]),
+                    )
+                else:
+                    merged_intervals.append((curr_start, curr_end))
+
+            return merged_intervals
+
+        def mean_over_ranks(breakdown_dict):
+            # Average each metric over all ranks; keys must match the per-rank
+            # metrics computed below. Values are converted from nanoseconds to
+            # seconds for the averaged entry.
+            mean_dict = {
+                "Total Time": 0,
+                "Non-overlapped Computation": 0,
+                "Computation Overlapping w/ Communication": 0,
+                "Non-overlapped Communication": 0,
+                "Other": 0,
+            }
+
+            i = 0
+            for rank in breakdown_dict:
+                for metric in breakdown_dict[rank]:
+                    mean_dict[metric] += breakdown_dict[rank][metric] / 1e9
+                i += 1
+
+            for key in mean_dict:
+                mean_dict[key] /= i
+
+            breakdown_dict["Average"] = mean_dict
+
+        self._match_events()
+
+        ranks = set(self.events["Rank"])
+
+        breakdown_dict = {str(p): {} for p in ranks}
+
+        # Identify CPU processes from the process labels so that only GPU
+        # streams are considered in the breakdown.
+        process_labels_df = self.definitions.loc[
+            self.definitions["Definition Type"] == "process_labels"
+        ]
+        cpu_processes = set(
+            process_labels_df[
+                process_labels_df.apply(
+                    lambda row: row["Attributes"]["args"]["labels"].startswith("CPU"),
+                    axis=1,
+                )
+            ]["Process"]
+        )
+        filtered_df = self.events.loc[
+            (~self.events["Process"].isin(cpu_processes))
+            & (self.events["Process"].astype(str).str.isnumeric())
+        ]
+
+        for p in ranks:
+            gpu_df = filtered_df.loc[filtered_df["Rank"] == p]
+
+            # Keep only "Enter" rows; the matching "Leave" timestamp is
+            # available in the "_matching_timestamp" column.
+            gpu_df = gpu_df.loc[gpu_df["Event Type"] == "Enter"].copy()
+
+            gpu_df["kernel_type"] = gpu_df.apply(
+                lambda row: kernel_type(row["Name"]),
+                axis=1,
+            )
+
+            # Merged (start, end) intervals of communication kernels.
+            comm_df = gpu_df.loc[gpu_df["kernel_type"] == "comm"]
+            comm_intervals = merge_intervals(
+                list(
+                    zip(
+                        comm_df["Timestamp (ns)"].to_list(),
+                        comm_df["_matching_timestamp"].to_list(),
+                    )
+                )
+            )
+
+            # Merged (start, end) intervals of computation kernels.
+            comp_df = gpu_df.loc[gpu_df["kernel_type"] == "comp"]
+            comp_intervals = merge_intervals(
+                list(
+                    zip(
+                        comp_df["Timestamp (ns)"].to_list(),
+                        comp_df["_matching_timestamp"].to_list(),
+                    )
+                )
+            )
+
+            comm_enter_times, comm_leave_times = zip(*comm_intervals)
+            comp_enter_times, comp_leave_times = zip(*comp_intervals)
+            comp_df = pd.DataFrame(
+                {
+                    "Timestamp (ns)": comp_enter_times,
+                    "_matching_timestamp": comp_leave_times,
+                }
+            )
+
+            # For every communication interval, accumulate the total time and
+            # the portion that overlaps with computation intervals.
+            total_comm_time, overlapped_comm_time = 0, 0
+            for j in range(len(comm_enter_times)):
+                curr_enter_time, curr_leave_time = (
+                    comm_enter_times[j],
+                    comm_leave_times[j],
+                )
+                total_comm_time += curr_leave_time - curr_enter_time
+
+                # Computation intervals that intersect the current
+                # communication interval.
+                overlapped_df = comp_df.loc[
+                    (comp_df["Timestamp (ns)"] <= curr_leave_time)
+                    & (comp_df["_matching_timestamp"] >= curr_enter_time)
+                ]
+
+                overlapped_comm_time += overlapped_df.apply(
+                    lambda row: (
+                        min([row["_matching_timestamp"], curr_leave_time])
+                        - max([row["Timestamp (ns)"], curr_enter_time])
+                    ),
+                    axis=1,
+                ).sum()
+
+            breakdown_dict[str(p)]["Total Time"] = max(gpu_df["Timestamp (ns)"]) - min(
+                gpu_df["Timestamp (ns)"]
+            )
+            breakdown_dict[str(p)]["Non-overlapped Computation"] = (
+                comp_df["_matching_timestamp"] - comp_df["Timestamp (ns)"]
+            ).sum() - overlapped_comm_time
+            breakdown_dict[str(p)][
+                "Computation Overlapping w/ Communication"
+            ] = overlapped_comm_time
+            breakdown_dict[str(p)]["Non-overlapped Communication"] = (
+                total_comm_time - overlapped_comm_time
+            )
+            breakdown_dict[str(p)]["Other"] = breakdown_dict[str(p)]["Total Time"] - (
+                breakdown_dict[str(p)]["Non-overlapped Computation"]
+                + breakdown_dict[str(p)]["Non-overlapped Communication"]
+                + breakdown_dict[str(p)]["Computation Overlapping w/ Communication"]
+            )
+
+        mean_over_ranks(breakdown_dict)
+
+        return breakdown_dict