diff --git a/applications/issm_model/examples/ISMIP_Choi/run_job_strong_spack.sbatch b/applications/issm_model/examples/ISMIP_Choi/run_job_strong_spack.sbatch new file mode 100644 index 0000000..d09d29b --- /dev/null +++ b/applications/issm_model/examples/ISMIP_Choi/run_job_strong_spack.sbatch @@ -0,0 +1,62 @@ +#!/bin/bash +#SBATCH -t250:00:00 +#SBATCH -JISMIP +#SBATCH -N 11 +#SBATCH -n 264 +#SBATCH --ntasks-per-node=24 +#SBATCH -p cpu-large # pick ONE partition; or switch to: -p inferno +#SBATCH --mem=256G # ensure this is allowed per node on that partition +#SBATCH -A gts-arobel3-atlas +#SBATCH -ostrong_scaling-%j.out +#SBATCH --mail-type=BEGIN,END,FAIL +#SBATCH --mail-user=bkyanjo3@gatech.edu + +# set -euo pipefail +cd $SLURM_SUBMIT_DIR + +# Spack env (numpy, mpi4py, h5py, etc.) +module purge || true +source /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/scripts/activate.sh +MPI_DIR="$(spack -e /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/.spack-env/icesee location -i openmpi)" +export PATH="$MPI_DIR/bin:$PATH" +export LD_LIBRARY_PATH="$MPI_DIR/lib:$LD_LIBRARY_PATH" + +# --- Thread pinning --------------------------------------------------- +export ICESEE_PERFORMANCE_TEST=1 + +# --- Test Matrix ------------------------------------------------------ +NENS=64 +MODEL_NPROCS=4 +# NP_LIST="64 32 16 8 4 2 1" +NP_LIST="8 64 32 16 4 2 1" + +OUTROOT="strong_scaling_$(date +%Y%m%d_%H%M%S)" +mkdir -p "$OUTROOT" + + # "$MPI_DIR/bin/mpirun" -np "$np" \ + # srun --mpi=pmix -n "$np" \ + +run_case () { + local np="$1" + local tag="np${np}_Nens${NENS}_mp${MODEL_NPROCS}" + echo "=== Running case: $tag ===" + mkdir -p "${OUTROOT}/${tag}" + + /usr/bin/time -v \ + srun --mpi=pmix -n "$np" \ + python run_da_issm.py \ + --Nens=${NENS} \ + --model_nprocs=${MODEL_NPROCS} \ + --verbose \ + > "${OUTROOT}/${tag}/stdout.log" \ + 2> "${OUTROOT}/${tag}/stderr.log" + + echo "=== Finished: $tag ===" +} + +for np in $NP_LIST; do + (( np > NENS )) && { echo "Skip np=${np} 
(NENS=${NENS})"; continue; } +    run_case "$np" +done + +echo "All runs complete. Results in: ${OUTROOT}" diff --git a/applications/issm_model/examples/ISMIP_Choi/run_job_weak_spack.sbatch b/applications/issm_model/examples/ISMIP_Choi/run_job_weak_spack.sbatch new file mode 100644 index 0000000..40493a5 --- /dev/null +++ b/applications/issm_model/examples/ISMIP_Choi/run_job_weak_spack.sbatch @@ -0,0 +1,68 @@ +#!/bin/bash +#SBATCH -t250:00:00 +#SBATCH -JISMIP +#SBATCH -N 14 # 336 tasks / 24 per node = 14 nodes +#SBATCH -n 336 # request enough tasks for the max np +#SBATCH --ntasks-per-node=24 +#SBATCH -p cpu-large # pick ONE partition; or switch to: -p inferno +#SBATCH --mem=256G # per-node; ensure this is allowed on the partition +#SBATCH -A gts-arobel3-atlas +#SBATCH -oweak_scaling-%j.out +#SBATCH --mail-type=BEGIN,END,FAIL +#SBATCH --mail-user=bankyanjo@gmail.com + +# set -euo pipefail +cd "$SLURM_SUBMIT_DIR" + +# Spack env (numpy, mpi4py, h5py, etc.) +module purge || true +source /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/scripts/activate.sh +MPI_DIR="$(spack -e /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/.spack-env/icesee location -i openmpi)" +export PATH="$MPI_DIR/bin:$PATH" +export LD_LIBRARY_PATH="$MPI_DIR/lib:$LD_LIBRARY_PATH" + +# --- Performance flags ------------------------------------------------ +export ICESEE_PERFORMANCE_TEST=1 + +# --- Weak scaling: np = Nens; model_nprocs fixed --------------------- +MODEL_NPROCS=4 +NP_LIST="4 8 16 32 64 128 256" + +OUTROOT="weak_scaling_$(date +%Y%m%d_%H%M%S)" +mkdir -p "$OUTROOT" + +run_case () { +    local np="$1" +    local nens="$np" # enforce np = Nens +    local tag="np${np}_Nens${nens}_mp${MODEL_NPROCS}" +    echo "=== Running case: $tag ===" +    mkdir -p "${OUTROOT}/${tag}" + +    # Compute total ranks used = np * (MODEL_NPROCS + 1) +    local rank_size=$(( np * (MODEL_NPROCS + 1) )) +    local data_path="_modelrun_datasets" + +    /usr/bin/time -v \ +    "$MPI_DIR/bin/mpirun" -np "$np" \ +        python run_da_issm.py \ +        
--Nens="${nens}" \ + --model_nprocs="${MODEL_NPROCS}" \ + --verbose \ + > "${OUTROOT}/${tag}/stdout.log" \ + 2> "${OUTROOT}/${tag}/stderr.log" + + echo "=== Finished: $tag ===" +} + +# Safety: skip if requested np exceeds SLURM allocation +for np in $NP_LIST; do + if (( np > SLURM_NTASKS )); then + echo "Skip np=${np} (exceeds allocation SLURM_NTASKS=${SLURM_NTASKS})" + continue + fi + run_case "$np" + # pause for 2 minutes between runs to allow for cleanup and resource availability + sleep 120 +done + +echo "All runs complete. Results in: ${OUTROOT}" diff --git a/src/EnKF/python_enkf/EnKF.py b/src/EnKF/python_enkf/EnKF.py index 5df8aa5..9ce083e 100644 --- a/src/EnKF/python_enkf/EnKF.py +++ b/src/EnKF/python_enkf/EnKF.py @@ -122,10 +122,12 @@ def forecast_step(self, ensemble=None, forecast_step_single=None, **model_kwargs noise = np.concatenate(q0, axis=0) model_kwargs.update({"noise": noise}) del noise_all, q0, noise_, W + # q0 = generate_enkf_field(None, np.sqrt(Lx*Ly), hdim, params["total_state_param_vars"], rh=len_scale, verbose=False) # for key,value in updated_state.items(): - # ensemble[indx_map[key],ens] = value + q0[:len(value)] + # ensemble[indx_map[key],ens] = value + q0[:len(value)] + # ensemble[indx_map[key],ens] = value + np.random.multivariate_normal(np.zeros(len(value)), 0.0*np.eye(len(value))) return ensemble @@ -402,13 +404,14 @@ def EnKF_Analysis(self, ensemble): ensemble_analysis[:,ens] = ensemble[:,ens] + KalGain @ (virtual_observations[:,ens] - self.Observation_function(ensemble[:,ens])) # compute the analysis error covariance - difference = ensemble_analysis - np.mean(ensemble_analysis, axis=1,keepdims=True) - analysis_error_cov =(1/(Nens-1)) * difference @ difference.T + # difference = ensemble_analysis - np.mean(ensemble_analysis, axis=1,keepdims=True) + # analysis_error_cov =(1/(Nens-1)) * difference @ difference.T # Debug # print(f"[Debug] max of analysis error covariance: {np.max(analysis_error_cov[567:,567:])}") - return 
ensemble_analysis, analysis_error_cov + # return ensemble_analysis, analysis_error_cov + return ensemble_analysis def DEnKF_Analysis(self, ensemble): """ @@ -445,132 +448,10 @@ def DEnKF_Analysis(self, ensemble): ensemble_analysis = analysis_anomalies + analysis_mean.reshape(-1,1) # compute the analysis error covariance - analysis_error_cov =(1/(N-1)) * analysis_anomalies @ analysis_anomalies.T - - return ensemble_analysis, analysis_error_cov - - # def DEnKF_Analysis(self, ensemble): - # """ - # Deterministic Ensemble Kalman Filter (DEnKF) analysis step using MPI. - - # Parameters: - # ensemble: ndarray - Ensemble matrix (n x N). - - # Returns: - # ensemble_analysis: updated ensemble matrix (n x N). - # analysis_error_cov: ndarray - Analysis error covariance matrix (n x n). - # """ + # analysis_error_cov =(1/(N-1)) * analysis_anomalies @ analysis_anomalies.T - # if re.match(r"\Aserial\Z", self.parallel_flag, re.IGNORECASE): - - # print("DEnKF Analysis: Starting analysis step") - - # # Compute the Kalman gain - # print("DEnKF Analysis: Computing Kalman gain") - # KalGain = self._compute_kalman_gain() - - # # Get dimensions - # n, N = ensemble.shape - # m = self.Observation_vec.shape[0] - # print(f"DEnKF Analysis: Ensemble dimensions (n={n}, N={N}), Observation size (m={m})") - - # # Compute ensemble mean - # print("DEnKF Analysis: Computing ensemble forecast mean") - # ensemble_forecast_mean = np.mean(ensemble, axis=1) - # print(f"DEnKF Analysis: Ensemble forecast mean calculated: {ensemble_forecast_mean}") - - # # Compute analysis mean - # print("DEnKF Analysis: Computing analysis mean") - # analysis_mean = ensemble_forecast_mean + KalGain @ (self.Observation_vec - self.Observation_function(ensemble_forecast_mean)) - # print(f"DEnKF Analysis: Analysis mean calculated: {analysis_mean}") - - # # Compute forecast and analysis anomalies - # print("DEnKF Analysis: Computing forecast and analysis anomalies") - # forecast_anomalies = np.zeros_like(ensemble) - # 
analysis_anomalies = np.zeros_like(ensemble) - - # for i in range(N): - # forecast_anomalies[:, i] = ensemble[:, i] - ensemble_forecast_mean - # analysis_anomalies[:, i] = forecast_anomalies[:, i] - 0.5 * KalGain @ self.Observation_function(forecast_anomalies[:, i]) - # print(f"DEnKF Analysis: Anomalies for member {i} calculated") - - # # Compute the analysis ensemble - # print("DEnKF Analysis: Computing analysis ensemble") - # ensemble_analysis = analysis_anomalies + analysis_mean.reshape(-1, 1) - # print(f"DEnKF Analysis: Analysis ensemble computed: {ensemble_analysis}") - - # # Compute the analysis error covariance - # print("DEnKF Analysis: Computing analysis error covariance") - # analysis_error_cov = (1 / (N - 1)) * analysis_anomalies @ analysis_anomalies.T - # print(f"DEnKF Analysis: Analysis error covariance computed: {analysis_error_cov}") - - # print("DEnKF Analysis: Analysis step completed") - # return ensemble_analysis, analysis_error_cov - - # elif re.match(r"\AMPI\Z", self.parallel_flag, re.IGNORECASE): - - # from mpi4py import MPI - # comm = MPI.COMM_WORLD - # rank = comm.Get_rank() - # size = comm.Get_size() - - # n, N = ensemble.shape - - # # Distribute the ensemble among processes - # chunk_sizes = [(N // size) + (1 if i < (N % size) else 0) for i in range(size)] - # displacements = [sum(chunk_sizes[:i]) for i in range(size)] - # start_idx = displacements[rank] - # end_idx = start_idx + chunk_sizes[rank] - # local_ensemble = ensemble[:, start_idx:end_idx] - - # # 1. Compute local ensemble mean - # print(f"Rank {rank}: Starting local mean calculation") - # local_mean = np.mean(local_ensemble, axis=1) - - # # 2. Compute the global mean - # global_mean = np.zeros(n) - # comm.Allreduce(local_mean, global_mean, op=MPI.SUM) - # global_mean /= N - # print(f"Rank {rank}: Global mean calculated: {global_mean}") - - # # 3. Compute Kalman gain - # KalGain = self._compute_kalman_gain() - - # # 4. 
Compute the analysis mean - # if rank == 0: - # print(f"Rank {rank}: Computing analysis mean") - # analysis_mean = global_mean + KalGain @ (self.Observation_vec - self.Observation_function(global_mean)) - - # # 5. Compute local anomalies - # local_anomalies = local_ensemble - global_mean.reshape(-1, 1) - - # # 6. Apply correction to local anomalies - # local_analysis_anomalies = np.zeros_like(local_anomalies) - # for i in range(local_anomalies.shape[1]): - # local_analysis_anomalies[:, i] = ( - # local_anomalies[:, i] - 0.5 * KalGain @ self.Observation_function(local_anomalies[:, i]) - # ) - # print(f"Rank {rank}: Local analysis anomalies calculated") - - # # 7. Gather analysis anomalies on rank 0 - # gathered_anomalies = comm.gather(local_analysis_anomalies, root=0) - - # # 8. Assemble analysis ensemble and compute covariance on rank 0 - # if rank == 0: - # print(f"Rank {rank}: Assembling analysis anomalies") - # analysis_anomalies = np.hstack(gathered_anomalies) - # ensemble_analysis = analysis_anomalies + analysis_mean.reshape(-1, 1) - # analysis_error_cov = (1 / (N - 1)) * analysis_anomalies @ analysis_anomalies.T - # else: - # ensemble_analysis = None - # analysis_error_cov = None - - # # 9. 
Broadcast results to all processes - # ensemble_analysis = comm.bcast(ensemble_analysis, root=0) - # analysis_error_cov = comm.bcast(analysis_error_cov, root=0) - # print(f"Rank {rank}: Final analysis broadcast completed") - - # return ensemble_analysis, analysis_error_cov + # return ensemble_analysis, analysis_error_cov + return ensemble_analysis def EnRSKF_Analysis(self, ensemble): """ @@ -597,13 +478,13 @@ def EnRSKF_Analysis(self, ensemble): # compute the analysis ensemble ensemble_analysis = ensemble_forecast_mean.reshape(-1,1) + self.Cov_model @ V.T @ np.linalg.solve(IN, obs_anomaly) - # singular value decomposition - U,S,Vt = np.linalg.svd(I - V.T @ np.linalg.solve(IN, V)) + # # singular value decomposition + # U,S,Vt = np.linalg.svd(I - V.T @ np.linalg.solve(IN, V)) - # compute the analysis error covariance - analysis_error_cov = ensemble_analysis + self.Cov_model@(U@np.diag(np.sqrt(S))@U.T) + # # compute the analysis error covariance + # analysis_error_cov = ensemble_analysis + self.Cov_model@(U@np.diag(np.sqrt(S))@U.T) - return ensemble_analysis, analysis_error_cov + return ensemble_analysis def EnTKF_Analysis(self, ensemble): """ @@ -637,12 +518,13 @@ def EnTKF_Analysis(self, ensemble): ensemble_analysis = ensemble_forecast_mean.reshape(-1,1) + self.Cov_model@inv_analysis_error_cov@rhs # compute the analysis ensemble increment - ensemble_increment = self.Cov_model @ U @ np.diag(np.sqrt(1/(S+1))) @ U.T + # ensemble_increment = self.Cov_model @ U @ np.diag(np.sqrt(1/(S+1))) @ U.T - # compute the analysis error covariance - analysis_error_cov = ensemble_analysis + ensemble_increment + # # compute the analysis error covariance + # analysis_error_cov = ensemble_analysis + ensemble_increment - return ensemble_analysis, analysis_error_cov + # return ensemble_analysis, analysis_error_cov + return ensemble_analysis # if __name__ == '__main__': diff --git a/src/run_model_da/icesee_da_serial.py b/src/run_model_da/icesee_da_serial.py index 52813e8..e677d37 100644 
--- a/src/run_model_da/icesee_da_serial.py +++ b/src/run_model_da/icesee_da_serial.py @@ -492,13 +492,13 @@ def icesee_model_data_assimilation_serial(**model_kwargs): # Compute the analysis ensemble if EnKF_flag: - ensemble_vec, Cov_model = analysis.EnKF_Analysis(ensemble_vec) + ensemble_vec = analysis.EnKF_Analysis(ensemble_vec) elif DEnKF_flag: - ensemble_vec, Cov_model = analysis.DEnKF_Analysis(ensemble_vec) + ensemble_vec = analysis.DEnKF_Analysis(ensemble_vec) elif EnRSKF_flag: - ensemble_vec, Cov_model = analysis.EnRSKF_Analysis(ensemble_vec) + ensemble_vec = analysis.EnRSKF_Analysis(ensemble_vec) elif EnTKF_flag: - ensemble_vec, Cov_model = analysis.EnTKF_Analysis(ensemble_vec) + ensemble_vec = analysis.EnTKF_Analysis(ensemble_vec) else: raise ValueError("Filter type not supported")