diff --git a/posydon/interpolation/IF_interpolation.py b/posydon/interpolation/IF_interpolation.py index 29936402d8..786364fd74 100644 --- a/posydon/interpolation/IF_interpolation.py +++ b/posydon/interpolation/IF_interpolation.py @@ -196,6 +196,7 @@ class relies on the BaseIFInterpolator class to perform the interpolation from posydon.interpolation.constraints import ( find_constraints_to_apply, sanitize_interpolated_quantities) +import time # INITIAL-FINAL INTERPOLATOR class IFInterpolator: @@ -269,13 +270,16 @@ def evaluate(self, binary, sanitization_verbose=False): """ ynums = {} ycats = {} - + # s = time.time() for interpolator in self.interpolators: ynum, ycat = interpolator.evaluate(binary, sanitization_verbose) ynums = {**ynums, **ynum} ycats = {**ycats, **ycat} + # e = time.time() + # print(f"Iterated over {len(self.interpolators)} interpolators in {e - s}") + return ynums, ycats @@ -666,7 +670,11 @@ def test_interpolator(self, Xt): if isinstance(self.interp_method, list): Xtn = self.X_scaler.normalize(Xt, classes) + # s = time.time() Ypredn = self.interpolator.predict(Xtn, classes, self.X_scaler) + # e = time.time() + + # print(f"Predicted one interpolator value in {e - s} seconds") else: Xtn = self.X_scaler.normalize(Xt) Ypredn = self.interpolator.predict(Xtn) diff --git a/posydon/interpolation/constraints.py b/posydon/interpolation/constraints.py index 70d875a222..d7fe62b4d9 100644 --- a/posydon/interpolation/constraints.py +++ b/posydon/interpolation/constraints.py @@ -50,6 +50,14 @@ from posydon.utils.common_functions import (stefan_boltzmann_law, orbital_separation_from_period) +CLASSIFICATION_KEYS = [ + "S<*>_state", + "mt_hist", + "S<*>_MOD_SN_type", + "S<*>_MOD_CO_type" +] + +N_MODELS = 11 # how many super nova models are there? 
# toggle this flag to enable/disable constraints (used for debugging) INTERPOLATION_CONSTRAINTS_ON = True @@ -511,3 +519,63 @@ def sanitize_interpolated_quantities(fvalues, constraints, verbose=False): constraint["constraint"]) return sanitized + + +def mt_constraint(classes): + + interpolation_class = classes["interpolation_class"] + + if interpolation_class == "initial_MT": + classes["mt_hist"] = "ini_RLO" + elif interpolation_class == "no_MT": + classes["mt_hist"] = "no_RLO" + elif interpolation_class == "stable_MT": + pass + elif interpolation_class == "unstable_MT": + pass + elif interpolation_class == "stable_reverse_MT": + pass + + +CLASS_CONSTRAINTS = { + "S<*>_state": None, + "mt_hist": mt_constraint, + "S<*>_MOD_SN_type": None, + "S<*>_MOD_CO_type": None +} + +def apply_class_constraint(key_name, classes): + + if key_name not in classes.keys(): + return + if CLASS_CONSTRAINTS.get(key_name) is not None: # missing or None entry means no constraint for this key + CLASS_CONSTRAINTS[key_name](classes) + +def sanitize_classes(classes): + + assert isinstance(classes, dict) + + if "interpolation_class" not in classes.keys(): + raise ValueError( + "Interpolation class must be present as a classified quantity to enforce classification constraints!"
+ ) + + for key in CLASSIFICATION_KEYS: + if "<*>" in key: + + for star in range(1, 3): # POSYDON stars are numbered S1/S2 (see S1_* keys elsewhere) — TODO confirm + key_name = key.replace("<*>", f"{star}") + + if "MOD" in key_name: + + for model in range(N_MODELS): + model_key = key_name.replace("MOD", f"MOD{model}") # TODO(review): confirm per-model key naming + + apply_class_constraint(model_key, classes) + + else: + apply_class_constraint(key_name, classes) + else: + + apply_class_constraint(key, classes) + \ No newline at end of file diff --git a/posydon/interpolation/data_scaling.py b/posydon/interpolation/data_scaling.py index 901e66f61c..84f27d92f1 100644 --- a/posydon/interpolation/data_scaling.py +++ b/posydon/interpolation/data_scaling.py @@ -7,7 +7,25 @@ import numpy as np - +import warnings +import sys + +# Convert RuntimeWarning to an error +warnings.simplefilter("error", RuntimeWarning) + +eps = 1.0e-16 + +SCALING_OPTIONS = [ + "none", + "min_max", + "max_abs", + # "standardize", + "log_min_max", # has + # "neg_log_min_max", # has + "log_max_abs", # has + # "log_standardize", # has + # "neg_log_standardize" # has +] class DataScaler: """Data Normalization class.
@@ -68,27 +86,28 @@ def fit(self, x, method='none', lower=-1.0, upper=1.0): if method == 'min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [x.min(axis=0), x.max(axis=0)] + self.params = [np.nanmin(x, axis=0), np.nanmax(x, axis=0)] elif method == 'log_min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [np.log10(x.min(axis=0)), np.log10(x.max(axis=0))] + self.params = [self.log(np.nanmin(x, axis=0)), self.log(np.nanmax(x, axis=0))] + elif method == 'neg_log_min_max': assert upper > lower, "upper must be greater than lower" self.lower, self.upper = lower, upper - self.params = [np.log10((-x).min(axis=0)), - np.log10((-x).max(axis=0))] + self.params = [self.log(np.nanmin(-x, axis=0)), + self.log(np.nanmax(-x, axis=0))] elif method == 'max_abs': - self.params = [np.abs(x).max(axis=0)] + self.params = [np.nanmax(np.abs(x), axis=0)] elif method == 'log_max_abs': - self.params = [np.abs(np.log10(x)).max(axis=0)] - elif method == 'standarize': - self.params = [x.mean(axis=0), x.std(axis=0)] - elif method == 'log_standarize': + self.params = [np.nanmax(np.abs(self.log(x)), axis=0)] + elif method == 'standardize': + self.params = [np.nanmean(x, axis=0), np.nanstd(x, axis=0)] + elif method == 'log_standardize': # log will be computed in transform again - self.params = [np.log10(x).mean(axis=0), np.log10(x).std(axis=0)] - elif method == 'neg_log_standarize': # log(-x) - self.params = [np.log10(-x).mean(axis=0), np.log10(-x).std(axis=0)] + self.params = [np.nanmean(self.log(x), axis=0), np.nanstd(self.log(x), axis=0)] + elif method == 'neg_log_standardize': # log(-x) + self.params = [np.nanmean(self.log(-x), axis=0), np.nanstd(self.log(-x), axis=0)] elif method == 'log': self.params = [] elif method == 'none': # no transformation @@ -124,26 +143,26 @@ def transform(self, x): x_t = ((x - self.params[0]) / (self.params[1] - self.params[0]) * 
(self.upper - self.lower) + self.lower) elif self.method == 'log_min_max': - x_t = ((np.log10(x) - self.params[0]) + x_t = ((self.log(x) - self.params[0]) / (self.params[1] - self.params[0]) * (self.upper - self.lower) + self.lower) elif self.method == 'neg_log_min_max': - x_t = ((np.log10(-x) - self.params[0]) + x_t = ((self.log(-x) - self.params[0]) / (self.params[1] - self.params[0]) * (self.upper - self.lower) + self.lower) elif self.method == 'max_abs': x_t = x / self.params[0] elif self.method == 'log_max_abs': - x_t = np.log10(x) / self.params[0] - elif self.method == 'standarize': + x_t = self.log(x) / self.params[0] + elif self.method == 'standardize': x_t = (x - self.params[0]) / self.params[1] - elif self.method == 'log_standarize': + elif self.method == 'log_standardize': # log will be computed in transform again - x_t = (np.log10(x) - self.params[0]) / self.params[1] - elif self.method == 'neg_log_standarize': - x_t = (np.log10(-x) - self.params[0]) / self.params[1] + x_t = (self.log(x) - self.params[0]) / self.params[1] + elif self.method == 'neg_log_standardize': + x_t = (self.log(-x) - self.params[0]) / self.params[1] elif self.method == 'log': - x_t = np.log10(x) + x_t = self.log(x) else: # no transformation x_t = x @@ -201,24 +220,38 @@ def inv_transform(self, x_t): / (self.upper - self.lower) * (self.params[1] - self.params[0]) + self.params[0]) elif self.method == 'log_min_max': - x = 10 ** ((x_t - self.lower) / (self.upper - self.lower) + x = self.unlog((x_t - self.lower) / (self.upper - self.lower) * (self.params[1] - self.params[0]) + self.params[0]) elif self.method == 'neg_log_min_max': - x = -10 ** ((x_t - self.lower) / (self.upper - self.lower) + x = -self.unlog((x_t - self.lower) / (self.upper - self.lower) * (self.params[1] - self.params[0]) + self.params[0]) elif self.method == 'max_abs': x = x_t * self.params[0] elif self.method == 'log_max_abs': - x = 10 ** (x_t * self.params[0]) + x = self.unlog(x_t * self.params[0]) elif 
self.method == 'standarize': x = x_t * self.params[1] + self.params[0] elif self.method == 'log_standarize': - x = 10 ** (x_t * self.params[1] + self.params[0]) + x = self.unlog(x_t * self.params[1] + self.params[0]) elif self.method == 'neg_log_standarize': - x = -10 ** (x_t * self.params[1] + self.params[0]) + x = -self.unlog(x_t * self.params[1] + self.params[0]) elif self.method == 'log': - x = 10 ** x_t + x = self.unlog(x_t) else: # no transformation x = x_t return x + + def log(self, x): + logged = None + try: + logged = np.log10(x + eps) + except RuntimeWarning: + print(self.method) + print(x, np.isinf(x).any(), np.isnan(x).any(), (x < 0).any(), np.nanmin(x)) + # sys.exit() + + return logged + + def unlog(self, x): + return (10 ** x) - eps diff --git a/posydon/interpolation/new_interpolator.py b/posydon/interpolation/new_interpolator.py new file mode 100644 index 0000000000..6c8afbe9af --- /dev/null +++ b/posydon/interpolation/new_interpolator.py @@ -0,0 +1,487 @@ +""" +Module implementing initial-final (IF) interpolation. 
+ +""" + +__authors__ = [ + "Philipp Moura Srivastava ", +] + + +import numpy as np +import os +import pickle +from datetime import date + +from scipy.spatial import Delaunay +# POSYDON +from posydon.grids.psygrid import PSyGrid +from posydon.interpolation.data_scaling import DataScaler, SCALING_OPTIONS +from posydon.interpolation.preprocessing import ( + normalize, + unnormalize, + find_normalization_evaluation_matrix, + compute_statistics, + IN_SCALING_OPTIONS, + OUT_SCALING_OPTIONS) + +from posydon.utils.posydonwarning import Pwarn +from posydon.interpolation.constraints import ( + find_constraints_to_apply, sanitize_interpolated_quantities) + +# ML Imports +from sklearn.neighbors import KNeighborsClassifier +from sklearn.model_selection import train_test_split +from sklearn.metrics import balanced_accuracy_score + +import sys +import time + +eps = 1.0e-16 + + +class IFInterpolator: + + def __init__(self, grids, in_keys, out_keys, max_k): + + if type(grids) != list: + sys.exit("Please provide a list of PSyGrids containing both a training and validation grid to train the interpolator") + else: + + self.in_keys = in_keys + + self.out_key_dict = out_keys + self.continuous_out_keys = sum(list(out_keys.values()), []) # keys to be interpolated which correspond to numerical quantities + self.discrete_out_keys = list(out_keys.keys()) # keys to be interpolated which correspond to discrete quantities + self.constraints = find_constraints_to_apply(self.continuous_out_keys) + + # ============= checks ============= + if "interpolation_class" not in self.discrete_out_keys: + sys.exit("The key \"interpolation_class\" needs to be provided as one of the interpolation keys") + + self.max_k = max_k + + self.training_grid = self.preprocess_grid(grids[0], training_grid = True) + self.validation_grid = self.preprocess_grid(grids[1]) + + self.triangulate(self.training_grid) + # =============== usage statistics variables ============ + self.outside_convex_hull = 
dict(zip(self.discrete_out_keys, [0] * len(self.discrete_out_keys))) + self.inside_convex_hull = dict(zip(self.discrete_out_keys, [0] * len(self.discrete_out_keys))) + + def stats(self, _print = False): + percentages = [] + + for key in self.discrete_out_keys: + percentages.append( + self.outside_convex_hull[key] / (self.outside_convex_hull[key] + self.inside_convex_hull[key]) + ) + if _print: + print(f"Total of {sum(percentages) / len(percentages):.2f} outside of hull") + + return dict(zip(self.discrete_out_keys, percentages)) + + def train(self): + self.classifiers = dict( + zip( + self.discrete_out_keys, + [self.find_hyperparameters(key) for key in self.discrete_out_keys] + ) + ) + self.out_scalers = dict( + zip( + self.discrete_out_keys, + [self.optimize_normalization(key) for key in self.discrete_out_keys] + ) + ) + + + def interpolate(self, iv, klass, sn_model): + + interpolated = [] + ics = {} + weights = {} + + interpolation_class_ind = self.discrete_out_keys.index("interpolation_class") + sn_class_ind = self.discrete_out_keys.index(sn_model) + klass = [klass[interpolation_class_ind], klass[sn_class_ind]] + classification_schemes = ["interpolation_class", sn_model] + + for key, c in zip(classification_schemes, klass): + + triangulation = self.training_grid["triangulations"][key][c] + + simplex = triangulation.find_simplex(iv) + + if simplex == -1: + interpolated.extend( + self.get_nearest_neighbor(iv, key) + ) + self.outside_convex_hull[key] += 1 + continue + else: + self.inside_convex_hull[key] += 1 + + vertices = triangulation.simplices[simplex] + ics[key] = triangulation.points[vertices] + + class_inds = self.training_grid["class_inds"][key][c] + + final_values = np.array(self.training_grid["final_values"][key][class_inds][vertices].tolist()) + print("Before", final_values) + if np.isnan(final_values).any(): + print("Do final values have NaNs?", np.isnan(final_values).any()) + # if False or self.training: + # if not np.isnan(final_values).any(): + # 
final_values = normalize(final_values, self._stats[0], self._stats[1], True) + print("After", final_values) + barycentric_weights = self.compute_barycentric_coordinates(iv, triangulation.points[vertices])[..., np.newaxis] + if np.isnan(barycentric_weights).any(): + print("Do barycentric weights have NaNs?", np.isnan(barycentric_weights).any()) + + weights[key] = barycentric_weights + print(final_values, barycentric_weights) + # if self.training: + # if not np.isnan(final_values).any(): + # final_values = unnormalize( + # np.sum(final_values * barycentric_weights, axis = 0), + # self._stats[0], self._stats[1], True + # ) + # else: + # final_values = np.sum(final_values * barycentric_weights, axis = 0) + + if np.isnan(final_values).any(): + print("Output has nans") + # denormalized = self.denormalize_output( + # np.sum(final_values * barycentric_weights, axis = 0), + # klass + # ) + interpolated.extend(final_values) + + + meta_data = { + "weights": weights, + "ics": ics, + "ic": iv, + "interpolated": interpolated + } + + return interpolated, meta_data + + def evaluate(self, initial_values, sn_model = "S1_SN_MODEL_v2_01_SN_type"): + + if self.classifiers is None: + sys.exit("Please find classifier hyperparameters before using interpolator") + + interpolation_class_ind = self.discrete_out_keys.index("interpolation_class") + + classes = np.array([ + cl["classifier"].predict(normalize(initial_values, cl["stats"][0], cl["stats"][1], cl["log"])) + for cl in self.classifiers.values()]).T + + interpolated_values = [] + n = [] + + for iv, klass in zip(initial_values, classes): + if klass[interpolation_class_ind] == "initial_MT": + continue + + interpolated, meta_data = self.interpolate(iv, klass, sn_model) + interpolated = self.apply_continuous_constraints(interpolated, sn_model) + + interpolated_values.append(interpolated) + n.append(meta_data) + + interpolated_values = np.array(interpolated_values) + classes = np.array(classes) + + return interpolated_values, classes, n + + 
def find_hyperparameters(self, klass): + + input_matrix = [] + + for k in range(1, self.max_k): + row = [] + for opt in IN_SCALING_OPTIONS: + row.append( + [k, opt] + ) + input_matrix.append(row) + + kwargs = { + "input_matrix": input_matrix, + "self": self, + "klass": klass + } + + def kwargs_fnc(**kwargs): + + kwargs = { + "self": kwargs["kwargs"]["self"], + "k": kwargs["item"][0], + "scaling": kwargs["item"][1] + } + + return kwargs + + def eval_fnc(self, k, scaling): + + validation_classifier = KNeighborsClassifier(n_neighbors = k, weights = "distance") + + training_initial_values = self.training_grid["initial_values"] + + stats = compute_statistics(training_initial_values, scaling) + training_initial_values = normalize( + training_initial_values, stats[0], stats[1], "log" in scaling) + + validation_classifier.fit( + training_initial_values, + self.training_grid["final_classes"][klass] + ) + + validation_initial_values = self.validation_grid["initial_values"] + validation_initial_values = normalize( + validation_initial_values, stats[0], stats[1], "log" in scaling) + predicted_classes = validation_classifier.predict(validation_initial_values) + + bacc = balanced_accuracy_score( + self.validation_grid["final_classes"][klass], + predicted_classes + ) + + return bacc, stats + + eval_matrix, stat_matrix = find_normalization_evaluation_matrix(eval_fnc, kwargs_fnc, kwargs) + + k_star = list(np.unravel_index(eval_matrix.argmax(), eval_matrix.shape)) + + classifier = KNeighborsClassifier(n_neighbors = k_star[0] + 1, weights = "distance") + + training_initial_values = self.training_grid["initial_values"] + + scaling = IN_SCALING_OPTIONS[k_star[1]] + + stats = compute_statistics(training_initial_values, scaling) + training_initial_values = normalize( + training_initial_values, stats[0], stats[1], "log" in scaling) + + classifier.fit( + training_initial_values, + self.training_grid["final_classes"][klass] + ) + + return { + "classifier": classifier, + "stats": 
stat_matrix[*k_star], + "log": "log" in IN_SCALING_OPTIONS[k_star[1]], + "k_star": k_star, + "eval_matrix": eval_matrix + } + + def optimize_normalization(self, key): + + input_matrix = [] + + labels = np.unique(self.training_grid["final_classes"][key]) + labels = np.delete(labels, np.where(labels == "initial_MT")[0]) + + for label in labels: + row = [] + for opt in OUT_SCALING_OPTIONS: + row.append( + [label, opt] + ) + input_matrix.append(row) + + kwargs = { + "input_matrix": input_matrix, + "self": self, + "key": key + } + + def kwargs_fnc(**kwargs): + + kwargs = { + "self": kwargs["kwargs"]["self"], + "key": kwargs["kwargs"]["key"], + "klass": kwargs["item"][0], + "scaling": kwargs["item"][1] + } + + return kwargs + + def eval_fnc(self, key, klass, scaling): + self.training = True + self.scaling = scaling + + klass_inds = np.where(self.validation_grid["final_classes"][key] == klass)[0] + + training_final_values = self.training_grid["final_values"][key][klass_inds] + self._stats = compute_statistics(training_final_values, scaling) + print("Does input have NaNs?", np.isnan(self.validation_grid["initial_values"][klass_inds]).any()) + interpolated, classes, _ = self.evaluate(self.validation_grid["initial_values"][klass_inds]) + + classes = classes[np.where(classes[:, 0] != "initial_MT")[0]] + predicted_klass_inds = np.where((classes[:, 0] == klass) | (classes[:, 1] == klass))[0] + + # needs to be fixed to include any arbitrary SN model but this will do for now + ground_truth = np.concatenate( + [self.validation_grid["final_values"]["interpolation_class"], self.validation_grid["final_values"]["S1_SN_MODEL_v2_01_SN_type"]], axis = 1 + ) + + errors = np.abs( + (interpolated[predicted_klass_inds] - ground_truth[klass_inds][predicted_klass_inds]) / + (ground_truth[klass_inds][predicted_klass_inds] + eps) + ) + + self.training = False + + return errors.mean(), self._stats + + eval_matrix, stat_matrix = find_normalization_evaluation_matrix(eval_fnc, kwargs_fnc, kwargs) + 
+ + opt = list(np.unravel_index(eval_matrix.argmin(), eval_matrix.shape)) # argmin: eval_matrix holds mean relative errors, lower is better + + return { + "stats": stat_matrix[tuple(opt)], + "log": "log" in OUT_SCALING_OPTIONS[opt[1]], + "scaling": OUT_SCALING_OPTIONS[opt[1]], + "eval_matrix": eval_matrix + } + + # =================== helper methods below =========================== + + def preprocess_grid(self, grid, training_grid = False): + + final_values = np.array(grid.final_values[self.continuous_out_keys].tolist()) + + valid_inds = np.where( + (grid.final_values["interpolation_class"] != "not_converged") & + (grid.final_values["interpolation_class"] != "ignored_no_RLO") & + (grid.final_values["interpolation_class"] != "ignored_no_binary_history") + )[0] + + initial_values = np.array(grid.initial_values[self.in_keys][valid_inds].tolist()) + # determining if should interp in q + if training_grid: + m1, m2 = 10**initial_values[:, 0], 10**initial_values[:, 1] + self.interp_in_q = (m2[m1 > 0.95 * m1.max()].min() / m2[m1 < 1.05 * m1.min()].min() > 2) + self.interp_in_q = False # NOTE(review): debug override — unconditionally disables q-interpolation; confirm intended + + initial_values = np.log10(initial_values + eps) + + if self.interp_in_q: + initial_values[:, 1] = (10**initial_values[:, 1] - eps) / (10**initial_values[:, 0] - eps) + + if training_grid: + self.iv_min = initial_values.min(axis = 0, keepdims = True) + self.iv_max = initial_values.max(axis = 0, keepdims = True) + + class_inds = {} + + for key in self.discrete_out_keys: + class_labels = np.unique(grid.final_values[valid_inds][key]) + class_inds[key] = dict(zip( + class_labels, + [np.where(grid.final_values[valid_inds][key] == label)[0] for label in class_labels] + )) + + return { + "initial_values": 10**initial_values, + "final_values": dict(zip(self.out_key_dict.keys(), [np.array(grid.final_values[valid_inds][keys].tolist()) for keys in self.out_key_dict.values()])), # np.array(grid.final_values[self.continuous_out_keys][valid_inds].tolist()), + "final_classes": dict(zip(self.discrete_out_keys,
np.array(grid.final_values[self.discrete_out_keys][valid_inds].tolist()).T)), + "class_inds": class_inds, + } + + def triangulate(self, grid_dict): + + triangulations = {} + + for label_name in self.discrete_out_keys: + classes = np.unique(grid_dict["final_classes"][label_name]).tolist() + if "initial_MT" in classes: + classes.remove("initial_MT") + + class_triangulations = {} + + for klass in classes: + + class_inds = grid_dict["class_inds"][label_name][klass] + class_triangulations[klass] = Delaunay(grid_dict["initial_values"][class_inds]) + + triangulations[label_name] = class_triangulations + + grid_dict["triangulations"] = triangulations + + def compute_barycentric_coordinates(self, point, coords): + + T = np.array([ + coords[0] - coords[3], + coords[1] - coords[3], + coords[2] - coords[3] + ]) # our matrix + T = T.T + T_I = np.linalg.inv(T) + + r_a = point - coords[3] + + weights = (T_I @ r_a).tolist() + + weights.append(1 - weights[0] - weights[1] - weights[2]) + + weights = np.array(weights) / sum(weights) + + return weights + + def get_nearest_neighbor(self, iv, key): + + dists = np.sqrt(np.square(self.training_grid["initial_values"] - iv).sum(axis = 1)) + sorted_inds = dists.argsort() + + return np.array(self.training_grid["final_values"][key][sorted_inds[0]].tolist()) + + def apply_continuous_constraints(self, interpolated, sn_model): + keys = self.out_key_dict["interpolation_class"] + self.out_key_dict[sn_model] + + sanitized = sanitize_interpolated_quantities( + dict(zip(keys, interpolated)), + self.constraints, verbose=False + ) + return np.array([sanitized[key] for key in keys]) + + def normalize_output(self, input, klass): + class_ind = self.classes.index(klass) + scalers = self.out_scalers[class_ind] + + ret_value = np.zeros_like(input) + + for dim, scaler in enumerate(scalers): + ret_value[:, dim] = scaler.transform(input[:, dim]) + + return ret_value + + + def denormalize_output(self, input, klass): + class_ind = self.classes.index(klass) + 
scalers = self.out_scalers[class_ind] + + ret_value = np.zeros_like(input) + + for dim, scaler in enumerate(scalers): + ret_value[dim] = scaler.inv_transform(input[dim]) + + return ret_value + + def normalize_triangulations(self, out_keys): + + new_final_values = np.zeros_like(self.training_grid["final_values"][out_keys]) + + for i, (klass, fv) in enumerate(zip(self.training_grid["classes"], self.training_grid["final_values"][out_keys])): + if klass == "initial_MT": # should be taken out in preprocessing + continue + new_final_values[i] = self.normalize_output(np.array([fv]), klass) + + self.training_grid["final_values"][out_keys] = new_final_values + + \ No newline at end of file diff --git a/posydon/interpolation/preprocessing.py b/posydon/interpolation/preprocessing.py new file mode 100644 index 0000000000..8db1482718 --- /dev/null +++ b/posydon/interpolation/preprocessing.py @@ -0,0 +1,83 @@ +""" +Module implementing preprocessing for IF Interpolation +""" + +__authors__ = [ + "Philipp Moura Srivastava ", +] + +import numpy as np + +eps = 1.0e-16 + +IN_SCALING_OPTIONS = [ + "min-max", + "standard", + "log_min-max", + "log_standard" +] + +OUT_SCALING_OPTIONS = [ + # "log_min-max", + # "log_standard" + "min-max" +] + + +def normalize(data, shift, scale, log = False): + if log: + if np.isnan(data).any() or np.isinf(data).any(): + raise ValueError("nans or infs detected in crucial grid parameters") + + if not (data < 0).any(): + data = np.log10(data + eps) + + return (data - shift) / (scale + eps) + +def unnormalize(data, shift, scale, log = False): + data = (data * (scale + eps)) + shift + + if log: + data = 10**data - eps + + return data + +def compute_statistics(data, scaling): + + computations = [ + lambda data: [data.min(axis = 0), data.max(axis = 0) - data.min(axis = 0)], + lambda data: [data.mean(axis = 0), data.std(axis = 0)], + lambda data: [np.log10(data + eps).min(axis = 0), np.log10(data + eps).max(axis = 0) - np.log10(data + eps).min(axis = 0)], + 
lambda data: [np.log10(data + eps).mean(axis = 0), np.log10(data + eps).std(axis = 0)], + ] + compute = dict(zip(IN_SCALING_OPTIONS, computations)) # this line assumes that all other options are a subset of IN_SCALING_OPTION + return compute[scaling](data) + + +def find_normalization_evaluation_matrix(eval_fnc, kwarg_fnc, kwargs): + # eval_fnc - test_classifier?, what's changing + # kwarg_fnc - input to eval_fnc + # kwargs - inputs we iterate over + + normalization_eval_matrix = [] + normalization_stat_matrix = [] + + for row in kwargs["input_matrix"]: + eval_row = [] + stat_row = [] + + for col in row: + acc, stat = eval_fnc(**kwarg_fnc(**{"item": col, "kwargs": kwargs})) + + eval_row.append( + acc + ) + stat_row.append(stat) + + normalization_eval_matrix.append(eval_row) + normalization_stat_matrix.append(stat_row) + + normalization_eval_matrix = np.array(normalization_eval_matrix) + normalization_stat_matrix = np.array(normalization_stat_matrix) + + return normalization_eval_matrix, normalization_stat_matrix diff --git a/posydon/unit_tests/interpolation/__init__.py b/posydon/unit_tests/interpolation/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/posydon/unit_tests/interpolation/test_IF_interpolation.py b/posydon/unit_tests/interpolation/test_IF_interpolation.py new file mode 100644 index 0000000000..72ed259132 --- /dev/null +++ b/posydon/unit_tests/interpolation/test_IF_interpolation.py @@ -0,0 +1,22 @@ +"""Unit tests of posydon/interpolation/IF_interpolation.py +""" + +__authors__ = [ + "Philipp Rajah de Moura Srivastava " +] + +# import the module which will be tested +from posydon.interpolation.new_interpolator import IFInterpolator + +# import other needed code for the tests, which is not already imported in the +# module you like to test + + +# define single test functions +def test_name(): + pass + +# define test classes collecting several test functions +class TestClass: + def test_name(self): + assert True