Merged
@@ -0,0 +1,62 @@
#!/bin/bash
#SBATCH -t250:00:00
#SBATCH -JISMIP
#SBATCH -N 11
#SBATCH -n 264
#SBATCH --ntasks-per-node=24
#SBATCH -p cpu-large # pick ONE partition; or switch to: -p inferno
#SBATCH --mem=256G # ensure this is allowed per node on that partition
#SBATCH -A gts-arobel3-atlas
#SBATCH -ostrong_scaling-%j.out
#SBATCH --mail-type=BEGIN,END,FAIL
#SBATCH --mail-user=bkyanjo3@gatech.edu

# set -euo pipefail
cd $SLURM_SUBMIT_DIR
Copilot AI Feb 20, 2026

Inconsistent quoting: cd $SLURM_SUBMIT_DIR should use quotes like cd "$SLURM_SUBMIT_DIR" (as done in the weak scaling script on line 15) to handle paths with spaces safely. This is a shell scripting best practice.

Suggested change
cd $SLURM_SUBMIT_DIR
cd "$SLURM_SUBMIT_DIR"

Copilot uses AI. Check for mistakes.
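As a minimal, hypothetical demo (not part of the PR) of the failure mode this comment describes — the directory name, the stand-in SLURM_SUBMIT_DIR assignment, and the `[ -d ... ]` check are all invented for illustration:

```shell
#!/bin/bash
# Hypothetical demo: why quoting the expansion matters for paths with spaces.
dir="/tmp/scaling demo.$$"   # a path containing a space
mkdir -p "$dir"
SLURM_SUBMIT_DIR="$dir"      # stand-in; SLURM sets this inside a real job

# Unquoted: word splitting hands [ two path fragments, so the check errors out.
if [ -d $SLURM_SUBMIT_DIR ] 2>/dev/null; then unquoted=ok; else unquoted=fail; fi

# Quoted: the path stays a single argument and the check succeeds.
if [ -d "$SLURM_SUBMIT_DIR" ]; then quoted=ok; else quoted=fail; fi

rmdir "$dir"
echo "unquoted=$unquoted quoted=$quoted"   # unquoted=fail quoted=ok
```

The same word splitting is what makes the unquoted `cd` fragile in the script above.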

# Spack env (numpy, mpi4py, h5py, etc.)
module purge || true
source /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/scripts/activate.sh
MPI_DIR="$(spack -e /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/.spack-env/icesee location -i openmpi)"
export PATH="$MPI_DIR/bin:$PATH"
export LD_LIBRARY_PATH="$MPI_DIR/lib:$LD_LIBRARY_PATH"

# --- Thread pinning ---------------------------------------------------
export ICESEE_PERFORMANCE_TEST=1

# --- Test Matrix ------------------------------------------------------
NENS=64
MODEL_NPROCS=4
# NP_LIST="64 32 16 8 4 2 1"
NP_LIST="8 64 32 16 4 2 1"

OUTROOT="strong_scaling_$(date +%Y%m%d_%H%M%S)"
mkdir -p "$OUTROOT"

# "$MPI_DIR/bin/mpirun" -np "$np" \
# srun --mpi=pmix -n "$np" \

Comment on lines +36 to +38
Copilot AI Feb 20, 2026

These commented-out lines (36-37) appear to be leftover debug or alternative implementation code. Since this is production code for scaling tests, consider removing these comments to improve readability, or if they serve as documentation for alternative MPI launch methods, add a clearer explanation of why they're preserved.

Suggested change
# "$MPI_DIR/bin/mpirun" -np "$np" \
# srun --mpi=pmix -n "$np" \

run_case () {
local np="$1"
local tag="np${np}_Nens${NENS}_mp${MODEL_NPROCS}"
echo "=== Running case: $tag ==="
mkdir -p "${OUTROOT}/${tag}"

/usr/bin/time -v \
srun --mpi=pmix -n "$np" \
python run_da_issm.py \
--Nens=${NENS} \
--model_nprocs=${MODEL_NPROCS} \
--verbose \
> "${OUTROOT}/${tag}/stdout.log" \
2> "${OUTROOT}/${tag}/stderr.log"

echo "=== Finished: $tag ==="
}

for np in $NP_LIST; do
(( np > NENS )) && { echo "Skip np=${np} (NENS=${NENS})"; continue; }
run_case "$np"
done

echo "All runs complete. Results in: ${OUTROOT}"
@@ -0,0 +1,68 @@
#!/bin/bash
#SBATCH -t250:00:00
#SBATCH -JISMIP
#SBATCH -N 14 # 1080 / 24 = 45 nodes
Copilot AI Feb 20, 2026

The comment on line 4 is inconsistent with the actual SLURM directive. The comment says "1080 / 24 = 45 nodes" but the directive specifies 14 nodes. Either the comment should be updated to "336 / 24 = 14 nodes" (based on the -n 336 directive on line 5), or if 45 nodes is actually needed, the -N directive should be corrected.

Suggested change
#SBATCH -N 14 # 1080 / 24 = 45 nodes
#SBATCH -N 14 # 336 / 24 = 14 nodes

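For the arithmetic in question, a short sketch (values copied from the -n and --ntasks-per-node directives in this script) confirms the suggested comment; ceiling division is used so a partially filled node still counts as a whole node:

```shell
#!/bin/bash
# Node-count arithmetic behind the -N directive: 336 tasks at 24 tasks/node.
ntasks=336
ntasks_per_node=24
# Ceiling division: (a + b - 1) / b rounds up in integer arithmetic.
nodes=$(( (ntasks + ntasks_per_node - 1) / ntasks_per_node ))
echo "nodes=$nodes"   # 336 / 24 = 14
```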
#SBATCH -n 336 # request enough tasks for the max np
#SBATCH --ntasks-per-node=24
#SBATCH -p cpu-large # pick ONE partition; or switch to: -p inferno
#SBATCH --mem=256G # per-node; ensure this is allowed on the partition
#SBATCH -A gts-arobel3-atlas
#SBATCH -oweak_scaling-%j.out
#SBATCH --mail-type=BEGIN,END,FAIL
#SBATCH --mail-user=bankyanjo@gmail.com

# set -euo pipefail
cd "$SLURM_SUBMIT_DIR"

# Spack env (numpy, mpi4py, h5py, etc.)
module purge || true
source /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/scripts/activate.sh
MPI_DIR="$(spack -e /storage/home/hcoda1/8/bkyanjo3/r-arobel3-0/ICESEE-Spack/.spack-env/icesee location -i openmpi)"
export PATH="$MPI_DIR/bin:$PATH"
export LD_LIBRARY_PATH="$MPI_DIR/lib:$LD_LIBRARY_PATH"

# --- Performance flags ------------------------------------------------
export ICESEE_PERFORMANCE_TEST=1

# --- Weak scaling: np = Nens; model_nprocs fixed ---------------------
MODEL_NPROCS=4
NP_LIST="4 8 16 32 64 128 256"

OUTROOT="weak_scaling_$(date +%Y%m%d_%H%M%S)"
mkdir -p "$OUTROOT"

run_case () {
local np="$1"
local nens="$np" # enforce np = Nens
local tag="np${np}_Nens${nens}_mp${MODEL_NPROCS}"
echo "=== Running case: $tag ==="
mkdir -p "${OUTROOT}/${tag}"

# Compute total ranks used = np * (MODEL_NPROCS + 1)
local rank_size=$(( np * (MODEL_NPROCS + 1) ))
Comment on lines +41 to +42
Copilot AI Feb 20, 2026

The variable rank_size is computed on line 42 but never used. This calculation appears intended for resource allocation or validation but is not utilized. Either remove this unused variable or add the necessary logic that uses it for validation or configuration.

Suggested change
# Compute total ranks used = np * (MODEL_NPROCS + 1)
local rank_size=$(( np * (MODEL_NPROCS + 1) ))

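A hedged sketch of one way the unused variable could earn its keep — pre-checking each case against the job allocation before launching. SLURM_NTASKS is hard-coded here purely for the demo (inside a job, SLURM sets it), and the rank formula is the one from the script's own comment:

```shell
#!/bin/bash
# Hypothetical use of rank_size: skip cases the allocation cannot hold.
SLURM_NTASKS=336   # demo value, matching the -n directive above
MODEL_NPROCS=4
skipped=""
for np in 4 8 16 32 64 128 256; do
  # Total ranks used = np * (MODEL_NPROCS + 1), as in the script's comment.
  rank_size=$(( np * (MODEL_NPROCS + 1) ))
  if [ "$rank_size" -gt "$SLURM_NTASKS" ]; then
    skipped="$skipped $np"
  fi
done
echo "skipped:$skipped"   # skipped: 128 256
```

This would replace the current SLURM_NTASKS check at the bottom of the script, which compares np alone rather than the total rank footprint.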
local data_path="_modelrun_datasets"
Copilot AI Feb 20, 2026

The variable data_path is defined on line 43 but never used. Either remove this unused variable or add the necessary logic that references this path.

Suggested change
local data_path="_modelrun_datasets"


/usr/bin/time -v \
"$MPI_DIR/bin/mpirun" -np "$np" \
python run_da_issm.py \
--Nens="${nens}" \
--model_nprocs="${MODEL_NPROCS}" \
--verbose \
> "${OUTROOT}/${tag}/stdout.log" \
2> "${OUTROOT}/${tag}/stderr.log"

echo "=== Finished: $tag ==="
}

# Safety: skip if requested np exceeds SLURM allocation
for np in $NP_LIST; do
if (( np > SLURM_NTASKS )); then
echo "Skip np=${np} (exceeds allocation SLURM_NTASKS=${SLURM_NTASKS})"
continue
fi
run_case "$np"
# pause for 2 minutes between runs to allow for cleanup and resource availability
sleep 120
done

echo "All runs complete. Results in: ${OUTROOT}"
158 changes: 20 additions & 138 deletions src/EnKF/python_enkf/EnKF.py
Original file line number Diff line number Diff line change
@@ -122,10 +122,12 @@ def forecast_step(self, ensemble=None, forecast_step_single=None, **model_kwargs
noise = np.concatenate(q0, axis=0)
model_kwargs.update({"noise": noise})
del noise_all, q0, noise_, W

# q0 = generate_enkf_field(None, np.sqrt(Lx*Ly), hdim, params["total_state_param_vars"], rh=len_scale, verbose=False)

# for key,value in updated_state.items():
# ensemble[indx_map[key],ens] = value + q0[:len(value)]
# ensemble[indx_map[key],ens] = value + q0[:len(value)]
# ensemble[indx_map[key],ens] = value + np.random.multivariate_normal(np.zeros(len(value)), 0.0*np.eye(len(value)))
Comment on lines +129 to +130
Copilot AI Feb 20, 2026

The indentation for these commented-out lines appears inconsistent. Line 129 has correct indentation (matching the comment above it), but line 130 has extra indentation that doesn't align with the surrounding code structure. This should be corrected for consistency, even though the code is commented out.


return ensemble

@@ -402,13 +404,14 @@ def EnKF_Analysis(self, ensemble):
ensemble_analysis[:,ens] = ensemble[:,ens] + KalGain @ (virtual_observations[:,ens] - self.Observation_function(ensemble[:,ens]))

# compute the analysis error covariance
difference = ensemble_analysis - np.mean(ensemble_analysis, axis=1,keepdims=True)
analysis_error_cov =(1/(Nens-1)) * difference @ difference.T
# difference = ensemble_analysis - np.mean(ensemble_analysis, axis=1,keepdims=True)
# analysis_error_cov =(1/(Nens-1)) * difference @ difference.T

# Debug
# print(f"[Debug] max of analysis error covariance: {np.max(analysis_error_cov[567:,567:])}")

return ensemble_analysis, analysis_error_cov
# return ensemble_analysis, analysis_error_cov
return ensemble_analysis
Comment on lines +413 to +414
Copilot AI Feb 20, 2026

The docstring for this method (lines 380-389, outside the changed region) still indicates it returns both ensemble_analysis and analysis_error_cov, but the implementation now only returns ensemble_analysis. While the docstring itself cannot be edited in this PR since it's outside the changed regions, this creates an inconsistency between documentation and implementation that should be addressed.


def DEnKF_Analysis(self, ensemble):
"""
@@ -445,132 +448,10 @@ def DEnKF_Analysis(self, ensemble):
ensemble_analysis = analysis_anomalies + analysis_mean.reshape(-1,1)

# compute the analysis error covariance
analysis_error_cov =(1/(N-1)) * analysis_anomalies @ analysis_anomalies.T

return ensemble_analysis, analysis_error_cov

# def DEnKF_Analysis(self, ensemble):
# """
# Deterministic Ensemble Kalman Filter (DEnKF) analysis step using MPI.

# Parameters:
# ensemble: ndarray - Ensemble matrix (n x N).

# Returns:
# ensemble_analysis: updated ensemble matrix (n x N).
# analysis_error_cov: ndarray - Analysis error covariance matrix (n x n).
# """
# analysis_error_cov =(1/(N-1)) * analysis_anomalies @ analysis_anomalies.T

# if re.match(r"\Aserial\Z", self.parallel_flag, re.IGNORECASE):

# print("DEnKF Analysis: Starting analysis step")

# # Compute the Kalman gain
# print("DEnKF Analysis: Computing Kalman gain")
# KalGain = self._compute_kalman_gain()

# # Get dimensions
# n, N = ensemble.shape
# m = self.Observation_vec.shape[0]
# print(f"DEnKF Analysis: Ensemble dimensions (n={n}, N={N}), Observation size (m={m})")

# # Compute ensemble mean
# print("DEnKF Analysis: Computing ensemble forecast mean")
# ensemble_forecast_mean = np.mean(ensemble, axis=1)
# print(f"DEnKF Analysis: Ensemble forecast mean calculated: {ensemble_forecast_mean}")

# # Compute analysis mean
# print("DEnKF Analysis: Computing analysis mean")
# analysis_mean = ensemble_forecast_mean + KalGain @ (self.Observation_vec - self.Observation_function(ensemble_forecast_mean))
# print(f"DEnKF Analysis: Analysis mean calculated: {analysis_mean}")

# # Compute forecast and analysis anomalies
# print("DEnKF Analysis: Computing forecast and analysis anomalies")
# forecast_anomalies = np.zeros_like(ensemble)
# analysis_anomalies = np.zeros_like(ensemble)

# for i in range(N):
# forecast_anomalies[:, i] = ensemble[:, i] - ensemble_forecast_mean
# analysis_anomalies[:, i] = forecast_anomalies[:, i] - 0.5 * KalGain @ self.Observation_function(forecast_anomalies[:, i])
# print(f"DEnKF Analysis: Anomalies for member {i} calculated")

# # Compute the analysis ensemble
# print("DEnKF Analysis: Computing analysis ensemble")
# ensemble_analysis = analysis_anomalies + analysis_mean.reshape(-1, 1)
# print(f"DEnKF Analysis: Analysis ensemble computed: {ensemble_analysis}")

# # Compute the analysis error covariance
# print("DEnKF Analysis: Computing analysis error covariance")
# analysis_error_cov = (1 / (N - 1)) * analysis_anomalies @ analysis_anomalies.T
# print(f"DEnKF Analysis: Analysis error covariance computed: {analysis_error_cov}")

# print("DEnKF Analysis: Analysis step completed")
# return ensemble_analysis, analysis_error_cov

# elif re.match(r"\AMPI\Z", self.parallel_flag, re.IGNORECASE):

# from mpi4py import MPI
# comm = MPI.COMM_WORLD
# rank = comm.Get_rank()
# size = comm.Get_size()

# n, N = ensemble.shape

# # Distribute the ensemble among processes
# chunk_sizes = [(N // size) + (1 if i < (N % size) else 0) for i in range(size)]
# displacements = [sum(chunk_sizes[:i]) for i in range(size)]
# start_idx = displacements[rank]
# end_idx = start_idx + chunk_sizes[rank]
# local_ensemble = ensemble[:, start_idx:end_idx]

# # 1. Compute local ensemble mean
# print(f"Rank {rank}: Starting local mean calculation")
# local_mean = np.mean(local_ensemble, axis=1)

# # 2. Compute the global mean
# global_mean = np.zeros(n)
# comm.Allreduce(local_mean, global_mean, op=MPI.SUM)
# global_mean /= N
# print(f"Rank {rank}: Global mean calculated: {global_mean}")

# # 3. Compute Kalman gain
# KalGain = self._compute_kalman_gain()

# # 4. Compute the analysis mean
# if rank == 0:
# print(f"Rank {rank}: Computing analysis mean")
# analysis_mean = global_mean + KalGain @ (self.Observation_vec - self.Observation_function(global_mean))

# # 5. Compute local anomalies
# local_anomalies = local_ensemble - global_mean.reshape(-1, 1)

# # 6. Apply correction to local anomalies
# local_analysis_anomalies = np.zeros_like(local_anomalies)
# for i in range(local_anomalies.shape[1]):
# local_analysis_anomalies[:, i] = (
# local_anomalies[:, i] - 0.5 * KalGain @ self.Observation_function(local_anomalies[:, i])
# )
# print(f"Rank {rank}: Local analysis anomalies calculated")

# # 7. Gather analysis anomalies on rank 0
# gathered_anomalies = comm.gather(local_analysis_anomalies, root=0)

# # 8. Assemble analysis ensemble and compute covariance on rank 0
# if rank == 0:
# print(f"Rank {rank}: Assembling analysis anomalies")
# analysis_anomalies = np.hstack(gathered_anomalies)
# ensemble_analysis = analysis_anomalies + analysis_mean.reshape(-1, 1)
# analysis_error_cov = (1 / (N - 1)) * analysis_anomalies @ analysis_anomalies.T
# else:
# ensemble_analysis = None
# analysis_error_cov = None

# # 9. Broadcast results to all processes
# ensemble_analysis = comm.bcast(ensemble_analysis, root=0)
# analysis_error_cov = comm.bcast(analysis_error_cov, root=0)
# print(f"Rank {rank}: Final analysis broadcast completed")

# return ensemble_analysis, analysis_error_cov
# return ensemble_analysis, analysis_error_cov
return ensemble_analysis
Comment on lines +453 to +454
Copilot AI Feb 20, 2026

The docstring for this method (lines 417-426, outside the changed region) still indicates it returns both ensemble_analysis and analysis_error_cov, but the implementation now only returns ensemble_analysis. This creates an inconsistency between documentation and implementation that should be addressed.


def EnRSKF_Analysis(self, ensemble):
"""
@@ -597,13 +478,13 @@ def EnRSKF_Analysis(self, ensemble):
# compute the analysis ensemble
ensemble_analysis = ensemble_forecast_mean.reshape(-1,1) + self.Cov_model @ V.T @ np.linalg.solve(IN, obs_anomaly)

# singular value decomposition
U,S,Vt = np.linalg.svd(I - V.T @ np.linalg.solve(IN, V))
# # singular value decomposition
# U,S,Vt = np.linalg.svd(I - V.T @ np.linalg.solve(IN, V))

# compute the analysis error covariance
analysis_error_cov = ensemble_analysis + self.Cov_model@(U@np.diag(np.sqrt(S))@U.T)
# # compute the analysis error covariance
# analysis_error_cov = ensemble_analysis + self.Cov_model@(U@np.diag(np.sqrt(S))@U.T)

return ensemble_analysis, analysis_error_cov
return ensemble_analysis
Copilot AI Feb 20, 2026

The docstring for this method (lines 457-466, outside the changed region) still indicates it returns both ensemble_analysis and analysis_error_cov, but the implementation now only returns ensemble_analysis. This creates an inconsistency between documentation and implementation that should be addressed.


def EnTKF_Analysis(self, ensemble):
"""
@@ -637,12 +518,13 @@ def EnTKF_Analysis(self, ensemble):
ensemble_analysis = ensemble_forecast_mean.reshape(-1,1) + self.Cov_model@inv_analysis_error_cov@rhs

# compute the analysis ensemble increment
ensemble_increment = self.Cov_model @ U @ np.diag(np.sqrt(1/(S+1))) @ U.T
# ensemble_increment = self.Cov_model @ U @ np.diag(np.sqrt(1/(S+1))) @ U.T

# compute the analysis error covariance
analysis_error_cov = ensemble_analysis + ensemble_increment
# # compute the analysis error covariance
# analysis_error_cov = ensemble_analysis + ensemble_increment

return ensemble_analysis, analysis_error_cov
# return ensemble_analysis, analysis_error_cov
return ensemble_analysis
Comment on lines +526 to +527
Copilot AI Feb 20, 2026

The docstring for this method (lines 490-499, outside the changed region) still indicates it returns both ensemble_analysis and analysis_error_cov, but the implementation now only returns ensemble_analysis. This creates an inconsistency between documentation and implementation that should be addressed.



# if __name__ == '__main__':
8 changes: 4 additions & 4 deletions src/run_model_da/icesee_da_serial.py
@@ -492,13 +492,13 @@ def icesee_model_data_assimilation_serial(**model_kwargs):

# Compute the analysis ensemble
if EnKF_flag:
ensemble_vec, Cov_model = analysis.EnKF_Analysis(ensemble_vec)
ensemble_vec = analysis.EnKF_Analysis(ensemble_vec)
elif DEnKF_flag:
ensemble_vec, Cov_model = analysis.DEnKF_Analysis(ensemble_vec)
ensemble_vec = analysis.DEnKF_Analysis(ensemble_vec)
elif EnRSKF_flag:
ensemble_vec, Cov_model = analysis.EnRSKF_Analysis(ensemble_vec)
ensemble_vec = analysis.EnRSKF_Analysis(ensemble_vec)
elif EnTKF_flag:
ensemble_vec, Cov_model = analysis.EnTKF_Analysis(ensemble_vec)
ensemble_vec = analysis.EnTKF_Analysis(ensemble_vec)
else:
raise ValueError("Filter type not supported")
