From 5e0646e8bb9433369be4084ae1c197b81217024f Mon Sep 17 00:00:00 2001 From: prinse1545 Date: Thu, 26 Jun 2025 09:27:20 -0500 Subject: [PATCH 1/3] new interpolator --- posydon/interpolation/data_scaling.py | 43 +- posydon/interpolation/new_interpolator.py | 462 ++++++++++++++++++ posydon/unit_tests/interpolation/__init__.py | 0 .../interpolation/test_IF_interpolation.py | 22 + 4 files changed, 512 insertions(+), 15 deletions(-) create mode 100644 posydon/interpolation/new_interpolator.py create mode 100644 posydon/unit_tests/interpolation/__init__.py create mode 100644 posydon/unit_tests/interpolation/test_IF_interpolation.py diff --git a/posydon/interpolation/data_scaling.py b/posydon/interpolation/data_scaling.py index 901e66f61c..7fbbd9ee7f 100644 --- a/posydon/interpolation/data_scaling.py +++ b/posydon/interpolation/data_scaling.py @@ -8,6 +8,18 @@ import numpy as np +eps = 1.0e-16 + +SCALING_OPTIONS = [ + "min_max", + "max_abs", + "standardize", + "log_min_max", + "neg_log_min_max", + "log_max_abs", + "log_standardize", + "neg_log_standardize" +] class DataScaler: """Data Normalization class. @@ -68,27 +80,28 @@ def fit(self, x, method='none', lower=-1.0, upper=1.0): if method == 'min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [x.min(axis=0), x.max(axis=0)] + self.params = [np.nanmin(x, axis=0), np.nanmax(x, axis=0)] elif method == 'log_min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [np.log10(x.min(axis=0)), np.log10(x.max(axis=0))] + self.params = [np.log10(np.nanmin(x, axis=0)), np.log10(np.nanmax(x, axis=0))] + elif method == 'neg_log_min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [np.log10((-x).min(axis=0)), - np.log10((-x).max(axis=0))] + self.params = [np.log10(np.nanmin(-x, axis=0)), + np.log10(np.nanmax(-x, axis=0))] elif method == 'max_abs': - self.params = [np.abs(x).max(axis=0)] + self.params = [np.nanmax(np.abs(x), axis=0)] elif method == 'log_max_abs': - self.params = [np.abs(np.log10(x)).max(axis=0)] - elif method == 'standarize': - self.params = [x.mean(axis=0), x.std(axis=0)] - elif method == 'log_standarize': + self.params = [np.nanmax(np.abs(np.log10(x)), axis=0)] + elif method == 'standardize': + self.params = [np.nanmean(x, axis=0), np.nanstd(x, axis=0)] + elif method == 'log_standardize': # log will be computed in transform again - self.params = [np.log10(x).mean(axis=0), np.log10(x).std(axis=0)] - elif method == 'neg_log_standarize': # log(-x) - self.params = [np.log10(-x).mean(axis=0), np.log10(-x).std(axis=0)] + self.params = [np.nanmean(np.log10(x), axis=0), np.nanstd(np.log10(x), axis=0)] + elif method == 'neg_log_standardize': # log(-x) + self.params = [np.nanmean(np.log10(-x), axis=0), np.nanstd(np.log10(-x), axis=0)] elif method == 'log': self.params = [] elif method == 'none': # no transformation @@ -135,12 +148,12 @@ def transform(self, x): x_t = x / self.params[0] elif self.method == 'log_max_abs': x_t = np.log10(x) / self.params[0] - elif self.method == 'standarize': + elif self.method == 'standardize': x_t = (x - self.params[0]) / self.params[1] - elif self.method == 'log_standarize': + elif self.method == 'log_standardize': # log will be computed in transform again x_t = (np.log10(x) - self.params[0]) / self.params[1] - elif self.method == 'neg_log_standarize': + elif self.method == 'neg_log_standardize': x_t = (np.log10(-x) - self.params[0]) / self.params[1] elif self.method == 'log': x_t = np.log10(x) diff --git a/posydon/interpolation/new_interpolator.py b/posydon/interpolation/new_interpolator.py new file mode 100644 index 0000000000..e7b0b34fe6 --- /dev/null +++ b/posydon/interpolation/new_interpolator.py @@ -0,0 +1,462 @@ +""" +Module implementing initial-final (IF) interpolation. + +""" + +import numpy as np +import os +import pickle +from datetime import date + +from scipy.spatial import Delaunay +# POSYDON +from posydon.grids.psygrid import PSyGrid +from posydon.interpolation.data_scaling import DataScaler, SCALING_OPTIONS +from posydon.utils.posydonwarning import Pwarn +from posydon.interpolation.constraints import ( + find_constraints_to_apply, sanitize_interpolated_quantities) + +# ML Imports +from sklearn.neighbors import KNeighborsClassifier +from sklearn.model_selection import train_test_split +from sklearn.metrics import balanced_accuracy_score + +import sys + +eps = 1.0e-16 + +# INITIAL-FINAL INTERPOLATOR +class IFInterpolator: + """Class handling initial-final interpolation. + + Class handling initial-final interpolation with support to interpolate + certain keys by differing classes. + """ + + def __init__(self, grids = None, interpolators = None): + """Initialize the IFInterpolator class. + + Parameters + ---------- + grid : PSyGrid + The training grid or tuple of PSyGrid (training, validation) + interpolators : list of dictionaries + Contain parameters for the BaseIFIneterpolators to be constructed + + """ + self.interpolators = [] + self.interpolator_parameters = interpolators + self.grids = grids + + if self.interpolator_parameters is not None: + out_keys = [] + + for params in self.interpolator_parameters: + + if ("out_keys" not in params + and len(self.interpolator_parameters) > 1): + raise ValueError("Overlapping out keys between different " + "interpolators are not permited!") + elif "out_keys" not in params: + continue + + for key in params["out_keys"]: + if(key in out_keys): + raise ValueError( + f"Overlapping out keys between different " + f"interpolators are not permited! ({key} in more " + f"than one set of out_keys)") + else: + out_keys.append(key) + + + def train(self): + """Train the interpolator(s) on the PSyGrid used for construction.""" + for interpolator in self.interpolator_parameters: + interp = BaseIFInterpolator(grids=self.grids, + **interpolator) + interp.train() + self.interpolators.append(interp) + + self.interp_in_q = self.interpolators[0].interp_in_q + + def evaluate(self, binary, sanitization_verbose=False): + """Get the output vector approximation. + + Get the output vector approximation as well as all of the + classifications given a Binary Star + + Parameters + ---------- + binary : BinaryStar + A class which is an input vector which are to be classified and for + which an output vector will be approximated. + + Returns + ------- + Output space approximation as a tuple containing two dictionary + + """ + ynums = [] + ycats = [] + n = [] + + for interpolator in self.interpolators: + ynum, ycat, ns = interpolator.evaluate(binary) + + ynums.extend(ynum) + ycats.extend(ycat) + n.extend(ns) + + return ynums, ycats, n + + + def test_interpolator(self, initial_values, sanitization_verbose = False): + """ Method that can take in a 2-D numpy array for more efficient use of the interpolator + + Parameters + ---------- + initial_values : numpy array + A numpy array containing the in-key values of the binaries to be evolved + + Return + ------ + Interpolated values + """ + new_values = np.array(initial_values, copy = True) + + if self.interp_in_q: + new_values.T[1] = new_values.T[1] / new_values.T[0] + + final_values = [] + + for interpolator in self.interpolators: + final_values.append(interpolator.test_interpolator(new_values).T) + + return np.concatenate(final_values).T + + def test_classifiers(self, initial_values): + """ Method that can take in a 2-D numpy array for more efficient use of classifiers + + Parameters + ---------- + initial_values : numpy array + A numpy array containing the in-key values of the binaries to be classified + + Return + ------ + Classified values + """ + new_values = np.array(initial_values, copy = True) + + if self.interp_in_q: + new_values.T[1] = new_values.T[1] / new_values.T[0] + + classes = {} + + for interpolator in self.interpolators: + + classes = {**classes, **interpolator.test_classifiers(new_values)} + + return classes + + + + def load(self, filename): + """Load a saved IFInterpolator from a .pkl file. + + Parameters + ---------- + filename : string + Path to the .pkl file + + """ + with open(filename, 'rb') as f: + self.interpolators = pickle.load(f) + + self.interp_in_q = self.interpolators[0].interp_in_q + + def save(self, filename): + """Save the interpolator to a .pkl file. + + Parameters + ---------- + filename : string + Path where .pkl file will be saved + + """ + with open(filename, "wb") as f: + pickle.dump(self.interpolators, f) + +class BaseIFInterpolator: + + def __init__(self, grids, in_keys, out_keys, interpolation_class, max_k): + + if type(grids) != list: + sys.exit("Please provide a list of PSyGrids containing both a training and validation grid to train the interpolator") + else: + + self.in_keys = in_keys + + self.continous_out_keys = out_keys # keys to be interpolated which correspond to numerical quantities + self.discrete_out_keys = out_keys # keys to be interpolated which correspond to discrete quantities + + self.interpolation_class = interpolation_class + self.max_k = max_k + + self.training_grid = self.preprocess_grid(grids[0], training_grid = True) + self.validation_grid = self.preprocess_grid(grids[1]) + + self.triangulate(self.training_grid) + + def train(self): + self.find_hyperparameters() + # self.optimize_normalization() + + def evaluate(self, initial_values): + + if self.classifier is None: + sys.exit("Please find classifier hyperparameters before using interpolator") + + if not self.training: + initial_values = ( np.log10(initial_values + eps) - self.iv_min ) / (self.iv_max - self.iv_min + eps) + + classes = self.classifier.predict(initial_values) + + interpolated_values = [] + n = [] + + for iv, klass in zip(initial_values, classes): + if klass == "initial_MT": + continue + + triangulation = self.training_grid["triangulations"][klass] + + simplex = triangulation.find_simplex(iv) + + if simplex == -1: + interpolated_values.append( + self.get_nearest_neighbor(iv) + ) + continue + + vertices = triangulation.simplices[simplex] + + class_inds = self.training_grid["class_inds"][klass] + + final_values = self.training_grid["final_values"][class_inds][vertices] + + if self.training: + final_values = self.normalize_output(final_values, klass) + + barycentric_weights = self.compute_barycentric_coordinates(iv, triangulation.points[vertices])[..., np.newaxis] + + interpolated = np.sum(final_values * barycentric_weights, axis = 0) + + n.append({ + "weights": barycentric_weights, + "ics": triangulation.points[vertices], + "ic": iv, + "final_values": final_values, + "interpolated": interpolated + }) + + # interpolated = self.denormalize_output(interpolated, klass) + interpolated_values.append(interpolated) + + interpolated_values = np.array(interpolated_values) + classes = np.array(classes) + + return interpolated_values, classes, n + + def find_hyperparameters(self): + + k_accuracies = [] + + for k in range(1, self.max_k): + + validation_classifier = KNeighborsClassifier(n_neighbors = k, weights = "distance") + validation_classifier.fit(self.training_grid["initial_values"], self.training_grid["classes"]) + + predicted_classes = validation_classifier.predict(self.validation_grid["initial_values"]) + + k_accuracies.append( + balanced_accuracy_score(predicted_classes, self.validation_grid["classes"]) + ) + + self.k_accuracies = np.array(k_accuracies) + + self.k_star = self.k_accuracies.argmax() + + self.classifier = KNeighborsClassifier(n_neighbors = self.k_star, weights = "distance") + self.classifier.fit(self.training_grid["initial_values"], self.training_grid["classes"]) + + def optimize_normalization(self): + + output_dim = len(self.continous_out_keys) + + all_scalers = [] + scaler_accuracies = np.zeros(shape = (len(SCALING_OPTIONS), len(self.classes), output_dim)) + + self.training = True + + for i, option in enumerate(SCALING_OPTIONS): + scalers = [[DataScaler()] * output_dim] * len(self.classes) + + for j, klass in enumerate(self.classes): + for parameter in range(output_dim): + param_min = np.nanmin(self.training_grid["final_values"][:, parameter]) + if param_min < 0 and "log" in option: + scalers[j][parameter].fit(self.training_grid["final_values"][:, parameter], "none") + else: + scalers[j][parameter].fit(self.training_grid["final_values"][:, parameter], option) + + self.out_scalers = scalers + + class_mask = self.validation_grid["class_inds"][klass] + class_ivs = self.validation_grid["initial_values"][class_mask] + + interpolated_values, predicted_classes, _ = self.evaluate(class_ivs) + predicted_class_mask = np.where(predicted_classes != "initial_MT") # taking out mispredictions of initial MT + + relative_errors = np.abs( + (interpolated_values - self.validation_grid["final_values"][class_mask][predicted_class_mask]) + / (self.validation_grid["final_values"][class_mask][predicted_class_mask] + eps) + ).mean(axis = 0) + + scaler_accuracies[i][j] = relative_errors + + all_scalers.append(scalers) + + all_scalers = np.array(all_scalers, dtype = object) + + star_scalers = [] + star_scaler_inds = scaler_accuracies.argmax(axis = 0) + + for c in range(len(self.classes)): + star_scalers.append( + all_scalers[:, c][star_scaler_inds[c], np.arange(len(self.continous_out_keys))] + ) + + self.out_scalers = np.array(star_scalers, dtype = object) + self.normalize_triangulations() + + self.training = False + + # =================== helper methods below =========================== + + def preprocess_grid(self, grid, training_grid = False): + + final_values = np.array(grid.final_values[self.continous_out_keys].tolist()) + + valid_inds = np.where( + (grid.final_values["interpolation_class"] != "not_converged") & + (grid.final_values["interpolation_class"] != "ignored_no_RLO") & + (grid.final_values["interpolation_class"] != "ignored_no_binary_history") + )[0] + + initial_values = np.array(grid.initial_values[self.in_keys][valid_inds].tolist()) + # determining if should interp in q + if training_grid: + m1, m2 = 10**initial_values[:, 0], 10**initial_values[:, 1] + self.interp_in_q = (m2[m1 > 0.95 * m1.max()].min() / m2[m1 < 1.05 * m1.min()].min() > 2) + self.interp_in_q = False + + initial_values = np.log10(initial_values + eps) + + if self.interp_in_q: + initial_values[:, 1] = (10**initial_values[:, 1] - eps) / (10**initial_values[:, 0] - eps) + + if training_grid: + self.iv_min = initial_values.min(axis = 0, keepdims = True) + self.iv_max = initial_values.max(axis = 0, keepdims = True) + + class_labels = np.unique(grid.final_values[valid_inds][self.interpolation_class]) + class_inds = dict(zip( + class_labels, + [np.where(grid.final_values[valid_inds][self.interpolation_class] == label)[0] for label in class_labels] + )) + + return { + "initial_values": (initial_values - self.iv_min) / (self.iv_max - self.iv_min + eps), + "final_values": np.array(grid.final_values[self.continous_out_keys][valid_inds].tolist()), + "final_classes": grid.final_values[valid_inds][self.discrete_out_keys], + "classes": grid.final_values[valid_inds][self.interpolation_class], + "class_inds": class_inds, + } + + def triangulate(self, grid_dict): + + self.classes = np.unique(grid_dict["classes"]).tolist() + self.classes.remove("initial_MT") + + triangulations = {} + + for klass in self.classes: + + class_inds = grid_dict["class_inds"][klass] + triangulations[klass] = Delaunay(grid_dict["initial_values"][class_inds]) + + grid_dict["triangulations"] = triangulations + + def compute_barycentric_coordinates(self, point, coords): + + T = np.array([ + coords[0] - coords[3], + coords[1] - coords[3], + coords[2] - coords[3] + ]) # our matrix + T = T.T + T_I = np.linalg.inv(T) + + r_a = point - coords[3] + + weights = (T_I @ r_a).tolist() + + weights.append(1 - weights[0] - weights[1] - weights[2]) + + weights = np.array(weights) / sum(weights) + + return weights + + def get_nearest_neighbor(self, iv): + + dists = np.sqrt(np.square(self.training_grid["initial_values"] - iv).sum(axis = 1)) + sorted_inds = dists.argsort() + + return self.training_grid["final_values"][sorted_inds[0]] + + def normalize_output(self, input, klass): + class_ind = self.classes.index(klass) + scalers = self.out_scalers[class_ind] + + ret_value = np.zeros_like(input) + + for dim, scaler in enumerate(scalers): + ret_value[:, dim] = scaler.transform(input[:, dim]) + + return ret_value + + + def denormalize_output(self, input, klass): + class_ind = self.classes.index(klass) + scalers = self.out_scalers[class_ind] + + ret_value = np.zeros_like(input) + + for dim, scaler in enumerate(scalers): + ret_value[dim] = scaler.inv_transform(input[dim]) + + return ret_value + + def normalize_triangulations(self): + + new_final_values = np.zeros_like(self.training_grid["final_values"]) + + for i, (klass, fv) in enumerate(zip(self.training_grid["classes"], self.training_grid["final_values"])): + if klass == "initial_MT": # should be taken out in preprocessing + continue + new_final_values[i] = self.normalize_output(np.array([fv]), klass) + + self.training_grid["final_values"] = new_final_values + \ No newline at end of file diff --git a/posydon/unit_tests/interpolation/__init__.py b/posydon/unit_tests/interpolation/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/posydon/unit_tests/interpolation/test_IF_interpolation.py b/posydon/unit_tests/interpolation/test_IF_interpolation.py new file mode 100644 index 0000000000..72ed259132 --- /dev/null +++ b/posydon/unit_tests/interpolation/test_IF_interpolation.py @@ -0,0 +1,22 @@ +"""Unit tests of posydon/interpolation/IF_interpolation.py +""" + +__authors__ = [ + "Philipp Rajah de Moura Srivastava " +] + +# import the module which will be tested +from posydon.interpolation.new_interpolator import IFInterpolator + +# import other needed code for the tests, which is not already imported in the +# module you like to test + + +# define single test functions +def test_name(): + pass + +# define test classes collecting several test functions +class TestClass: + def test_name(self): + assert True From 8af666d84cadb805447cf2d58a1de1bca81a7789 Mon Sep 17 00:00:00 2001 From: prinse1545 Date: Thu, 31 Jul 2025 09:39:49 -0500 Subject: [PATCH 2/3] sync --- posydon/interpolation/constraints.py | 68 ++++ posydon/interpolation/data_scaling.py | 68 ++-- posydon/interpolation/new_interpolator.py | 403 +++++++++------------- 3 files changed, 277 insertions(+), 262 deletions(-) diff --git a/posydon/interpolation/constraints.py b/posydon/interpolation/constraints.py index 70d875a222..d7fe62b4d9 100644 --- a/posydon/interpolation/constraints.py +++ b/posydon/interpolation/constraints.py @@ -50,6 +50,14 @@ from posydon.utils.common_functions import (stefan_boltzmann_law, orbital_separation_from_period) +CLASSIFICATION_KEYS = [ + "S<*>_state", + "mt_hist", + "S<*>_MOD_SN_type", + "S<*>_MOD_CO_type" +] + +N_MODELS = 11 # how many super nova models are there? # toggle this flag to enable/disable constraints (used for debugging) INTERPOLATION_CONSTRAINTS_ON = True @@ -511,3 +519,63 @@ def sanitize_interpolated_quantities(fvalues, constraints, verbose=False): constraint["constraint"]) return sanitized + + +def mt_constraint(classes): + + interpolation_class = classes["interpolation_class"] + + if interpolation_class == "initial_MT": + classes["mt_hist"] == "ini_RLO" + elif interpolation_class == "no_MT": + classes["mt_hist"] = "no_RLO" + elif interpolation_class == "stable_MT": + pass + elif interpolation_class == "unstable_MT": + pass + elif interpolation_class == "stable_reverse_MT": + pass + + +CLASS_CONSTRAINTS = { + "S<*>_state": None, + "mt_hist": mt_constraint, + "S<*>_MOD_SN_type": None, + "S<*>_MOD_CO_type": None +} + +def apply_class_constraint(key_name, classes): + + if key_name not in classes.keys(): + return + else: + CLASS_CONSTRAINTS[key_name](classes) + +def sanitize_classes(classes, ): + + assert(type(classes) == dict) + + if "interpolation_class" not in classes.keys(): + raise ValueError( + "Interpolation class must be present as a classified quantity to enforce classification constraints!" + ) + + for key in CLASSIFICATION_KEYS: + if "<*>" in key: + + for star in range(2): + key_name = key.replace("<*>", f"{star}") + + if "MOD" in key_name: + + for model in range(N_MODELS): + key_name = key_name.replace("", f"{model}") + + apply_class_constraint(key_name, classes) + + else: + apply_class_constraint(key_name, classes) + else: + + apply_class_constraint(key, classes) + \ No newline at end of file diff --git a/posydon/interpolation/data_scaling.py b/posydon/interpolation/data_scaling.py index 7fbbd9ee7f..84f27d92f1 100644 --- a/posydon/interpolation/data_scaling.py +++ b/posydon/interpolation/data_scaling.py @@ -7,18 +7,24 @@ import numpy as np +import warnings +import sys + +# Convert UserWarning to an error +warnings.simplefilter("error", RuntimeWarning) eps = 1.0e-16 SCALING_OPTIONS = [ + "none", "min_max", "max_abs", - "standardize", - "log_min_max", - "neg_log_min_max", - "log_max_abs", - "log_standardize", - "neg_log_standardize" + # "standardize", + "log_min_max", # has + # "neg_log_min_max", # has + "log_max_abs", # has + # "log_standardize", # has + # "neg_log_standardize" # has ] class DataScaler: @@ -84,24 +90,24 @@ def fit(self, x, method='none', lower=-1.0, upper=1.0): elif method == 'log_min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [np.log10(np.nanmin(x, axis=0)), np.log10(np.nanmax(x, axis=0))] + self.params = [self.log(np.nanmin(x, axis=0)), self.log(np.nanmax(x, axis=0))] elif method == 'neg_log_min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [np.log10(np.nanmin(-x, axis=0)), - np.log10(np.nanmax(-x, axis=0))] + self.params = [self.log(np.nanmin(-x, axis=0)), + self.log(np.nanmax(-x, axis=0))] elif method == 'max_abs': self.params = [np.nanmax(np.abs(x), axis=0)] elif method == 'log_max_abs': - self.params = [np.nanmax(np.abs(np.log10(x)), axis=0)] + self.params = [np.nanmax(np.abs(self.log(x)), axis=0)] elif method == 'standardize': self.params = [np.nanmean(x, axis=0), np.nanstd(x, axis=0)] elif method == 'log_standardize': # log will be computed in transform again - self.params = [np.nanmean(np.log10(x), axis=0), np.nanstd(np.log10(x), axis=0)] + self.params = [np.nanmean(self.log(x), axis=0), np.nanstd(self.log(x), axis=0)] elif method == 'neg_log_standardize': # log(-x) - self.params = [np.nanmean(np.log10(-x), axis=0), np.nanstd(np.log10(-x), axis=0)] + self.params = [np.nanmean(self.log(-x), axis=0), np.nanstd(self.log(-x), axis=0)] elif method == 'log': self.params = [] elif method == 'none': # no transformation @@ -137,26 +143,26 @@ def transform(self, x): x_t = ((x - self.params[0]) / (self.params[1] - self.params[0]) * (self.upper - self.lower) + self.lower) elif self.method == 'log_min_max': - x_t = ((np.log10(x) - self.params[0]) + x_t = ((self.log(x) - self.params[0]) / (self.params[1] - self.params[0]) * (self.upper - self.lower) + self.lower) elif self.method == 'neg_log_min_max': - x_t = ((np.log10(-x) - self.params[0]) + x_t = ((self.log(-x) - self.params[0]) / (self.params[1] - self.params[0]) * (self.upper - self.lower) + self.lower) elif self.method == 'max_abs': x_t = x / self.params[0] elif self.method == 'log_max_abs': - x_t = np.log10(x) / self.params[0] + x_t = self.log(x) / self.params[0] elif self.method == 'standardize': x_t = (x - self.params[0]) / self.params[1] elif self.method == 'log_standardize': # log will be computed in transform again - x_t = (np.log10(x) - self.params[0]) / self.params[1] + x_t = (self.log(x) - self.params[0]) / self.params[1] elif self.method == 'neg_log_standardize': - x_t = (np.log10(-x) - self.params[0]) / self.params[1] + x_t = (self.log(-x) - self.params[0]) / self.params[1] elif self.method == 'log': - x_t = np.log10(x) + x_t = self.log(x) else: # no transformation x_t = x @@ -214,24 +220,38 @@ def inv_transform(self, x_t): / (self.upper - self.lower) * (self.params[1] - self.params[0]) + self.params[0]) elif self.method == 'log_min_max': - x = 10 ** ((x_t - self.lower) / (self.upper - self.lower) + x = self.unlog((x_t - self.lower) / (self.upper - self.lower) * (self.params[1] - self.params[0]) + self.params[0]) elif self.method == 'neg_log_min_max': - x = -10 ** ((x_t - self.lower) / (self.upper - self.lower) + x = -self.unlog((x_t - self.lower) / (self.upper - self.lower) * (self.params[1] - self.params[0]) + self.params[0]) elif self.method == 'max_abs': x = x_t * self.params[0] elif self.method == 'log_max_abs': - x = 10 ** (x_t * self.params[0]) + x = self.unlog(x_t * self.params[0]) elif self.method == 'standarize': x = x_t * self.params[1] + self.params[0] elif self.method == 'log_standarize': - x = 10 ** (x_t * self.params[1] + self.params[0]) + x = self.unlog(x_t * self.params[1] + self.params[0]) elif self.method == 'neg_log_standarize': - x = -10 ** (x_t * self.params[1] + self.params[0]) + x = -self.unlog(x_t * self.params[1] + self.params[0]) elif self.method == 'log': - x = 10 ** x_t + x = self.unlog(x_t) else: # no transformation x = x_t return x + + def log(self, x): + logged = None + try: + logged = np.log10(x + eps) + except RuntimeWarning: + print(self.method) + print(x, np.isinf(x).any(), np.isnan(x).any(), (x < 0).any(), np.nanmin(x)) + # sys.exit() + + return logged + + def unlog(self, x): + return (10 ** x) - eps diff --git a/posydon/interpolation/new_interpolator.py b/posydon/interpolation/new_interpolator.py index e7b0b34fe6..1e5db82275 100644 --- a/posydon/interpolation/new_interpolator.py +++ b/posydon/interpolation/new_interpolator.py @@ -25,303 +25,212 @@ eps = 1.0e-16 -# INITIAL-FINAL INTERPOLATOR -class IFInterpolator: - """Class handling initial-final interpolation. - - Class handling initial-final interpolation with support to interpolate - certain keys by differing classes. - """ - - def __init__(self, grids = None, interpolators = None): - """Initialize the IFInterpolator class. - - Parameters - ---------- - grid : PSyGrid - The training grid or tuple of PSyGrid (training, validation) - interpolators : list of dictionaries - Contain parameters for the BaseIFIneterpolators to be constructed - - """ - self.interpolators = [] - self.interpolator_parameters = interpolators - self.grids = grids - - if self.interpolator_parameters is not None: - out_keys = [] - - for params in self.interpolator_parameters: - - if ("out_keys" not in params - and len(self.interpolator_parameters) > 1): - raise ValueError("Overlapping out keys between different " - "interpolators are not permited!") - elif "out_keys" not in params: - continue - - for key in params["out_keys"]: - if(key in out_keys): - raise ValueError( - f"Overlapping out keys between different " - f"interpolators are not permited! ({key} in more " - f"than one set of out_keys)") - else: - out_keys.append(key) - - - def train(self): - """Train the interpolator(s) on the PSyGrid used for construction.""" - for interpolator in self.interpolator_parameters: - interp = BaseIFInterpolator(grids=self.grids, - **interpolator) - interp.train() - self.interpolators.append(interp) - - self.interp_in_q = self.interpolators[0].interp_in_q - - def evaluate(self, binary, sanitization_verbose=False): - """Get the output vector approximation. - - Get the output vector approximation as well as all of the - classifications given a Binary Star - - Parameters - ---------- - binary : BinaryStar - A class which is an input vector which are to be classified and for - which an output vector will be approximated. - - Returns - ------- - Output space approximation as a tuple containing two dictionary - - """ - ynums = [] - ycats = [] - n = [] - - for interpolator in self.interpolators: - ynum, ycat, ns = interpolator.evaluate(binary) - - ynums.extend(ynum) - ycats.extend(ycat) - n.extend(ns) - - return ynums, ycats, n - - - def test_interpolator(self, initial_values, sanitization_verbose = False): - """ Method that can take in a 2-D numpy array for more efficient use of the interpolator - Parameters - ---------- - initial_values : numpy array - A numpy array containing the in-key values of the binaries to be evolved - - Return - ------ - Interpolated values - """ - new_values = np.array(initial_values, copy = True) - - if self.interp_in_q: - new_values.T[1] = new_values.T[1] / new_values.T[0] - - final_values = [] - - for interpolator in self.interpolators: - final_values.append(interpolator.test_interpolator(new_values).T) +class IFInterpolator: - return np.concatenate(final_values).T + def __init__(self, grids, in_keys, out_keys, max_k): - def test_classifiers(self, initial_values): - """ Method that can take in a 2-D numpy array for more efficient use of classifiers + if type(grids) != list: + sys.exit("Please provide a list of PSyGrids containing both a training and validation grid to train the interpolator") + else: - Parameters - ---------- - initial_values : numpy array - A numpy array containing the in-key values of the binaries to be classified + self.in_keys = in_keys - Return - ------ - Classified values - """ - new_values = np.array(initial_values, copy = True) + self.out_key_dict = out_keys + self.continuous_out_keys = sum(list(out_keys.values()), []) # keys to be interpolated which correspond to numerical quantities + self.discrete_out_keys = list(out_keys.keys()) # keys to be interpolated which correspond to discrete quantities + self.constraints = find_constraints_to_apply(self.continuous_out_keys) - if self.interp_in_q: - new_values.T[1] = new_values.T[1] / new_values.T[0] + # ============= checks ============= + if "interpolation_class" not in self.discrete_out_keys: + sys.exit("The key \"interpolation_class\" needs to be provided as one of the interpolation keys") - classes = {} - - for interpolator in self.interpolators: + self.max_k = max_k - classes = {**classes, **interpolator.test_classifiers(new_values)} + self.training_grid = self.preprocess_grid(grids[0], training_grid = True) + self.validation_grid = self.preprocess_grid(grids[1]) - return classes + self.triangulate(self.training_grid) + # =============== usage statistics variables ============ + self.outside_convex_hull = dict(zip(self.discrete_out_keys, [0] * len(self.discrete_out_keys))) + self.inside_convex_hull = dict(zip(self.discrete_out_keys, [0] * len(self.discrete_out_keys))) + def stats(self, _print = False): + percentages = [] + + for key in self.discrete_out_keys: + percentages.append( + self.outside_convex_hull[key] / (self.outside_convex_hull[key] + self.inside_convex_hull[key]) + ) + if _print: + print(f"Total of {sum(pecentages) / len(percentages):.2f} outside of hul") + return dict(zip(self.discrete_out_keys, percentages)) - def load(self, filename): - """Load a saved IFInterpolator from a .pkl file. + def train(self): + self.classifiers = dict( + zip( + self.discrete_out_keys, + [self.find_hyperparameters(key) for key in self.discrete_out_keys] + ) + ) + # self.out_scalers = dict( + # zip( + # self.discrete_out_keys, + # [self.optimize_normalization(key) for key in self.discrete_out_keys] + # ) + # ) + self.training = False + + def interpolate(self, iv, klass): - Parameters - ---------- - filename : string - Path to the .pkl file + interpolated = [] + ics = {} + weights = {} - """ - with open(filename, 'rb') as f: - self.interpolators = pickle.load(f) + for key, c in zip(self.discrete_out_keys, klass): - self.interp_in_q = self.interpolators[0].interp_in_q + triangulation = self.training_grid["triangulations"][key][c] - def save(self, filename): - """Save the interpolator to a .pkl file. + simplex = triangulation.find_simplex(iv) - Parameters - ---------- - filename : string - Path where .pkl file will be saved + if simplex == -1: + interpolated.extend( + self.get_nearest_neighbor(iv, key) + ) + self.outside_convex_hull[key] += 1 + continue + else: + self.inside_convex_hull[key] += 1 - """ - with open(filename, "wb") as f: - pickle.dump(self.interpolators, f) + vertices = triangulation.simplices[simplex] + ics[key] = triangulation.points[vertices] -class BaseIFInterpolator: + class_inds = self.training_grid["class_inds"][key][c] - def __init__(self, grids, in_keys, out_keys, interpolation_class, max_k): + final_values = np.array(self.training_grid["final_values"][key][class_inds][vertices].tolist()) - if type(grids) != list: - sys.exit("Please provide a list of PSyGrids containing both a training and validation grid to train the interpolator") - else: + # if self.training: + # final_values = self.normalize_output(final_values, c) - self.in_keys = in_keys + barycentric_weights = self.compute_barycentric_coordinates(iv, triangulation.points[vertices])[..., np.newaxis] + weights[key] = barycentric_weights - self.continous_out_keys = out_keys # keys to be interpolated which correspond to numerical quantities - self.discrete_out_keys = out_keys # keys to be interpolated which correspond to discrete quantities + # denormalized = self.denormalize_output( + # np.sum(final_values * barycentric_weights, axis = 0), + # klass + # ) - self.interpolation_class = interpolation_class - self.max_k = max_k + interpolated.extend(np.sum(final_values * barycentric_weights, axis = 0)) - self.training_grid = self.preprocess_grid(grids[0], training_grid = True) - self.validation_grid = self.preprocess_grid(grids[1]) + meta_data = { + "weights": weights, + "ics": ics, + "ic": iv, + "interpolated": interpolated + } - self.triangulate(self.training_grid) - - def train(self): - self.find_hyperparameters() - # self.optimize_normalization() + return interpolated, meta_data def evaluate(self, initial_values): - if self.classifier is None: + if self.classifiers is None: sys.exit("Please find classifier hyperparameters before using interpolator") + + interpolation_class_ind = self.discrete_out_keys.index("interpolation_class") if not self.training: initial_values = ( np.log10(initial_values + eps) - self.iv_min ) / (self.iv_max - self.iv_min + eps) - classes = self.classifier.predict(initial_values) + classes = np.array([clasifier["classifier"].predict(initial_values) for clasifier in self.classifiers.values()]).T interpolated_values = [] n = [] for iv, klass in zip(initial_values, classes): - if klass == "initial_MT": + if klass[interpolation_class_ind] == "initial_MT": continue - triangulation = self.training_grid["triangulations"][klass] - - simplex = triangulation.find_simplex(iv) - - if simplex == -1: - interpolated_values.append( - self.get_nearest_neighbor(iv) - ) - continue - - vertices = triangulation.simplices[simplex] - - class_inds = self.training_grid["class_inds"][klass] - - final_values = self.training_grid["final_values"][class_inds][vertices] - - if self.training: - final_values = self.normalize_output(final_values, klass) - - barycentric_weights = self.compute_barycentric_coordinates(iv, triangulation.points[vertices])[..., np.newaxis] - - interpolated = np.sum(final_values * barycentric_weights, axis = 0) - - n.append({ - "weights": barycentric_weights, - "ics": triangulation.points[vertices], - "ic": iv, - "final_values": final_values, - "interpolated": interpolated - }) + interpolated, meta_data = self.interpolate(iv, klass) + interpolated = self.apply_continuous_constraints(interpolated) - # interpolated = self.denormalize_output(interpolated, klass) interpolated_values.append(interpolated) + n.append(meta_data) interpolated_values = np.array(interpolated_values) classes = np.array(classes) - + print("interp_values", interpolated_values.shape) return interpolated_values, classes, n - def find_hyperparameters(self): + def find_hyperparameters(self, klass): k_accuracies = [] for k in range(1, self.max_k): validation_classifier = KNeighborsClassifier(n_neighbors = k, weights = "distance") - validation_classifier.fit(self.training_grid["initial_values"], self.training_grid["classes"]) + validation_classifier.fit( + self.training_grid["initial_values"], + self.training_grid["final_classes"][klass] + ) predicted_classes = validation_classifier.predict(self.validation_grid["initial_values"]) k_accuracies.append( - balanced_accuracy_score(predicted_classes, self.validation_grid["classes"]) + balanced_accuracy_score( + predicted_classes, + self.validation_grid["final_classes"][klass] + ) ) self.k_accuracies = np.array(k_accuracies) - self.k_star = self.k_accuracies.argmax() + k_star = self.k_accuracies.argmax() + + classifier = KNeighborsClassifier(n_neighbors = k_star, weights = "distance") + classifier.fit( + self.training_grid["initial_values"], + self.training_grid["final_classes"][klass] + ) - self.classifier = KNeighborsClassifier(n_neighbors = self.k_star, weights = "distance") - self.classifier.fit(self.training_grid["initial_values"], self.training_grid["classes"]) + return {"classifier": classifier, "k_star": k_star, "k_accuracies": k_accuracies} - def optimize_normalization(self): + def optimize_normalization(self, key): - output_dim = len(self.continous_out_keys) + output_dim = len(self.out_key_dict[key]) + classes = np.unique(self.training_grid["final_classes"][key]) all_scalers = [] - scaler_accuracies = np.zeros(shape = (len(SCALING_OPTIONS), len(self.classes), output_dim)) + scaler_accuracies = np.zeros(shape = (len(SCALING_OPTIONS), len(classes), output_dim)) self.training = True + self.scaler_types = [] for i, option in enumerate(SCALING_OPTIONS): - scalers = [[DataScaler()] * output_dim] * len(self.classes) + scalers = [[DataScaler() for _ in range(output_dim)] for _ in range(len(classes))] - for j, klass in enumerate(self.classes): + for j, klass in enumerate(classes): for parameter in range(output_dim): - param_min = np.nanmin(self.training_grid["final_values"][:, parameter]) + param_min = np.nanmin(self.training_grid["final_values"][keys][:, parameter]) + if param_min < 0 and "log" in option: - scalers[j][parameter].fit(self.training_grid["final_values"][:, parameter], "none") + scalers[j][parameter].fit(self.training_grid["final_values"][keys][:, parameter], "none") else: - scalers[j][parameter].fit(self.training_grid["final_values"][:, parameter], option) - - self.out_scalers = scalers + scalers[j][parameter].fit(self.training_grid["final_values"][keys][:, parameter], option) + + self.out_scalers = scalers + + for j, klass in enumerate(self.classes): class_mask = self.validation_grid["class_inds"][klass] class_ivs = self.validation_grid["initial_values"][class_mask] - + interpolated_values, predicted_classes, _ = self.evaluate(class_ivs) predicted_class_mask = np.where(predicted_classes != "initial_MT") # taking out mispredictions of initial MT relative_errors = np.abs( - (interpolated_values - self.validation_grid["final_values"][class_mask][predicted_class_mask]) - / (self.validation_grid["final_values"][class_mask][predicted_class_mask] + eps) + (interpolated_values - self.validation_grid["final_values"][keys][class_mask][predicted_class_mask]) + / (self.validation_grid["final_values"][keys][class_mask][predicted_class_mask] + eps) ).mean(axis = 0) scaler_accuracies[i][j] = relative_errors @@ -332,22 +241,24 @@ def optimize_normalization(self): star_scalers = [] star_scaler_inds = scaler_accuracies.argmax(axis = 0) + self.scaler_accuracies = scaler_accuracies - for c in range(len(self.classes)): + for c in range(len(classes)): star_scalers.append( - all_scalers[:, c][star_scaler_inds[c], np.arange(len(self.continous_out_keys))] + all_scalers[:, c][star_scaler_inds[c], np.arange(len(keys))] ) self.out_scalers = np.array(star_scalers, dtype = object) self.normalize_triangulations() self.training = False + return np.array(star_scalers, dtype = object) # =================== helper methods below =========================== def preprocess_grid(self, grid, training_grid = False): - final_values = np.array(grid.final_values[self.continous_out_keys].tolist()) + final_values = np.array(grid.final_values[self.continuous_out_keys].tolist()) valid_inds = np.where( (grid.final_values["interpolation_class"] != "not_converged") & @@ -371,31 +282,39 @@ def preprocess_grid(self, grid, training_grid = False): self.iv_min = initial_values.min(axis = 0, keepdims = True) self.iv_max = initial_values.max(axis = 0, keepdims = True) - class_labels = np.unique(grid.final_values[valid_inds][self.interpolation_class]) - class_inds = dict(zip( - class_labels, - [np.where(grid.final_values[valid_inds][self.interpolation_class] == label)[0] for label in class_labels] - )) + class_inds = {} + + for key in self.discrete_out_keys: + class_labels = np.unique(grid.final_values[valid_inds][key]) + class_inds[key] = dict(zip( + class_labels, + [np.where(grid.final_values[valid_inds][key] == label)[0] for label in class_labels] + )) return { "initial_values": (initial_values - self.iv_min) / (self.iv_max - self.iv_min + eps), - "final_values": np.array(grid.final_values[self.continous_out_keys][valid_inds].tolist()), - "final_classes": grid.final_values[valid_inds][self.discrete_out_keys], - "classes": grid.final_values[valid_inds][self.interpolation_class], + "final_values": dict(zip(self.out_key_dict.keys(), [grid.final_values[valid_inds][keys] for keys in self.out_key_dict.values()])), # np.array(grid.final_values[self.continuous_out_keys][valid_inds].tolist()), + "final_classes": dict(zip(self.discrete_out_keys, np.array(grid.final_values[self.discrete_out_keys][valid_inds].tolist()).T)), "class_inds": class_inds, } def triangulate(self, grid_dict): - self.classes = np.unique(grid_dict["classes"]).tolist() - self.classes.remove("initial_MT") - triangulations = {} - for klass in self.classes: + for label_name in self.discrete_out_keys: + classes = np.unique(grid_dict["final_classes"][label_name]).tolist() + if "initial_MT" in classes: + classes.remove("initial_MT") - class_inds = grid_dict["class_inds"][klass] - triangulations[klass] = Delaunay(grid_dict["initial_values"][class_inds]) + class_triangulations = {} + + for klass in classes: + + class_inds = grid_dict["class_inds"][label_name][klass] + class_triangulations[klass] = Delaunay(grid_dict["initial_values"][class_inds]) + + triangulations[label_name] = class_triangulations grid_dict["triangulations"] = triangulations @@ -419,12 +338,20 @@ def compute_barycentric_coordinates(self, point, coords): return weights - def get_nearest_neighbor(self, iv): + def get_nearest_neighbor(self, iv, key): dists = np.sqrt(np.square(self.training_grid["initial_values"] - iv).sum(axis = 1)) sorted_inds = dists.argsort() - return self.training_grid["final_values"][sorted_inds[0]] + return np.array(self.training_grid["final_values"][key][sorted_inds[0]].tolist()) + + def apply_continuous_constraints(self, interpolated): + sanitized = sanitize_interpolated_quantities( + dict(zip(self.continuous_out_keys, interpolated)), + self.constraints, verbose=False + ) + + return np.array([sanitized[key] for key in self.continuous_out_keys]) def normalize_output(self, input, klass): class_ind = self.classes.index(klass) @@ -449,14 +376,14 @@ def denormalize_output(self, input, klass): return ret_value - def normalize_triangulations(self): + def normalize_triangulations(self, out_keys): - new_final_values = np.zeros_like(self.training_grid["final_values"]) + new_final_values = np.zeros_like(self.training_grid["final_values"][out_keys]) - for i, (klass, fv) in enumerate(zip(self.training_grid["classes"], self.training_grid["final_values"])): + for i, (klass, fv) in enumerate(zip(self.training_grid["classes"], self.training_grid["final_values"][out_keys])): if klass == "initial_MT": # should be taken out in preprocessing continue new_final_values[i] = self.normalize_output(np.array([fv]), klass) - self.training_grid["final_values"] = new_final_values + self.training_grid["final_values"][out_keys] = new_final_values \ No newline at end of file From e8082e6ca0af8cc020efa25de68a81332f12c70d Mon Sep 17 00:00:00 2001 From: prinse1545 Date: Thu, 15 Jan 2026 09:26:35 -0600 Subject: [PATCH 3/3] mainly syncing / backing up, but rewrote much of the normalization logic --- posydon/interpolation/IF_interpolation.py | 10 +- posydon/interpolation/new_interpolator.py | 274 +++++++++++++++------- posydon/interpolation/preprocessing.py | 83 +++++++ 3 files changed, 278 insertions(+), 89 deletions(-) create mode 100644 posydon/interpolation/preprocessing.py diff --git a/posydon/interpolation/IF_interpolation.py b/posydon/interpolation/IF_interpolation.py index 29936402d8..786364fd74 100644 --- a/posydon/interpolation/IF_interpolation.py +++ b/posydon/interpolation/IF_interpolation.py @@ -196,6 +196,7 @@ class relies on the BaseIFInterpolator class to perform the interpolation from posydon.interpolation.constraints import ( find_constraints_to_apply, sanitize_interpolated_quantities) +import time # INITIAL-FINAL INTERPOLATOR class IFInterpolator: @@ -269,13 +270,16 @@ def evaluate(self, binary, sanitization_verbose=False): """ ynums = {} ycats = {} - + # s = time.time() for interpolator in self.interpolators: ynum, ycat = interpolator.evaluate(binary, sanitization_verbose) ynums = {**ynums, **ynum} ycats = {**ycats, **ycat} + # e = time.time() + # print(f"Iterated over {len(self.interpolators)} interpolators in {e - s}") + return ynums, ycats @@ -666,7 +670,11 @@ def test_interpolator(self, Xt): if isinstance(self.interp_method, list): Xtn = self.X_scaler.normalize(Xt, classes) + # s = time.time() Ypredn = self.interpolator.predict(Xtn, classes, self.X_scaler) + # e = time.time() + + # print(f"Predicted one interpolator value in {e - s} seconds") else: Xtn = self.X_scaler.normalize(Xt) Ypredn = self.interpolator.predict(Xtn) diff --git a/posydon/interpolation/new_interpolator.py b/posydon/interpolation/new_interpolator.py index 1e5db82275..6c8afbe9af 100644 --- a/posydon/interpolation/new_interpolator.py +++ b/posydon/interpolation/new_interpolator.py @@ -3,6 +3,11 @@ """ +__authors__ = [ + "Philipp Moura Srivastava ", +] + + import numpy as np import os import pickle @@ -12,6 +17,14 @@ # POSYDON from posydon.grids.psygrid import PSyGrid from posydon.interpolation.data_scaling import DataScaler, SCALING_OPTIONS +from posydon.interpolation.preprocessing import ( + normalize, + unnormalize, + find_normalization_evaluation_matrix, + compute_statistics, + IN_SCALING_OPTIONS, + OUT_SCALING_OPTIONS) + from posydon.utils.posydonwarning import Pwarn from posydon.interpolation.constraints import ( find_constraints_to_apply, sanitize_interpolated_quantities) @@ -22,6 +35,7 @@ from sklearn.metrics import balanced_accuracy_score import sys +import time eps = 1.0e-16 @@ -63,7 +77,7 @@ def stats(self, _print = False): self.outside_convex_hull[key] / (self.outside_convex_hull[key] + self.inside_convex_hull[key]) ) if _print: - print(f"Total of {sum(pecentages) / len(percentages):.2f} outside of hul") + print(f"Total of {sum(percentages) / len(percentages):.2f} outside of hull") return dict(zip(self.discrete_out_keys, percentages)) @@ -74,25 +88,30 @@ def train(self): [self.find_hyperparameters(key) for key in self.discrete_out_keys] ) ) - # self.out_scalers = dict( - # zip( - # self.discrete_out_keys, - # [self.optimize_normalization(key) for key in self.discrete_out_keys] - # ) - # ) - self.training = False + self.out_scalers = dict( + zip( + self.discrete_out_keys, + [self.optimize_normalization(key) for key in self.discrete_out_keys] + ) + ) - def interpolate(self, iv, klass): + + def interpolate(self, iv, klass, sn_model): interpolated = [] ics = {} weights = {} - for key, c in zip(self.discrete_out_keys, klass): + interpolation_class_ind = self.discrete_out_keys.index("interpolation_class") + sn_class_ind = self.discrete_out_keys.index(sn_model) + klass = [klass[interpolation_class_ind], klass[sn_class_ind]] + classification_schemes = ["interpolation_class", sn_model] + + for key, c in zip(classification_schemes, klass): triangulation = self.training_grid["triangulations"][key][c] - simplex = triangulation.find_simplex(iv) + simplex = triangulation.find_simplex(iv) if simplex == -1: interpolated.extend( @@ -109,19 +128,36 @@ def interpolate(self, iv, klass): class_inds = self.training_grid["class_inds"][key][c] final_values = np.array(self.training_grid["final_values"][key][class_inds][vertices].tolist()) - - # if self.training: - # final_values = self.normalize_output(final_values, c) - + print("Before", final_values) + if np.isnan(final_values).any(): + print("Do final values have NaNs?", np.isnan(final_values).any()) + # if False or self.training: + # if not np.isnan(final_values).any(): + # final_values = normalize(final_values, self._stats[0], self._stats[1], True) + print("After", final_values) barycentric_weights = self.compute_barycentric_coordinates(iv, triangulation.points[vertices])[..., np.newaxis] - weights[key] = barycentric_weights + if np.isnan(barycentric_weights).any(): + print("Do barycentric weights have NaNs?", np.isnan(barycentric_weights).any()) + weights[key] = barycentric_weights + print(final_values, barycentric_weights) + # if self.training: + # if not np.isnan(final_values).any(): + # final_values = unnormalize( + # np.sum(final_values * barycentric_weights, axis = 0), + # self._stats[0], self._stats[1], True + # ) + # else: + # final_values = np.sum(final_values * barycentric_weights, axis = 0) + + if np.isnan(final_values).any(): + print("Output has nans") # denormalized = self.denormalize_output( # np.sum(final_values * barycentric_weights, axis = 0), # klass # ) - - interpolated.extend(np.sum(final_values * barycentric_weights, axis = 0)) + interpolated.extend(final_values) + meta_data = { "weights": weights, @@ -132,127 +168,187 @@ def interpolate(self, iv, klass): return interpolated, meta_data - def evaluate(self, initial_values): + def evaluate(self, initial_values, sn_model = "S1_SN_MODEL_v2_01_SN_type"): if self.classifiers is None: sys.exit("Please find classifier hyperparameters before using interpolator") interpolation_class_ind = self.discrete_out_keys.index("interpolation_class") - if not self.training: - initial_values = ( np.log10(initial_values + eps) - self.iv_min ) / (self.iv_max - self.iv_min + eps) - - classes = np.array([clasifier["classifier"].predict(initial_values) for clasifier in self.classifiers.values()]).T + classes = np.array([ + cl["classifier"].predict(normalize(initial_values, cl["stats"][0], cl["stats"][1], cl["log"])) + for cl in self.classifiers.values()]).T interpolated_values = [] n = [] - + for iv, klass in zip(initial_values, classes): if klass[interpolation_class_ind] == "initial_MT": continue - interpolated, meta_data = self.interpolate(iv, klass) - interpolated = self.apply_continuous_constraints(interpolated) + interpolated, meta_data = self.interpolate(iv, klass, sn_model) + interpolated = self.apply_continuous_constraints(interpolated, sn_model) interpolated_values.append(interpolated) n.append(meta_data) interpolated_values = np.array(interpolated_values) classes = np.array(classes) - print("interp_values", interpolated_values.shape) + return interpolated_values, classes, n def find_hyperparameters(self, klass): - - k_accuracies = [] + input_matrix = [] + for k in range(1, self.max_k): + row = [] + for opt in IN_SCALING_OPTIONS: + row.append( + [k, opt] + ) + input_matrix.append(row) + + kwargs = { + "input_matrix": input_matrix, + "self": self, + "klass": klass + } + + def kwargs_fnc(**kwargs): + + kwargs = { + "self": kwargs["kwargs"]["self"], + "k": kwargs["item"][0], + "scaling": kwargs["item"][1] + } + + return kwargs + + def eval_fnc(self, k, scaling): validation_classifier = KNeighborsClassifier(n_neighbors = k, weights = "distance") + + training_initial_values = self.training_grid["initial_values"] + + stats = compute_statistics(training_initial_values, scaling) + training_initial_values = normalize( + training_initial_values, stats[0], stats[1], "log" in scaling) + validation_classifier.fit( - self.training_grid["initial_values"], + training_initial_values, self.training_grid["final_classes"][klass] ) - predicted_classes = validation_classifier.predict(self.validation_grid["initial_values"]) - - k_accuracies.append( - balanced_accuracy_score( - predicted_classes, - self.validation_grid["final_classes"][klass] - ) + validation_initial_values = self.validation_grid["initial_values"] + validation_initial_values = normalize( + validation_initial_values, stats[0], stats[1], "log" in scaling) + predicted_classes = validation_classifier.predict(validation_initial_values) + + bacc = balanced_accuracy_score( + self.validation_grid["final_classes"][klass], + predicted_classes ) - self.k_accuracies = np.array(k_accuracies) + return bacc, stats + + eval_matrix, stat_matrix = find_normalization_evaluation_matrix(eval_fnc, kwargs_fnc, kwargs) + + k_star = list(np.unravel_index(eval_matrix.argmax(), eval_matrix.shape)) + + classifier = KNeighborsClassifier(n_neighbors = k_star[0] + 1, weights = "distance") + + training_initial_values = self.training_grid["initial_values"] + + scaling = IN_SCALING_OPTIONS[k_star[1]] - k_star = self.k_accuracies.argmax() + stats = compute_statistics(training_initial_values, scaling) + training_initial_values = normalize( + training_initial_values, stats[0], stats[1], "log" in scaling) - classifier = KNeighborsClassifier(n_neighbors = k_star, weights = "distance") classifier.fit( - self.training_grid["initial_values"], + training_initial_values, self.training_grid["final_classes"][klass] ) - return {"classifier": classifier, "k_star": k_star, "k_accuracies": k_accuracies} + return { + "classifier": classifier, + "stats": stat_matrix[*k_star], + "log": "log" in IN_SCALING_OPTIONS[k_star[1]], + "k_star": k_star, + "eval_matrix": eval_matrix + } def optimize_normalization(self, key): - output_dim = len(self.out_key_dict[key]) - classes = np.unique(self.training_grid["final_classes"][key]) + input_matrix = [] - all_scalers = [] - scaler_accuracies = np.zeros(shape = (len(SCALING_OPTIONS), len(classes), output_dim)) + labels = np.unique(self.training_grid["final_classes"][key]) + labels = np.delete(labels, np.where(labels == "initial_MT")[0]) - self.training = True - self.scaler_types = [] + for label in labels: + row = [] + for opt in OUT_SCALING_OPTIONS: + row.append( + [label, opt] + ) + input_matrix.append(row) + + kwargs = { + "input_matrix": input_matrix, + "self": self, + "key": key + } - for i, option in enumerate(SCALING_OPTIONS): - scalers = [[DataScaler() for _ in range(output_dim)] for _ in range(len(classes))] + def kwargs_fnc(**kwargs): - for j, klass in enumerate(classes): - for parameter in range(output_dim): - param_min = np.nanmin(self.training_grid["final_values"][keys][:, parameter]) + kwargs = { + "self": kwargs["kwargs"]["self"], + "key": kwargs["kwargs"]["key"], + "klass": kwargs["item"][0], + "scaling": kwargs["item"][1] + } - if param_min < 0 and "log" in option: - scalers[j][parameter].fit(self.training_grid["final_values"][keys][:, parameter], "none") - else: - scalers[j][parameter].fit(self.training_grid["final_values"][keys][:, parameter], option) + return kwargs - self.out_scalers = scalers + def eval_fnc(self, key, klass, scaling): + self.training = True + self.scaling = scaling - for j, klass in enumerate(self.classes): + klass_inds = np.where(self.validation_grid["final_classes"][key] == klass)[0] - class_mask = self.validation_grid["class_inds"][klass] - class_ivs = self.validation_grid["initial_values"][class_mask] - - interpolated_values, predicted_classes, _ = self.evaluate(class_ivs) - predicted_class_mask = np.where(predicted_classes != "initial_MT") # taking out mispredictions of initial MT + training_final_values = self.training_grid["final_values"][key][klass_inds] + self._stats = compute_statistics(training_final_values, scaling) + print("Does input have NaNs?", np.isnan(self.validation_grid["initial_values"][klass_inds]).any()) + interpolated, classes, _ = self.evaluate(self.validation_grid["initial_values"][klass_inds]) - relative_errors = np.abs( - (interpolated_values - self.validation_grid["final_values"][keys][class_mask][predicted_class_mask]) - / (self.validation_grid["final_values"][keys][class_mask][predicted_class_mask] + eps) - ).mean(axis = 0) + classes = classes[np.where(classes[:, 0] != "initial_MT")[0]] + predicted_klass_inds = np.where((classes[:, 0] == klass) | (classes[:, 1] == klass))[0] - scaler_accuracies[i][j] = relative_errors + # needs to be fixed to include any arbitrary SN model but this will do for now + ground_truth = np.concatenate( + [self.validation_grid["final_values"]["interpolation_class"], self.validation_grid["final_values"]["S1_SN_MODEL_v2_01_SN_type"]], axis = 1 + ) - all_scalers.append(scalers) + errors = np.abs( + (interpolated[predicted_klass_inds] - ground_truth[klass_inds][predicted_klass_inds]) / + (ground_truth[klass_inds][predicted_klass_inds] + eps) + ) - all_scalers = np.array(all_scalers, dtype = object) + self.training = False - star_scalers = [] - star_scaler_inds = scaler_accuracies.argmax(axis = 0) - self.scaler_accuracies = scaler_accuracies + return errors.mean(), self._stats - for c in range(len(classes)): - star_scalers.append( - all_scalers[:, c][star_scaler_inds[c], np.arange(len(keys))] - ) + eval_matrix, stat_matrix = find_normalization_evaluation_matrix(eval_fnc, kwargs_fnc, kwargs) - self.out_scalers = np.array(star_scalers, dtype = object) - self.normalize_triangulations() + opt = list(np.unravel_index(eval_matrix.argmax(), eval_matrix.shape)) - self.training = False - return np.array(star_scalers, dtype = object) + return { + "stats": stat_matrix[opt], + "log": "log" in OUT_SCALING_OPTIONS[opt[1]], + "scaling": OUT_SCALING_OPTIONS[opt[1]], + "eval_matrix": eval_matrix + } # =================== helper methods below =========================== @@ -292,8 +388,8 @@ def preprocess_grid(self, grid, training_grid = False): )) return { - "initial_values": (initial_values - self.iv_min) / (self.iv_max - self.iv_min + eps), - "final_values": dict(zip(self.out_key_dict.keys(), [grid.final_values[valid_inds][keys] for keys in self.out_key_dict.values()])), # np.array(grid.final_values[self.continuous_out_keys][valid_inds].tolist()), + "initial_values": 10**initial_values, + "final_values": dict(zip(self.out_key_dict.keys(), [np.array(grid.final_values[valid_inds][keys].tolist()) for keys in self.out_key_dict.values()])), # np.array(grid.final_values[self.continuous_out_keys][valid_inds].tolist()), "final_classes": dict(zip(self.discrete_out_keys, np.array(grid.final_values[self.discrete_out_keys][valid_inds].tolist()).T)), "class_inds": class_inds, } @@ -345,13 +441,14 @@ def get_nearest_neighbor(self, iv, key): return np.array(self.training_grid["final_values"][key][sorted_inds[0]].tolist()) - def apply_continuous_constraints(self, interpolated): + def apply_continuous_constraints(self, interpolated, sn_model): + keys = self.out_key_dict["interpolation_class"] + self.out_key_dict[sn_model] + sanitized = sanitize_interpolated_quantities( - dict(zip(self.continuous_out_keys, interpolated)), + dict(zip(keys, interpolated)), self.constraints, verbose=False ) - - return np.array([sanitized[key] for key in self.continuous_out_keys]) + return np.array([sanitized[key] for key in keys]) def normalize_output(self, input, klass): class_ind = self.classes.index(klass) @@ -386,4 +483,5 @@ def normalize_triangulations(self, out_keys): new_final_values[i] = self.normalize_output(np.array([fv]), klass) self.training_grid["final_values"][out_keys] = new_final_values + \ No newline at end of file diff --git a/posydon/interpolation/preprocessing.py b/posydon/interpolation/preprocessing.py new file mode 100644 index 0000000000..8db1482718 --- /dev/null +++ b/posydon/interpolation/preprocessing.py @@ -0,0 +1,83 @@ +""" +Module implementing preprocessing for IF Interpolation +""" + +__authors__ = [ + "Philipp Moura Srivastava ", +] + +import numpy as np + +eps = 1.0e-16 + +IN_SCALING_OPTIONS = [ + "min-max", + "standard", + "log_min-max", + "log_standard" +] + +OUT_SCALING_OPTIONS = [ + # "log_min-max", + # "log_standard" + "min-max" +] + + +def normalize(data, shift, scale, log = False): + if log: + if np.isnan(data).any() or np.isinf(data).any(): + raise ValueError("nans or infs detected in crucial grid parameters") + + if not (data < 0).any(): + data = np.log10(data + eps) + + return (data - shift) / (scale + eps) + +def unnormalize(data, shift, scale, log = False): + data = (data * (scale + eps)) + shift + + if log: + data = 10**data - eps + + return data + +def compute_statistics(data, scaling): + + computations = [ + lambda data: [data.min(axis = 0), data.max(axis = 0) - data.min(axis = 0)], + lambda data: [data.mean(axis = 0), data.std(axis = 0)], + lambda data: [np.log10(data + eps).min(axis = 0), np.log10(data + eps).max(axis = 0) - np.log10(data + eps).min(axis = 0)], + lambda data: [np.log10(data + eps).mean(axis = 0), np.log10(data + eps).std(axis = 0)], + ] + compute = dict(zip(IN_SCALING_OPTIONS, computations)) # this line assumes that all other options are a subset of IN_SCALING_OPTION + return compute[scaling](data) + + +def find_normalization_evaluation_matrix(eval_fnc, kwarg_fnc, kwargs): + # eval_fnc - test_classifier?, what's changing + # kwarg_fnc - input to eval_fnc + # kwargs - inputs we iterate over + + normalization_eval_matrix = [] + normalization_stat_matrix = [] + + for row in kwargs["input_matrix"]: + eval_row = [] + stat_row = [] + + for col in row: + acc, stat = eval_fnc(**kwarg_fnc(**{"item": col, "kwargs": kwargs})) + + eval_row.append( + acc + ) + stat_row.append(stat) + + normalization_eval_matrix.append(eval_row) + normalization_stat_matrix.append(stat_row) + + normalization_eval_matrix = np.array(normalization_eval_matrix) + normalization_stat_matrix = np.array(normalization_stat_matrix) + + return normalization_eval_matrix, normalization_stat_matrix