From dd44b499d636deb453b42c2f38ba49ca9c12cd65 Mon Sep 17 00:00:00 2001
From: S Islam
Date: Wed, 1 Oct 2025 23:44:26 -0400
Subject: [PATCH 1/2] Efficient Bootstrap

---
 SigProfilerExtractor/subroutines.py | 539 ++++++++++++++++++----------
 1 file changed, 348 insertions(+), 191 deletions(-)
 mode change 100644 => 100755 SigProfilerExtractor/subroutines.py

diff --git a/SigProfilerExtractor/subroutines.py b/SigProfilerExtractor/subroutines.py
old mode 100644
new mode 100755
index 81a3122..c949c71
--- a/SigProfilerExtractor/subroutines.py
+++ b/SigProfilerExtractor/subroutines.py
@@ -24,10 +24,16 @@ import string
 import os, sys
 import scipy
-
-os.environ["MKL_NUM_THREADS"] = "1"
-os.environ["NUMEXPR_NUM_THREADS"] = "1"
-os.environ["OMP_NUM_THREADS"] = "1"
+import secrets
+import traceback
+from datetime import datetime
+from multiprocessing import Manager
+
+os.environ.setdefault("OMP_NUM_THREADS", "1")
+os.environ.setdefault("MKL_NUM_THREADS", "1")
+os.environ.setdefault("OPENBLAS_NUM_THREADS", "1")
+os.environ.setdefault("NUMEXPR_NUM_THREADS", "1")
+os.environ.setdefault("VECLIB_MAXIMUM_THREADS", "1")
 
 # import SigProfilerExtractor as cosmic
 from scipy.stats import ranksums
@@ -368,155 +374,286 @@ def nnmf_gpu(
     return Ws, Hs, convergence
 
+
 def BootstrapCancerGenomes(genomes, seed=None):
     """
-    index = genomes.index
-    cols = genomes.columns
-    genomes = np.array(genomes)
-    n = 100
-    stderr = genomes*0.025
-    std = stderr*math.sqrt(n)
-    genomes = np.random.normal(genomes, std, size=None).astype(int)
-    genomes[genomes<0] = 0
-
-    dataframe = pd.DataFrame(genomes)
-    dataframe.index = index
-    dataframe.columns = cols
+    Fast bootstrap in float32 to cut memory bandwidth.
+    - Accepts np.ndarray or pandas.DataFrame
+    - Returns np.ndarray[float32] with the same shape
     """
+    # 1) One-time, contiguous float32 view for efficiency
+    G = np.asarray(genomes, dtype=np.float32, order="C")  # (rows, cols)
+
+    # 2) Column totals; keep accumulation in float64 for numerical accuracy
+    totals64 = G.sum(axis=0, dtype=np.float64)
+    # guard against zeros (no events in a column)
+    totals64[totals64 <= 0.0] = 1.0
+
+    # 3) Probabilities per column - multinomial in NumPy expects float64 probs
+    probs64 = (G / totals64).astype(np.float64, copy=False)
+
+    # 4) Draw column-wise multinomials
+    out = np.empty_like(G, dtype=np.int32)  # integer counts
+    for j in range(G.shape[1]):
+        n = int(round(totals64[j]))
+        out[:, j] = seed.multinomial(n, probs64[:, j], 1)[0]
+
+    # check out for debugging
+    # print(out.astype(np.float32, copy=False)[0:5, 0:5])
+    # 5) Return float32
+    return out.astype(np.float32, copy=False)
 
-    # normalize the data
-    a = pd.DataFrame(
-        genomes.sum(0)
-    )  # performing the sum of each column. variable a store the sum
-    a = (
-        a.transpose()
-    )  # transfose the value so that if gets a proper dimention to normalize the data
-    repmat = pd.concat(
-        [a] * genomes.shape[0]
-    )  # normalize the data to get the probility of each datapoint to perform the "random.multinomial" operation
-    normGenomes = genomes / np.array(repmat)  # part of normalizing step
-
-    # Get the normGenomes as Matlab/ alternation of the "mnrnd" fuction in Matlap
-    # do the Boostraping
-    dataframe = (
-        pd.DataFrame()
-    )  # declare an empty variable to populate with the Bootstrapped data
-    for i in range(0, normGenomes.shape[1]):
-        dataframe[i] = list(
-            seed.multinomial(a.iloc[:, i], normGenomes.iloc[:, i], 1)[0]
-        )
-    return dataframe
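
A minimal, self-contained sketch of the column-wise multinomial bootstrap that the new BootstrapCancerGenomes implements. Names here are illustrative only (the real function lives in subroutines.py and takes a numpy Generator as its `seed` argument); the key property is that each resampled sample keeps its original mutation burden.

import numpy as np
from numpy.random import Generator, PCG64DXSM

def bootstrap_columns(genomes, rng):
    # genomes: (mutation_types, samples) count matrix
    G = np.asarray(genomes, dtype=np.float32, order="C")
    totals = G.sum(axis=0, dtype=np.float64)
    totals[totals <= 0.0] = 1.0                    # guard empty samples
    probs = (G / totals).astype(np.float64)
    out = np.empty_like(G, dtype=np.int32)
    for j in range(G.shape[1]):
        # resample each sample's column, keeping its total mutation count
        out[:, j] = rng.multinomial(int(round(totals[j])), probs[:, j])
    return out.astype(np.float32)

rng = Generator(PCG64DXSM(2024))
toy = np.array([[10, 0], [30, 5], [60, 95]])
boot = bootstrap_columns(toy, rng)
assert boot.sum(axis=0).tolist() == [100.0, 100.0]  # per-column totals preserved
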
 
 # NMF version for the multiprocessing library
 def pnmf(
+    rep_idx,
+    virt_id,
     batch_generator_pair=[1, None],
+    submit_time=None,
+    global_start=None,
+    shared_total=None,
+    shared_lock=None,
     genomes=1,
     totalProcesses=1,
     resample=True,
     init="nndsvd",
-    normalization_cutoff=10000000,
+    normalization_cutoff=10_000_000,
     norm="log2",
     gpu=False,
     execution_parameters=None,
+    return_timing=False,       # optional: also return timing dict
+    verbose_bootstrap=False,   # optional: per-bootstrap prints (GPU batch)
 ):
-    tic = time.time()
-    totalMutations = np.sum(genomes, axis=0)
-    genomes = pd.DataFrame(genomes)  # creating/loading a dataframe/matrix
+    """
+    One replicate (or a small batch on GPU) of NMF with fast float32 bootstrap.
+
+    Expects BootstrapCancerGenomes(genomes_float32, seed) to return float32 ndarray.
+    Requires: normalize_samples, denormalize_samples, nnmf_gpu, nnmf_cpu,
+              Generator, PCG64DXSM in scope.
+    """
+    start_wall = time.perf_counter()
+    queue_wait = (start_wall - submit_time) if submit_time is not None else 0.0
 
-    # generators used for noise and matrix initialization
+    # -------- helpers --------
+    def _iters_from_conv(Conv, max_it):
+        try:
+            v = Conv[0] if isinstance(Conv, (list, tuple, np.ndarray)) else Conv
+            v = int(v)
+            return v if v > 0 else int(max_it)
+        except Exception:
+            return int(max_it)
+
+    # Float32, contiguous once up front (avoid pandas entirely here)
+    genomes_arr = np.asarray(genomes, dtype=np.float32, order="C")
+
+    # RNGs for this replicate/batch
     poisson_generator = batch_generator_pair[1][0]
-    rep_generator = batch_generator_pair[1][1]
-    rand_rng = Generator(PCG64DXSM(rep_generator))
+    rep_generator = batch_generator_pair[1][1]
+    rand_rng = Generator(PCG64DXSM(rep_generator))
     poisson_rng = Generator(PCG64DXSM(poisson_generator))
 
-    if gpu:
-        batch_size = batch_generator_pair[0]
-        nmf_fn = nnmf_gpu
-        results = []
-        genome_list = []
-
-        for b in range(batch_size):
-            if resample == True:
-                bootstrapGenomes = BootstrapCancerGenomes(genomes, seed=poisson_rng)
-            else:
-                bootstrapGenomes = genomes
-
-            bootstrapGenomes[bootstrapGenomes < 0.0001] = 0.0001
-            totalMutations = np.sum(bootstrapGenomes, axis=0)
-            bootstrapGenomes = normalize_samples(
-                bootstrapGenomes,
-                totalMutations,
-                norm=norm,
-                normalization_cutoff=normalization_cutoff,
-            )
-            genome_list.append(bootstrapGenomes.values)
-
-        g = np.array(genome_list)
-        W, H, Conv = nmf_fn(
-            g,
-            totalProcesses,
-            init=init,
-            execution_parameters=execution_parameters,
-            generator=rand_rng,
-        )
-        for i in range(len(W)):
-            _W = np.array(W[i])
-            _H = np.array(H[i])
-            total = _W.sum(axis=0, keepdims=True)  # [np.newaxis]
-            _W = _W / total
-            _H = _H * total.T
-            _H = denormalize_samples(_H, totalMutations)
-            _conv = Conv[i]
-            results.append((_W, _H, _conv))
-        print("process " + str(totalProcesses) + " continues please wait... ")
-        print("execution time: {} seconds \n".format(round(time.time() - tic), 2))
-        return results
-
-    else:
-        nmf_fn = nnmf_cpu
-        if resample == True:
-            bootstrapGenomes = BootstrapCancerGenomes(genomes, seed=poisson_rng)
-        else:
-            bootstrapGenomes = genomes
-
-        bootstrapGenomes[bootstrapGenomes < 0.0001] = 0.0001
-        bootstrapGenomes = bootstrapGenomes.astype(float)
-
-        # normalize the samples to handle the hypermutators
-        totalMutations = np.sum(bootstrapGenomes, axis=0)
-        bootstrapGenomes = normalize_samples(
-            bootstrapGenomes,
-            totalMutations,
-            norm=norm,
-            normalization_cutoff=normalization_cutoff,
-        )
-
-        bootstrapGenomes = np.array(bootstrapGenomes)
-
-        W, H, kl = nmf_fn(
-            bootstrapGenomes,
-            totalProcesses,
-            init=init,
-            execution_parameters=execution_parameters,
-            generator=rand_rng,
-        )  # uses custom function nnmf
-
-        W = np.array(W)
-        H = np.array(H)
-        total = W.sum(axis=0, keepdims=True)  # [np.newaxis]
-        W = W / total
-        H = H * total.T
-
-        # denormalize H
-        H = denormalize_samples(H, totalMutations)
-        print("process " + str(totalProcesses) + " continues please wait... ")
-        print("execution time: {} seconds \n".format(round(time.time() - tic), 2))
-        return W, H, kl
+    timing = {"queue_wait": float(queue_wait)}
+    try:
+        # =========================
+        # ======== GPU ==========
+        # =========================
+        if gpu:
+            batch_size = int(batch_generator_pair[0])
+            nmf_fn = nnmf_gpu
+
+            genome_list = []
+            totals_list = []
+            boot_times = []
+
+            for b in range(batch_size):
+                t0 = time.perf_counter()
+
+                if resample:
+                    bg = BootstrapCancerGenomes(genomes_arr, seed=poisson_rng)  # float32 ndarray
+                else:
+                    # copy to avoid mutating the shared base array
+                    bg = genomes_arr.copy(order="C")
+
+                # clamp smalls in-place, stays float32 & contiguous
+                np.maximum(bg, 1e-4, out=bg)
+
+                # totals in float64 for stability (this is a tiny vector)
+                totals = bg.sum(axis=0, dtype=np.float64)
+
+                # normalization (may return DataFrame in some installs)
+                normG = normalize_samples(
+                    bg, totals,
+                    norm=norm, normalization_cutoff=normalization_cutoff
+                )
+                normG = np.asarray(normG, dtype=np.float32, order="C")
+
+                genome_list.append(normG)
+                totals_list.append(totals)
+                boot_times.append(time.perf_counter() - t0)
+
+                if verbose_bootstrap:
+                    print(f"[{virt_id}] replicate {rep_idx+1}.{b+1} bootstrap={boot_times[-1]:.3f}s", flush=True)
+
+            # Stack into (B, R, C) float32 and run GPU NMF
+            g = np.stack(genome_list, axis=0).astype(np.float32, copy=False)
+
+            if torch.cuda.is_available():
+                torch.cuda.synchronize()
+            t_gpu0 = time.perf_counter()
+
+            W, H, Conv = nmf_fn(
+                g,
+                totalProcesses,
+                init=init,
+                execution_parameters=execution_parameters,
+                generator=rand_rng,
+            )
+            if torch.cuda.is_available():
+                torch.cuda.synchronize()
+            gpu_nmf_time = time.perf_counter() - t_gpu0
+
+            # Postprocess per item in batch
+            results = []
+            for i in range(len(W)):
+                _W = np.array(W[i], copy=False)
+                _H = np.array(H[i], copy=False)
+
+                total = _W.sum(axis=0, keepdims=True)
+                _W = _W / total
+                _H = _H * total.T
+                _H = denormalize_samples(_H, totals_list[i])
+
+                results.append((_W, _H, Conv[i]))
+
+            finish_wall = time.perf_counter()
+            compute = finish_wall - start_wall
+            wall_since_start = finish_wall - global_start if global_start else -1.0
+
+            # Prepare print
+            max_it = execution_parameters.get("max_NMF_iterations", 0)
+            iters = _iters_from_conv(Conv, max_it)
+            tag = " max_iter" if max_it and iters >= max_it else ""
+            gpu_txt = f" GPU={torch.cuda.current_device()}" if torch.cuda.is_available() else ""
+
+            # Timing package
+            timing.update({
+                "bootstrap_times": [float(x) for x in boot_times],
+                "bootstrap_mean": float(np.mean(boot_times)) if boot_times else 0.0,
+                "bootstrap_min": float(np.min(boot_times)) if boot_times else 0.0,
+                "bootstrap_max": float(np.max(boot_times)) if boot_times else 0.0,
+                "nmf_time": float(gpu_nmf_time),
+                "compute": float(compute),
+            })
+
+            msg = (f"Signature=>{totalProcesses} "
+                   f"replicate=>{rep_idx+1} "
+                   f"queue=>{queue_wait:.1f}s total-time=>{compute:.1f}s "
+                   f"bootstrap(mean/min/max)=>{timing['bootstrap_mean']:.3f}/"
+                   f"{timing['bootstrap_min']:.3f}/"
+                   f"{timing['bootstrap_max']:.3f}s "
+                   f"nmf=>{gpu_nmf_time:.3f}s "
+                   f"(iters=>{iters}{tag})")
+
+            msg += f" wall=>{wall_since_start:.1f}s{gpu_txt}"
+            print(msg, flush=True)
+
+            return (results, timing) if return_timing else results
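
The post-processing above (mirrored in the CPU branch below) rescales each factorization so that every signature column of W sums to 1 while the product W·H is unchanged; the compensating scale moves into H before denormalize_samples restores per-sample totals. A small sketch of that invariant with made-up matrices:

import numpy as np

rng = np.random.default_rng(0)
W = rng.random((96, 4))   # signatures as columns
H = rng.random((4, 20))   # activities as rows

total = W.sum(axis=0, keepdims=True)   # 1 x 4 column sums
W_scaled = W / total                   # each signature now sums to 1
H_scaled = H * total.T                 # activities absorb the scale

# The reconstruction is identical; only the scaling convention of W/H changes.
assert np.allclose(W @ H, W_scaled @ H_scaled)
assert np.allclose(W_scaled.sum(axis=0), 1.0)
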
+        # =========================
+        # ========= CPU ===========
+        # =========================
+        else:
+            nmf_fn = nnmf_cpu
+            # Bootstrap once (single item on CPU path)
+            t0 = time.perf_counter()
+            if resample:
+                bg = BootstrapCancerGenomes(genomes_arr, seed=poisson_rng)
+            else:
+                bg = genomes_arr.copy(order="C")
+
+            np.maximum(bg, 1e-4, out=bg)
+            totals = bg.sum(axis=0, dtype=np.float64)
+
+            normG = normalize_samples(
+                bg, totals,
+                norm=norm, normalization_cutoff=normalization_cutoff
+            )
+            normG = np.asarray(normG, dtype=np.float32, order="C")
+            boot_time = time.perf_counter() - t0
+
+            # Run CPU NMF
+            t1 = time.perf_counter()
+            W, H, Conv = nmf_fn(
+                normG,
+                totalProcesses,
+                init=init,
+                execution_parameters=execution_parameters,
+                generator=rand_rng,
+            )
+            nmf_cpu_time = time.perf_counter() - t1
+
+            W = np.array(W, copy=False)
+            H = np.array(H, copy=False)
+            total = W.sum(axis=0, keepdims=True)
+            W = W / total
+            H = H * total.T
+            H = denormalize_samples(H, totals)
+
+            finish_wall = time.perf_counter()
+            compute = finish_wall - start_wall
+            wall_since_start = finish_wall - global_start if global_start else -1.0
+
+            cumul_compute = None
+            if shared_total is not None and shared_lock is not None:
+                with shared_lock:
+                    shared_total.value += compute
+                    cumul_compute = shared_total.value
+
+            max_it = execution_parameters.get("max_NMF_iterations", 0)
+            iters = _iters_from_conv(Conv, max_it)
+            tag = " max_iter" if max_it and iters >= max_it else ""
+
+            timing.update({
+                "bootstrap_times": [float(boot_time)],
+                "bootstrap_mean": float(boot_time),
+                "bootstrap_min": float(boot_time),
+                "bootstrap_max": float(boot_time),
+                "nmf_time": float(nmf_cpu_time),
+                "compute": float(compute),
+            })
+
+            msg = (f"Signature=>{totalProcesses} "
+                   f"replicate=>{rep_idx+1} "
+                   f"queue=>{queue_wait:.1f}s total-time=>{compute:.1f}s "
+                   f"bootstrap=>{boot_time:.3f}s nmf=>{nmf_cpu_time:.3f}s "
+                   f"(iters=>{iters}{tag})")
+            """
+            if cumul_compute is not None:
+                msg += f" cum_compute={cumul_compute:.1f}s"
+            """
+            msg += f" wall=>{wall_since_start:.1f}s"
+            print(msg, flush=True)
+
+            result = (W, H, Conv)
+            return (result, timing) if return_timing else result
+
+    except Exception as e:
+        fail_wall = time.perf_counter()
+        compute = fail_wall - start_wall
+        wall_since_start = fail_wall - global_start if global_start else -1.0
+        tb = traceback.format_exc(limit=3)
+        print(
+            f"[{virt_id}] replicate {rep_idx+1} FAILED "
+            f"queue={queue_wait:.1f}s compute={compute:.1f}s "
+            f"wall={wall_since_start:.1f}s: {e}\n{tb}",
+            flush=True,
+        )
+        raise
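
The CPU branch optionally folds each replicate's compute time into a multiprocessing.Manager value guarded by a lock, so the parent can report cumulative compute across workers. A stripped-down sketch of that pattern; `worker` and the sleep are placeholders, not part of the patch:

import time
from functools import partial
from multiprocessing import Manager, Pool

def worker(task_id, shared_total=None, shared_lock=None):
    t0 = time.perf_counter()
    time.sleep(0.01)                      # stand-in for the NMF work
    compute = time.perf_counter() - t0
    if shared_total is not None and shared_lock is not None:
        with shared_lock:                 # serialize updates to the shared float
            shared_total.value += compute
    return task_id, compute

if __name__ == "__main__":
    manager = Manager()
    total = manager.Value("d", 0.0)       # shared double, starts at 0
    lock = manager.Lock()
    with Pool(4) as pool:
        pool.starmap(partial(worker, shared_total=total, shared_lock=lock),
                     [(i,) for i in range(8)])
    print(f"cumulative compute across workers: {total.value:.3f}s")
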
") - print("execution time: {} seconds \n".format(round(time.time() - tic), 2)) - return W, H, kl + raise def parallel_runs( @@ -526,86 +663,106 @@ def parallel_runs( verbose=False, replicate_generators=None, ): - iterations = execution_parameters["NMF_replicates"] - init = execution_parameters["NMF_init"] - normalization_cutoff = execution_parameters["normalization_cutoff"] - n_cpu = execution_parameters["cpu"] - resample = execution_parameters["resample"] - norm = execution_parameters["matrix_normalization"] - gpu = execution_parameters["gpu"] - batch_size = execution_parameters["batch_size"] + # --- unpack exec params --- + iterations = int(execution_parameters["NMF_replicates"]) + init = execution_parameters["NMF_init"] + normalization_cutoff = execution_parameters["normalization_cutoff"] + n_cpu = int(execution_parameters["cpu"]) + resample = execution_parameters["resample"] + norm = execution_parameters["matrix_normalization"] + gpu = bool(execution_parameters["gpu"]) + batch_size = int(execution_parameters["batch_size"]) if verbose: print( - "Process " - + str(totalProcesses) - + " is in progress\n===================================>" + f"Process {totalProcesses} is in progress\n" + "===================================>" ) - if n_cpu == -1: - pool = multiprocessing.Pool() - else: - pool = multiprocessing.Pool(processes=n_cpu) - - num_full_batches = iterations // batch_size - last_batch_size = iterations % batch_size - # generators used for noise and matrix initialization + # --- build per-replicate RNG pairs --- poisson_generator = replicate_generators[0] - rep_generator = replicate_generators[1] - # spawn "iterations" number of generators for poisson - poisson_rand_list = poisson_generator[0].spawn(int(iterations)) - # spawn "iterations" number of generators for W,H - sub_rand_generators = rep_generator.spawn(int(iterations)) - generator_pair_list = [] - for i, j in zip(poisson_rand_list, sub_rand_generators): - generator_pair_list.append([i, j]) - - batches = [batch_size for _ in range(num_full_batches)] - if last_batch_size != 0: - batches.append(last_batch_size) + rep_generator = replicate_generators[1] - batch_generator_pair = [] + poisson_rand_list = poisson_generator[0].spawn(iterations) + sub_rand_generators = rep_generator.spawn(iterations) - # There will be nmf_replicate number of batch_generator_pair elements - for i, j in zip(batches, generator_pair_list): - batch_generator_pair.append([i, j]) + generator_pair_list = [[i, j] for i, j in zip(poisson_rand_list, sub_rand_generators)] - if gpu == True: - # submit jobs for parallel processing - pool_nmf = partial( - pnmf, - genomes=genomes, - totalProcesses=totalProcesses, - resample=resample, - init=init, - normalization_cutoff=normalization_cutoff, - norm=norm, - gpu=gpu, - execution_parameters=execution_parameters, - ) - result_list = pool.map(pool_nmf, batch_generator_pair) - pool.close() - pool.join() - flat_list = [item for sublist in result_list for item in sublist] + # --- batch the replicates for GPU path (batch_size may be >1), or 1 for CPU --- + if gpu: + num_full_batches = iterations // batch_size + last_batch_size = iterations % batch_size + batches = [batch_size] * num_full_batches + ([last_batch_size] if last_batch_size else []) + else: + batches = [1] * iterations # CPU path: one replicate per task + + batch_generator_pair = [] + # couple each batch size with its RNG pair + for bsz, rng_pair in zip(batches, generator_pair_list): + batch_generator_pair.append([bsz, rng_pair]) + # --- pool sizing --- + if gpu: + 
From e04ed9753692a9625c6b05fe63392352a1976a25 Mon Sep 17 00:00:00 2001
From: S M Ashiqul Islam
Date: Sat, 7 Mar 2026 11:14:06 -0500
Subject: [PATCH 2/2] Fix pandas 3 compatibility in stability export and CSV
 separators

---
 SigProfilerExtractor/subroutines.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/SigProfilerExtractor/subroutines.py b/SigProfilerExtractor/subroutines.py
index c949c71..02de457 100755
--- a/SigProfilerExtractor/subroutines.py
+++ b/SigProfilerExtractor/subroutines.py
@@ -1544,7 +1544,7 @@ def export_information(
             + str(i)
             + "_Signatures"
             + ".txt",
-            "\t",
+            sep="\t",
             index_label=[processes.columns.name],
         )
@@ -1562,7 +1562,7 @@ def export_information(
             + "_S"
             + str(i)
             + "_NMF_Activities.txt",
-            "\t",
+            sep="\t",
             index_label=[exposures.columns.name],
         )
@@ -1616,7 +1616,7 @@ def export_information(
             + str(i)
             + "_Signatures_SEM_Error"
             + ".txt",
-            "\t",
+            sep="\t",
             float_format="%.2E",
             index_label=[processes.columns.name],
         )
@@ -1636,7 +1636,7 @@ def export_information(
             + "_S"
             + str(i)
             + "_NMF_Activities_SEM_Error.txt",
-            "\t",
+            sep="\t",
             float_format="%.2E",
             index_label=[exposures.columns.name],
         )
@@ -1660,7 +1660,7 @@ def export_information(
             + str(i)
             + "_"
             + "Signatures_stats.txt",
-            "\t",
+            sep="\t",
             index_label="Signatures",
         )
@@ -1687,7 +1687,7 @@ def export_information(
             + str(i)
             + "_"
             + "NMF_Convergence_Information.txt",
-            "\t",
+            sep="\t",
             index_label="NMF_Replicate",
         )
@@ -2101,7 +2101,8 @@ def stabVsRError(
     # add % signs
     data.insert(1, "Considerable Solution", stable_solutions)
     data.insert(2, "P-value", probabilities)
-    data.iloc[:, 3:7] = data.iloc[:, 3:7].astype(str) + "%"
+    for col_name in data.columns[3:7]:
+        data[col_name] = data[col_name].astype(str) + "%"
     data = data.reset_index()
     columns_list = list(data.columns)
     columns_list[0] = "Signatures"
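
Both changes in this second commit are pandas compatibility fixes: recent pandas releases require keyword arguments for to_csv beyond the path (so the tab separator must be passed as sep="\t"), and, per the commit message, pandas 3 no longer allows writing string values into a numeric block selected with .iloc, so the percent-sign formatting is done column by column. A minimal sketch of the two idioms on a toy frame (file name and columns are made up):

import pandas as pd

df = pd.DataFrame({"Signatures": ["S1", "S2"],
                   "Stability": [0.91, 0.84],
                   "Error": [12.3, 15.7]})

# Pass the separator by keyword; positional arguments after the path are rejected.
df.to_csv("stats.txt", sep="\t", index=False)

# Convert numeric columns to "NN%" strings one column at a time instead of
# assigning a string block back through .iloc.
for col_name in df.columns[1:3]:
    df[col_name] = df[col_name].astype(str) + "%"
print(df)
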