From 860af88e62a7d87f82bc0db4b550d3d86b16189a Mon Sep 17 00:00:00 2001 From: JP Date: Thu, 1 Aug 2024 16:51:15 -0700 Subject: [PATCH 1/3] Update run_summary.py --- aurora/pipelines/run_summary.py | 36 ++++++++++++++++++++++++--------- 1 file changed, 27 insertions(+), 9 deletions(-) diff --git a/aurora/pipelines/run_summary.py b/aurora/pipelines/run_summary.py index 8963dfae..fc833c25 100644 --- a/aurora/pipelines/run_summary.py +++ b/aurora/pipelines/run_summary.py @@ -24,10 +24,10 @@ import pandas as pd -from mt_metadata.transfer_functions.processing.aurora.channel_nomenclature import ( +from mt_metadata.transfer_functions import ( ALLOWED_INPUT_CHANNELS, ) -from mt_metadata.transfer_functions.processing.aurora.channel_nomenclature import ( +from mt_metadata.transfer_functions import ( ALLOWED_OUTPUT_CHANNELS, ) import mth5 @@ -73,7 +73,13 @@ def __init__(self, **kwargs): self.column_dtypes = [str, str, pd.Timestamp, pd.Timestamp] self._input_dict = kwargs.get("input_dict", None) self.df = kwargs.get("df", None) - self._mini_summary_columns = ["survey", "station_id", "run_id", "start", "end"] + self._mini_summary_columns = [ + "survey", + "station_id", + "run_id", + "start", + "end", + ] def clone(self): """ @@ -120,7 +126,9 @@ def check_runs_are_valid(self, drop=False, **kwargs): run_obj = m.get_run(row.station_id, row.run_id, row.survey) runts = run_obj.to_runts() if runts.dataset.to_array().data.__abs__().sum() == 0: - logger.critical("CRITICAL: Detected a run with all zero values") + logger.critical( + "CRITICAL: Detected a run with all zero values" + ) self.df["valid"].at[i_row] = False # load each run, and take the median of the sum of the absolute values if drop: @@ -221,7 +229,9 @@ def channel_summary_to_run_summary( channel_scale_factors = n_station_runs * [None] i = 0 for group_values, group in grouper: - group_info = dict(zip(group_by_columns, group_values)) # handy for debug + group_info = dict( + zip(group_by_columns, group_values) + ) # handy for debug # for k, v in group_info.items(): # print(f"{k} = {v}") survey_ids[i] = group_info["survey"] @@ -232,9 +242,15 @@ def channel_summary_to_run_summary( sample_rates[i] = group.sample_rate.iloc[0] channels_list = group.component.to_list() num_channels = len(channels_list) - input_channels[i] = [x for x in channels_list if x in allowed_input_channels] - output_channels[i] = [x for x in channels_list if x in allowed_output_channels] - channel_scale_factors[i] = dict(zip(channels_list, num_channels * [1.0])) + input_channels[i] = [ + x for x in channels_list if x in allowed_input_channels + ] + output_channels[i] = [ + x for x in channels_list if x in allowed_output_channels + ] + channel_scale_factors[i] = dict( + zip(channels_list, num_channels * [1.0]) + ) i += 1 data_dict = {} @@ -286,7 +302,9 @@ def extract_run_summary_from_mth5(mth5_obj, summary_type="run"): return out_df -def extract_run_summaries_from_mth5s(mth5_list, summary_type="run", deduplicate=True): +def extract_run_summaries_from_mth5s( + mth5_list, summary_type="run", deduplicate=True +): """ ToDo: Move this method into mth5? or mth5_helpers? ToDo: Make this a class so that the __repr__ is a nice visual representation of the From bf85ce926900048379a68d04a86104f653027e9c Mon Sep 17 00:00:00 2001 From: JP Date: Fri, 17 Jan 2025 15:56:47 -0800 Subject: [PATCH 2/3] Update transfer_function_kernel.py --- aurora/pipelines/transfer_function_kernel.py | 75 ++++++++++++++------ 1 file changed, 52 insertions(+), 23 deletions(-) diff --git a/aurora/pipelines/transfer_function_kernel.py b/aurora/pipelines/transfer_function_kernel.py index 77d070d5..eeb3adb1 100644 --- a/aurora/pipelines/transfer_function_kernel.py +++ b/aurora/pipelines/transfer_function_kernel.py @@ -127,17 +127,15 @@ def update_dataset_df(self, i_dec_level): if not self.is_valid_dataset(row, i_dec_level): continue if row.fc: - row_ssr_str = ( - f"survey: {row.survey}, station: {row.station}, run: {row.run}" - ) + row_ssr_str = f"survey: {row.survey}, station: {row.station}, run: {row.run}" msg = f"FC already exists for {row_ssr_str} -- skipping decimation" logger.info(msg) continue run_xrds = row["run_dataarray"].to_dataset("channel") decimation = self.config.decimations[i_dec_level].decimation decimated_xrds = prototype_decimate(decimation, run_xrds) - self.dataset_df["run_dataarray"].at[i] = decimated_xrds.to_array( - "channel" + self.dataset_df["run_dataarray"].at[i] = ( + decimated_xrds.to_array("channel") ) # See Note 1 above logger.info( @@ -159,7 +157,9 @@ def apply_clock_zero(self, dec_level_config): The modified DecimationLevel with clock-zero information set. """ if dec_level_config.window.clock_zero_type == "data start": - dec_level_config.window.clock_zero = str(self.dataset_df.start.min()) + dec_level_config.window.clock_zero = str( + self.dataset_df.start.min() + ) return dec_level_config @property @@ -169,6 +169,7 @@ def all_fcs_already_exist(self) -> bool: self.check_if_fcs_already_exist() # these should all be booleans now + print(self.kernel_dataset.df["fc"]) assert not self.kernel_dataset.df["fc"].isna().any() return self.kernel_dataset.df.fc.all() @@ -243,11 +244,12 @@ def check_if_fcs_already_exist(self): msg += "Skip time series processing is OK" else: msg = f"Some, but not all fc_levels already exist = {self.dataset_df['fc']}" + logger.info(msg) + return True else: msg = "FC levels not present" - logger.info(msg) - - return + logger.info(msg) + return False def show_processing_summary( self, @@ -296,11 +298,15 @@ def make_processing_summary(self): decimation_info = self.config.decimation_info() for i_dec, dec_factor in decimation_info.items(): tmp[i_dec] = dec_factor - tmp = tmp.melt(id_vars=id_vars, value_name="dec_factor", var_name="dec_level") + tmp = tmp.melt( + id_vars=id_vars, value_name="dec_factor", var_name="dec_level" + ) sortby = ["survey", "station", "run", "start", "dec_level"] tmp.sort_values(by=sortby, inplace=True) tmp.reset_index(drop=True, inplace=True) - tmp.drop("sample_rate", axis=1, inplace=True) # not valid for decimated data + tmp.drop( + "sample_rate", axis=1, inplace=True + ) # not valid for decimated data # Add window info group_by = [ @@ -317,7 +323,9 @@ def make_processing_summary(self): cond = (df.dec_level.diff()[1:] == 1).all() assert cond # dec levels increment by 1 except AssertionError: - msg = f"Skipping {group} because decimation levels are messy." + msg = ( + f"Skipping {group} because decimation levels are messy." + ) logger.info(msg) continue assert df.dec_factor.iloc[0] == 1 @@ -382,7 +390,8 @@ def validate_decimation_scheme_and_dataset_compatability( for x in self.processing_config.decimations } min_stft_window_list = [ - min_stft_window_info[x] for x in self.processing_summary.dec_level + min_stft_window_info[x] + for x in self.processing_summary.dec_level ] min_num_stft_windows = pd.Series(min_stft_window_list) @@ -408,7 +417,9 @@ def validate_processing(self): self.config.drop_reference_channels() for decimation in self.config.decimations: if decimation.estimator.engine == "RME_RR": - logger.info("No RR station specified, switching RME_RR to RME") + logger.info( + "No RR station specified, switching RME_RR to RME" + ) decimation.estimator.engine = "RME" # Make sure that a local station is defined @@ -440,7 +451,9 @@ def valid_decimations(self): valid_levels = tmp.dec_level.unique() dec_levels = [x for x in self.config.decimations] - dec_levels = [x for x in dec_levels if x.decimation.level in valid_levels] + dec_levels = [ + x for x in dec_levels if x.decimation.level in valid_levels + ] msg = f"After validation there are {len(dec_levels)} valid decimation levels" logger.info(msg) return dec_levels @@ -458,7 +471,9 @@ def validate_save_fc_settings(self): # if dec_level_config.save_fcs: dec_level_config.save_fcs = False if self.config.stations.remote: - save_any_fcs = np.array([x.save_fcs for x in self.config.decimations]).any() + save_any_fcs = np.array( + [x.save_fcs for x in self.config.decimations] + ).any() if save_any_fcs: msg = "\n Saving FCs for remote reference processing is not supported" msg = f"{msg} \n - To save FCs, process as single station, then you can use the FCs for RR processing" @@ -574,13 +589,21 @@ def make_decimation_dict_for_tf(tf_collection, processing_config): decimation_dict = {} - for i_dec, dec_level_cfg in enumerate(processing_config.decimations): + for i_dec, dec_level_cfg in enumerate( + processing_config.decimations + ): for i_band, band in enumerate(dec_level_cfg.bands): period_key = f"{band.center_period:{PERIOD_FORMAT}}" period_value = {} - period_value["level"] = i_dec + 1 # +1 to match EMTF standard - period_value["bands"] = tuple(band.harmonic_indices[np.r_[0, -1]]) - period_value["sample_rate"] = dec_level_cfg.sample_rate_decimation + period_value["level"] = ( + i_dec + 1 + ) # +1 to match EMTF standard + period_value["bands"] = tuple( + band.harmonic_indices[np.r_[0, -1]] + ) + period_value["sample_rate"] = ( + dec_level_cfg.sample_rate_decimation + ) try: period_value["npts"] = tf_collection.tf_dict[ i_dec @@ -624,7 +647,9 @@ def make_decimation_dict_for_tf(tf_collection, processing_config): # Set key as first el't of dict, nor currently supporting mixed surveys in TF tf_cls.survey_metadata = self.dataset.local_survey_metadata - tf_cls.station_metadata.transfer_function.processing_type = self.processing_type + tf_cls.station_metadata.transfer_function.processing_type = ( + self.processing_type + ) # tf_cls.station_metadata.transfer_function.processing_config = ( # self.processing_config # ) @@ -655,7 +680,9 @@ def memory_check(self) -> None: num_samples = self.dataset_df.duration * self.dataset_df.sample_rate total_samples = num_samples.sum() total_bytes = total_samples * bytes_per_sample - logger.info(f"Total Bytes of Raw Data: {total_bytes / (1024 ** 3):.3f} GB") + logger.info( + f"Total Bytes of Raw Data: {total_bytes / (1024 ** 3):.3f} GB" + ) ram_fraction = 1.0 * total_bytes / total_memory logger.info(f"Raw Data will use: {100 * ram_fraction:.3f} % of memory") @@ -676,7 +703,9 @@ def memory_check(self) -> None: @path_or_mth5_object -def mth5_has_fcs(m, survey_id, station_id, run_id, remote, processing_config, **kwargs): +def mth5_has_fcs( + m, survey_id, station_id, run_id, remote, processing_config, **kwargs +): """ Checks if all needed fc-levels for survey-station-run are present under processing_config From fe2ad47b4cb747014ade1cc485f189d5d2877f8d Mon Sep 17 00:00:00 2001 From: JP Date: Fri, 17 Jan 2025 16:41:32 -0800 Subject: [PATCH 3/3] Update transfer_function_kernel.py --- aurora/pipelines/transfer_function_kernel.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/aurora/pipelines/transfer_function_kernel.py b/aurora/pipelines/transfer_function_kernel.py index eeb3adb1..f4254dcb 100644 --- a/aurora/pipelines/transfer_function_kernel.py +++ b/aurora/pipelines/transfer_function_kernel.py @@ -166,10 +166,13 @@ def apply_clock_zero(self, dec_level_config): def all_fcs_already_exist(self) -> bool: """Return true of all FCs needed to process data already exist in the mth5s""" if self.kernel_dataset.df["fc"].isna().any(): - self.check_if_fcs_already_exist() + has_fcs = self.check_if_fcs_already_exist() + if not has_fcs: + self.kernel_dataset.df["fc"] = self.kernel_dataset.df[ + "fc" + ].fillna(False) # these should all be booleans now - print(self.kernel_dataset.df["fc"]) assert not self.kernel_dataset.df["fc"].isna().any() return self.kernel_dataset.df.fc.all() @@ -242,10 +245,12 @@ def check_if_fcs_already_exist(self): if self.dataset_df["fc"].all(): msg = "All fc_levels already exist" msg += "Skip time series processing is OK" + logger.info(msg) + return True else: msg = f"Some, but not all fc_levels already exist = {self.dataset_df['fc']}" - logger.info(msg) - return True + logger.info(msg) + return False else: msg = "FC levels not present" logger.info(msg)