diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7d1e390 --- /dev/null +++ b/.gitignore @@ -0,0 +1,97 @@ +*~ +.DS_Store +.ipynb_checkpoints +*. +*.egg-info +__pycache__ +*.pyc +*.so.dSYM +.idea/ + + +# Covers JetBrains IDEs: IntelliJ, GoLand, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +**/.idea/**/workspace.xml +**/.idea/**/tasks.xml +**/.idea/**/usage.statistics.xml +**/.idea/**/dictionaries +**/.idea/**/shelf + +# AWS User-specific +**/.idea/**/aws.xml + +# Generated files +**/.idea/**/contentModel.xml + +# Sensitive or high-churn files +**/.idea/**/dataSources/ +**/.idea/**/dataSources.ids +**/.idea/**/dataSources.local.xml +**/.idea/**/sqlDataSources.xml +**/.idea/**/dynamic.xml +**/.idea/**/uiDesigner.xml +**/.idea/**/dbnavigator.xml + +# Gradle +**/.idea/**/gradle.xml +**/.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. +# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +**/.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +**/.idea/replstate.xml + +# SonarLint plugin +**/.idea/sonarlint/ +**/.idea/sonarlint.xml # see https://community.sonarsource.com/t/is-the-file-idea-idea-idea-sonarlint-xml-intended-to-be-under-source-control/121119 + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based HTTP Client +**/.idea/httpRequests +http-client.private.env.json + +# Android studio 3.1+ serialized cache file +**/.idea/caches/build_file_checksums.ser + +# Apifox Helper cache +**/.idea/.cache/.Apifox_Helper +**/.idea/ApifoxUploaderProjectSetting.xml + +# Github Copilot persisted session migrations, see: https://github.com/microsoft/copilot-intellij-feedback/issues/712#issuecomment-3322062215 +**/.idea/**/copilot.data.migration.*.xml \ No newline at end of file diff --git a/eregion/configs/config.py b/eregion/configs/config.py index 9a12fed..86732a6 100644 --- a/eregion/configs/config.py +++ b/eregion/configs/config.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod import yaml -import logging +from utils.misc_utils import configure_logger # A yaml constructor for slice objects def slice_constructor(loader, node): @@ -31,7 +31,7 @@ def __init__(self, config_input): Path to a YAML config file or config data as a string or dictionary. """ self.config = None - self.logger = logging.getLogger(__name__) + self.logger = configure_logger(self.__class__.__name__) if isinstance(config_input, str) and config_input.endswith(('.yaml', '.yml')): self.set_from_file(config_input) @@ -110,7 +110,8 @@ def validate_config(self): ### Pipeline Configuration Class ### class PipelineConfig(ConfigLoader): - required_keys = ['pipeline'] + required_keys = ['pipelines'] + required_pipeline_keys = ['name', 'lazy', 'nodes'] def __init__(self, config_input): """ @@ -124,3 +125,11 @@ def validate_config(self): for key in self.required_keys: if key not in self.config: raise ValueError(f"Missing required config key: {key}") + + for pipeline in self.config['pipelines']: + for key in self.required_pipeline_keys: + if key not in pipeline: + raise ValueError(f"Missing required pipeline key: {key} in pipeline {pipeline.get('name', 'unknown')}") + + if pipeline['lazy']: + assert 'source' in pipeline, f"Missing required key 'source' for lazy pipeline {pipeline.get('name', 'unknown')}" diff --git a/eregion/configs/detectors/deimos_singledet.yaml b/eregion/configs/detectors/deimos_singledet.yaml index 7ca03c9..8903d4d 100644 --- a/eregion/configs/detectors/deimos_singledet.yaml +++ b/eregion/configs/detectors/deimos_singledet.yaml @@ -20,10 +20,10 @@ objects: ext_id: 1 # FITS extension ID ext_slice: [!slice [0, 4125], !slice [0, 1094]] # Slice of the ext_id that has the data for this output data_slice: [!slice [0, 4125], !slice [0, 1094]] # Slice of the full DetImage data array where this output's data will go - serial_prescan: !slice [0, 0] - serial_overscan: !slice [1094, 1094] - parallel_prescan: !slice [0, 0] - parallel_overscan: !slice [4125, 4125] + serial_prescan: !slice [0, 50] + serial_overscan: !slice [1074, 1094] + parallel_prescan: !slice [0, 50] + parallel_overscan: !slice [4105, 4125] parallel_axis: 'y' # First axis in the data array (rows) represent parallel readout direction readout_pixel: [0, 0] # top left pixel gain: 1.0 # electrons/ADU @@ -33,10 +33,10 @@ objects: ext_id: 2 # FITS extension ID ext_slice: [!slice [0, 4125], !slice [0, 1094]] # Slice of the ext_id that has the data for this output data_slice: [!slice [0, 4125], !slice [1094, 2188]] # Slice of the full DetImage data array where this output's data will go - serial_prescan: !slice [0, 0] - serial_overscan: !slice [1094, 1094] - parallel_prescan: !slice [0, 0] - parallel_overscan: !slice [4125, 4125] + serial_prescan: !slice [1094, 1044] + serial_overscan: !slice [20, 0] + parallel_prescan: !slice [0, 50] + parallel_overscan: !slice [4105, 4125] parallel_axis: 'y' # First axis in the data array (rows) represent parallel readout direction readout_pixel: [0, 1094] # top right pixel gain: 1.0 # electrons/ADU diff --git a/eregion/configs/pipeline_flows/example.yaml b/eregion/configs/pipeline_flows/example.yaml new file mode 100644 index 0000000..8dc630c --- /dev/null +++ b/eregion/configs/pipeline_flows/example.yaml @@ -0,0 +1,67 @@ +# This is an example pipeline flow configuration + +debug: false # Optional: set to true to enable debug mode (more verbose logging, etc.) + +pipelines: + - name: PIPE_1 # Name of the pipeline flow, required + description: Pipeline flow 1 + lazy: false # Set true if this sub-pipeline should be run lazily (i.e. as images arrive) + + nodes: # List of tasks (nodes) in the pipeline flow + - name: TASK_1 # Name of the task node, required + task: package.module.class # Path to the Class of the task to run, must be a subclass of `Task` defined in tasks.task + + init: # Initialization parameters (for Task.__init__) + inputs: # Specify any args needed from outputs of other tasks in this config + arg_1: pipe_name.node_name.data.key # Output of tasks are wrapped in TaskResult objects by the engine, and the data produced by the task is in the TaskResult.data dict; specify the path to the data you want to use as input for this task + # etc. + params: # Specify any additional kwargs (which are not task outputs) needed; refer to the task documentation for required and optional params and kwargs + param_1: value + param_2: value + # etc. + + run: # Run-time (Task.run() or Task.lazy_run()) inputs and parameters, as above, use `inputs` to specify data coming from outputs of other tasks, and `params` for any additional parameters + inputs: + arg_1: pipe_name.node_name.data.key + # etc. + params: + param_1: value + param_2: value + # etc. + + - name: TASK_2 + task: package.module.class + init: + inputs: + arg_1: PIPE_1.TASK_1.data.key # Example of using output from TASK_1 as input for TASK_2 + params: + param_1: value + # etc. + run: + inputs: + arg_1: PIPE_1.TASK_1.data.key # Example of using output from TASK_1 as input for TASK_2 + params: + param_1: value + # etc. + + depends_on: [TASK_1] # should be specified if this task depends on the output of another task; ensures correct execution order in the pipeline flow + + - name: PIPE_2 + description: Pipeline flow 2 + lazy: true # This sub-pipeline will be run lazily (i.e. as images arrive) + nodes: + - name: TASK_3 + task: package.module.class + init: + inputs: + arg_1: PIPE_1.TASK_1.data.key # Example of using output from a task in another pipeline flow as input + params: + param_1: value + # etc. + run: + inputs: + arg_1: PIPE_1.TASK_2.data.key # Example of using output from a task in another pipeline flow as input + params: + param_1: value + # etc. + depends_on: [PIPE_1.TASK_1, PIPE_1.TASK_2] # specify dependencies across pipeline flows as well \ No newline at end of file diff --git a/eregion/configs/pipeline_flows/masterbias_example.yaml b/eregion/configs/pipeline_flows/masterbias_example.yaml index a26630e..1ab6fc0 100644 --- a/eregion/configs/pipeline_flows/masterbias_example.yaml +++ b/eregion/configs/pipeline_flows/masterbias_example.yaml @@ -1,32 +1,69 @@ # This is an example pipeline flow configuration for creating a master bias frame -# Example -debug: false # Optional: set to true to enable debug mode (more verbose logging, etc.) -pipeline: - - name: image_creator - task: tasks.imagegen.ImageCreator +debug: false +pipelines: + - name: calib_flow + description: Pipeline flow to create a master bias frame from bias images lazy: false - init: # Initialization parameters for the image creator - detector_config: "/Users/yashvi/Desktop/Detector Characterization Tools/eregion/configs/detectors/deimos_singledet.yaml" # For required params, see the task documentation - # Any additional params are passed as kwargs that get set in the task's meta dict - # See task documentation for which kwargs are used + nodes: + - name: image_creator + task: tasks.imagegen.ImageCreator + init: + params: + detector_config: "/Users/yashvi/Desktop/Detector Characterization Tools/eregion/eregion/configs/detectors/deimos_singledet.yaml" + run: + params: + input_source: "/Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits" + identifier_func: tasks.custom.guess_image_type_from_filename_DEIMOS - run: # Run-time inputs and parameters for the image creator - params: # Check which params go here in the task documentation - input_source: "/Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits" - identifier_func: tasks.custom.guess_image_type_from_filename_DEIMOS + - name: master_bias + task: tasks.calibration.MasterBias + init: + params: + method: "median" + run: + inputs: + bias_images: calib_flow.image_creator.data.bias + depends_on: [calib_flow.image_creator] - - - name: master_bias - task: tasks.calibration.MasterBias + - name: preproc_flow + description: Example pre-processing pipeline flow lazy: false - init: - method: "median" # Optional param (kwarg) method to combine bias frames; see task documentation for options - run: - inputs: # use inputs to specify data input coming in from output of other tasks - bias_images: image_creator.data.images # E.g. Input images from the image creator task's output - # Each task's outputs are objects of class TaskResult - # TaskResult.data is a dict containing the actual data produced by the task - # Check which outputs are available in the task documentation + nodes: + - name: image_creator + task: tasks.imagegen.ImageCreator + init: + params: + detector_config: "/Users/yashvi/Desktop/Detector Characterization Tools/eregion/eregion/configs/detectors/deimos_singledet.yaml" + run: + params: + input_source: "/Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*flat_0.000*.fits" + identifier_func: tasks.custom.guess_image_type_from_filename_DEIMOS + + - name: bias_subtraction + task: tasks.preprocessing.BiasSubtraction + init: + inputs: + master_biases: calib_flow.master_bias.data.master_biases + run: + inputs: + images: preproc_flow.image_creator.data.flat + depends_on: [preproc_flow.image_creator, calib_flow.master_bias] + + - name: overscan_subtraction + task: tasks.preprocessing.ScanSubtraction + init: + params: + which_scan: 'serial_overscan' + method: 'median_by_axis' + run: + inputs: + images: preproc_flow.bias_subtraction.data.flat + depends_on: [preproc_flow.bias_subtraction] - depends_on: [image_creator] + - name: badpixel_masking + task: tasks.preprocessing.SigmaClipMasking + run: + inputs: + images: preproc_flow.overscan_subtraction.data.flat + depends_on: [preproc_flow.overscan_subtraction] diff --git a/eregion/core/image_operations.py b/eregion/core/image_operations.py index 2ce07b1..c859bd3 100644 --- a/eregion/core/image_operations.py +++ b/eregion/core/image_operations.py @@ -1,6 +1,6 @@ ### Collection of utility functions for image processing tasks. import numpy as np -from typing import Callable, Any +from typing import Callable, Any, Optional from astropy.stats import sigma_clip def median_combine(images: list[np.ndarray]) -> np.ndarray: @@ -37,7 +37,7 @@ def mean_combine(images: list[np.ndarray]) -> np.ndarray: stacked_images = np.stack(images, axis=0) return np.mean(stacked_images, axis=0) -def subtract_from_image(image: np.ndarray, subtract_object: np.ndarray | float, method: Callable, *args): +def subtract_from_image(image: np.ndarray, subtract_object: np.ndarray | float, method: Optional[Callable]=None, **kwargs): """ Subtract a given object (array or scalar) from an image. @@ -46,16 +46,19 @@ def subtract_from_image(image: np.ndarray, subtract_object: np.ndarray | float, image : np.ndarray 2D numpy array representing the image. subtract_object : np.ndarray or float - The object to subtract from the image. Can be an array of any size from which the value to subtract is derived. - method : Callable + The object to subtract from the image. Can be an array of any size from which the object to subtract is derived. + method : Optional[Callable] A function that takes the subtract_object and returns a scalar/array to subtract from the image. + kwargs : + Additional keyword arguments to pass to the method function. Returns ------- np.ndarray The resulting image after subtraction. """ - value_to_subtract = method(subtract_object, *args) - return image - value_to_subtract, value_to_subtract + if method: + subtract_object = method(subtract_object, **kwargs) + return image - subtract_object, subtract_object def simple_median(data: np.ndarray, *args) -> Any: return np.median(data) diff --git a/eregion/datamodels/image.py b/eregion/datamodels/image.py index c9559aa..0e3c5b9 100644 --- a/eregion/datamodels/image.py +++ b/eregion/datamodels/image.py @@ -8,11 +8,11 @@ import xarray as xr import matplotlib.pyplot as plt from astropy.io import fits -import logging -from eregion.datamodels.image_utils import ensure_dataarray, slice_data +from utils.image_utils import ensure_dataarray, slice_data, ensure_numpy +from utils.misc_utils import configure_logger -logger = logging.getLogger(__name__) +logger = configure_logger(__name__) class DetectorProperties(BaseModel): @@ -50,6 +50,45 @@ def update(self, other: dict[str, Any]): if not hasattr(self, key): setattr(self, key, value) + # --- Mapping protocol implementation --------------------------------- + def __getitem__(self, key: str) -> Any: + try: + return getattr(self, key) + except AttributeError as exc: + raise KeyError(key) from exc + + def __setitem__(self, key: str, value: Any) -> None: + setattr(self, key, value) + + def __delitem__(self, key: str) -> None: + if hasattr(self, key): + delattr(self, key) + else: + raise KeyError(key) + + def __iter__(self): + # iterate over keys present in the model dump + return iter(self.model_dump().keys()) + + def __len__(self) -> int: + return len(self.model_dump()) + + def keys(self): + return self.model_dump().keys() + + def items(self): + return self.model_dump().items() + + def values(self): + return self.model_dump().values() + + def get(self, key: str, default: Any = None) -> Any: + return getattr(self, key, default) + + def to_dict(self) -> dict: + """Return a plain dict snapshot of the current model state.""" + return self.model_dump() + class Output(BaseModel): """ One amplifier/output region within a detector image. @@ -81,10 +120,10 @@ def data(self) -> xr.DataArray: def set_data_in_parent(self, new_data: xr.DataArray | np.ndarray): if self.parent is None or getattr(self.parent, "data", None) is None: raise ValueError("Attach this Output to a DetImage with valid data.") - # Ensure new_data is xr.DataArray - new_data_da = ensure_dataarray(new_data) + # Convert new_data to numpy array if it's an xarray DataArray, to ensure compatibility with parent data array. + new_data_np = ensure_numpy(new_data) # Assign new data to the appropriate slice in the parent DetImage - self.parent.data[self.output_slice] = new_data_da + self.parent.data.values[self.output_slice] = new_data_np def show(self, ax=None, save=None, **imshow_kwargs): if ax is None: @@ -236,9 +275,14 @@ def output_by_id(self, output_id: str) -> Output: except: raise ValueError(f"Output with id {output_id} not found.") - def add_output(self, output: Output): + def add_output(self, output: Output, overwrite: bool = True): output.parent = self - self.outputs[output.id] = output + if output.id in self.outputs: + logger.warning(f"Output with id {output.id} already exists, overwrite is set to {overwrite}.") + if overwrite: + self.outputs[output.id] = output + else: + self.outputs[output.id] = output @property def num_outputs(self) -> int: diff --git a/eregion/pipeline/engine.py b/eregion/pipeline/engine.py index f499ee1..29e576c 100644 --- a/eregion/pipeline/engine.py +++ b/eregion/pipeline/engine.py @@ -1,131 +1,415 @@ from typing import Iterator -from eregion.tasks.task import TaskResult -from eregion.configs.config import PipelineConfig +from copy import deepcopy +import graphlib + +from tasks.task import TaskResult +from configs.config import PipelineConfig +from utils.misc_utils import configure_logger +from utils.misc_utils import load_class from prefect import task, flow -import importlib -import logging +from prefect.futures import wait +from prefect.task_runners import ThreadPoolTaskRunner + +import concurrent.futures as cf +logger = configure_logger(__name__) class PipelineEngine: - def __init__(self, pipeline_config: str | dict): - self.pipeline_config = PipelineConfig(pipeline_config) - self.pipeline = self.build_flow() + """ + Engine to construct and manage execution of pipelines described in a yaml configuration. + + The engine reads a pipeline configuration (path or dict), builds task node objects for each + pipeline, computes dependency-based execution orders (both pipeline-level and per-pipeline tasks), + and saves execution results. + + Parameters + ---------- + pipeline_config_input : str | dict + Path to a configuration file (e.g. YAML/JSON) or a dictionary containing pipeline + definitions that will be parsed by `PipelineConfig`. + + Attributes + ---------- + pipeline_config : PipelineConfig + Parsed and validated pipeline configuration object. + debug : bool + Flag controlling verbose logging; taken from configuration if present. + pipelines : dict + Mapping of pipeline name -> pipeline configuration augmented with `nodes_dict` that + maps task nodes to their runtime objects and metadata. + execution_orders : tuple + Tuple of (pipeline_order, node_orders) describing topological execution order: + - pipeline_order: ordered list/generations of pipeline names based on inter-pipeline deps + - node_orders: mapping pipeline_name -> ordered generations of node names + results : dict + Stores TaskResult objects keyed by node/task names produced during runs. + """ + + def __init__(self, pipeline_config_input: str | dict): + + self.pipeline_config = PipelineConfig(pipeline_config_input) + self.debug = self.pipeline_config.config.get("debug", False) + logger.info("Number of pipelines defined: {}".format(len(self.pipeline_config.config["pipelines"]))) + + # Build the pipeline nodes and note their dependencies + all_node_dependencies = {} + self.pipelines = {} + for pipeline_cfg in self.pipeline_config.config["pipelines"]: + nodes, node_dependencies = self.build_pipeline_nodes(pipeline_cfg) + pipeline_cfg["nodes_dict"] = nodes + self.pipelines[pipeline_cfg["name"]] = pipeline_cfg + all_node_dependencies.update(node_dependencies) + + self.execution_orders = self.build_execution_order(all_node_dependencies) + self.results = {} def update(self, new_config: str | dict): - self.pipeline_config = PipelineConfig(new_config) - self.pipeline = self.build_flow() + """ + Update the pipeline configuration and rebuild the pipelines and execution orders. + :param new_config: str or dict + New pipeline configuration as a path to a YAML/JSON file or a dictionary. + """ + self.__init__(pipeline_config_input=new_config) - @staticmethod - def load_task_class(path: str): - module, cls = path.rsplit(".", 1) - return getattr(importlib.import_module(module), cls) + def build_pipeline_nodes(self, pipeline_cfg: dict): + """ + Build the task nodes for a single pipeline based on its configuration. + - For each node, load the corresponding task class + - Extract the inputs (that are outputs of other nodes) for init and run arguments from the config + - Track the dependencies of each node based on the inputs and explicit depends_on field + :param pipeline_cfg: dict + Configuration dictionary for a single pipeline, containing at least the keys "name", "lazy", and "nodes". + Each node should have at least "name" and "task", and can optionally have "init", "run", and "depends_on" + fields. + :return: tuple + A tuple containing: + - nodes: dict mapping node names to their corresponding task instances, inputs, parameters + and upstream dependencies. + - node_dependencies: dict mapping node names to a set of their upstream dependencies + """ + node_dependencies = {} + nodes = {} + for node in pipeline_cfg["nodes"]: + eregion_task = load_class(node["task"]) + init_block = node.get("init", {}) + init_inputs = init_block.get("inputs", {}) + init_params = init_block.get("params", {}) + run_block = node.get("run", {}) + run_inputs = run_block.get("inputs", {}) + run_params = run_block.get("params", {}) + upstream = node.get("depends_on", []) + + # Ensure that node names in upstream list are named fully with pipeline name + for i, dep in enumerate(upstream): + if '.' not in dep: + upstream[i] = f"{pipeline_cfg['name']}.{dep}" + + # sanity check: if init_inputs and run_inputs are not empty, the corresponding dependencies must be listed in depends_on + # If not, log a warning and add them to the dependencies list + def check_inputs(inputs): + for arg_name, arg_ref in inputs.items(): + upnode_name, _, key = arg_ref.partition(".data.") + # Ensure that task_path (which should be a node name) is named fully with pipeline name + if '.' not in upnode_name: + logger.warning(f"Input '{arg_name}' for task '{node['name']}' references '{upnode_name}' " + f"without pipeline name. Assuming it is from the same pipeline and converting it " + f"to '{pipeline_cfg['name']}.{upnode_name}'.") + upnode_name = f"{pipeline_cfg['name']}.{upnode_name}" + inputs[arg_name] = f"{upnode_name}.data.{key}" + if upnode_name not in upstream: + logger.warning(f"Input '{arg_name}' for task '{node['name']}' references '{upnode_name}' " + f"which is not listed in 'depends_on'. Adding it to the dependencies.") + upstream.append(upnode_name) + + check_inputs(init_inputs) + check_inputs(run_inputs) + + node_dependencies[f"{pipeline_cfg['name']}.{node['name']}"] = set(upstream) + node_dict = {"task": eregion_task, "init_inputs": init_inputs, "init_params": init_params, + "run_inputs": run_inputs, "run_params": run_params, "upstream": upstream, + "params": {'init': init_params | init_inputs, 'run': run_params | run_inputs}} + nodes[f"{pipeline_cfg['name']}.{node['name']}"] = node_dict + + return nodes, node_dependencies @staticmethod - def resolve_inputs(input_spec: dict, results: dict) -> dict: - resolved = {} - for arg_name, ref in input_spec.items(): - task_name, _, key = ref.partition(".data.") - if task_name not in results: - raise ValueError(f"Upstream task '{task_name}' not found") - resolved[arg_name] = results[task_name].data[key] - return resolved + def build_execution_order(node_dependencies: dict): + # First determine the execution order of the pipelines based on their inter-pipeline dependencies + pipe_names = set([name.split('.')[0] for name in node_dependencies.keys()]) # Get unique pipeline names + pipe_dependencies = {pipe: set() for pipe in pipe_names} + + # For each node, check which pipelines its dependencies belong to and add those to the node's pipeline dependencies set + for node, node_dep in node_dependencies.items(): + pipe_name = node.split('.')[0] + dep_pipe_names = set([dep.split('.')[0] for dep in node_dep]) + pipe_dependencies[pipe_name].update(dep_pipe_names - {pipe_name}) + pipe_order = get_dag_order(pipe_dependencies) # Get DAG order of pipelines + logger.info(f"Pipeline order: {pipe_order}") + + # Then determine the execution order of the tasks within each pipeline + node_orders = {} + for pnames in pipe_order: + for pipe_name in pnames: + # Get the subset of nodes that belong to this pipeline and their dependencies + # All dependencies are kept as is to get the full ordering, the correct handling of execution is done later + pipe_nodes = {node: deps for node, deps in node_dependencies.items() if node.startswith(pipe_name + '.')} + node_order = get_dag_order(pipe_nodes) # Get DAG order of nodes within this pipeline + node_orders[pipe_name] = node_order + logger.info(f"Pipeline {pipe_name} order: {node_order}") + + return pipe_order, node_orders @staticmethod - def execute_task( - task_cls, - name: str, - init_params: dict, - run_inputs: dict, - run_params: dict, - upstream: list[str], - lazy: bool = False, - debug: bool = False, - ) -> TaskResult | Iterator[TaskResult]: - - eregion_task = task_cls(name=name, **init_params) - eregion_task.set_logging_level(logging.DEBUG if debug else logging.INFO) - - merged_params = { - "init": init_params, - "run": run_params, - "lazy": lazy - } + def execute_task(node_name, node_dict, upstream_results, lazy=False) -> TaskResult: + """ + Execute a single task given its node_dict containing the task instance, inputs, and parameters. + - Resolve the inputs using self.resolve_inputs + - Call the task's run or lazy_run method with the resolved inputs and parameters + - Return a TaskResult containing the task name, output data, upstream dependencies, and parameters + - If lazy is True, call the task's lazy_run method and return an iterator wrapped in TaskResult. + The caller is responsible for iterating through the results and feeding them downstream. + :param node_name: The name of the node to execute. + :param node_dict: dict + A dictionary containing the task instance, inputs, parameters, and upstream dependencies. + :param upstream_results: dict + A dictionary containing the results of previously executed tasks that are dependencies, used for resolving inputs. + :param lazy: bool + Whether to execute the task in lazy mode. If True, the task must have a lazy_run method that returns an iterator. + :return: TaskResult + A TaskResult containing the task name, output data (or iterator if lazy), upstream dependencies, and parameters. + """ + + def resolve_inputs(input_spec: dict) -> dict: + resolved = {} + for arg_name, ref in input_spec.items(): + task_path, _, key = ref.partition(".data.") + if task_path not in upstream_results: + raise ValueError(f"Upstream task '{task_path}' not found in results") + resolved[arg_name] = upstream_results[task_path].data[key] + return resolved + + init_inputs = resolve_inputs(node_dict["init_inputs"]) + init_params = node_dict["init_params"] + run_inputs = resolve_inputs(node_dict["run_inputs"]) + run_params = node_dict["run_params"] + eregion_task = node_dict["task"](name=node_name, **init_params, **init_inputs) if lazy: if not hasattr(eregion_task, "lazy_run"): - raise TypeError(f"Task '{name}' does not support lazy execution") - - def generator(): - for item in eregion_task.lazy_run(**run_inputs, **run_params): - if not isinstance(item, dict): - raise TypeError("lazy_run must yield dicts") - yield TaskResult( - task_name=name, - data=item, - params=merged_params, - upstream=upstream, - ) - return generator() - - # eager execution - result = eregion_task.run(**run_inputs, **run_params) - if not isinstance(result, dict): - raise TypeError("run() must return a dict") - return TaskResult( - task_name=name, - data=result, - params=merged_params, - upstream=upstream, - ) - - def build_flow(self): - cfg = self.pipeline_config.config - - @flow - def pipeline_flow(): - results = {} - - for node in cfg["pipeline"]: - name = node["name"] - task_cls = self.load_task_class(node["task"]) - - lazy = node.get("lazy", False) - init_params = node.get("init", {}) - - run_block = node.get("run", {}) - input_spec = run_block.get("inputs", {}) - run_params = run_block.get("params", {}) - - # upstream_results = { - # dep: results[dep] - # for dep in node.get("depends_on", []) - # } - upstream = node.get("depends_on", []) - - run_inputs = self.resolve_inputs(input_spec, results) - - @task(name=name) - def prefect_task(): - return self.execute_task( - task_cls=task_cls, - name=name, - init_params=init_params, - run_inputs=run_inputs, - run_params=run_params, - upstream=upstream, - lazy=lazy, - debug=cfg.get("debug", False), - ) - - results[name] = prefect_task() - - return results - - return pipeline_flow - - - ## Execute the pipeline - def run(self): - return self.pipeline() + raise ValueError(f"Task '{eregion_task.name}' does not have a 'lazy_run' method for lazy execution") + res = eregion_task.lazy_run(**run_inputs, **run_params) + else: + res = eregion_task.run(**run_inputs, **run_params) + return TaskResult(task_name=eregion_task.name, data=res, upstream=node_dict["upstream"], + params=node_dict["params"]) + + def execute_pipeline(self, pipe_name, node_order, nodes_dict, results): + """ + Execute a single pipeline given its name, execution order of its nodes, the nodes_dict containing task instances + , and the results dict containing results of previously executed tasks (both from this pipeline and other pipelines). + - Each node task is wrapped in a Prefect task instance that calls self.execute_task + - Nodes belonging to same generation in the execution order are executed in parallel using Prefect's task runner + - Only nodes belonging to the current pipeline are executed, nodes from other pipelines in the order that are + dependencies are checked for their results in the results dict and passed as upstream_results to the current nodes + :param pipe_name: str + Name of the pipeline being executed, used for logging and checking node ownership. + :param node_order: list of sets + Execution order of the nodes in this pipeline, as a list of generations (sets) of node names that can be executed in parallel. + :param nodes_dict: dict + Dictionary mapping node names to their corresponding task instances, inputs, parameters and upstream dependencies. + :param results: dict + Dictionary containing results, keys are node/task names, values are TaskResult objects. + In eager pipelines, same as self.results + In lazy pipelines, a temp results dict that is updated iteratively and merged into self.results after each iteration. + :return: dict + Updated results dictionary after adding results of all executed tasks from this pipeline. + """ + for node_names in node_order: + submitted, futures = [], [] + for node_name in node_names: + # For nodes belonging to other pipelines, check that their results are available + if node_name.split('.')[0] != pipe_name: + if node_name not in results: + raise ValueError(f"Upstream task '{node_name}' not found in results") + continue + + prefect_task = make_prefect_task(node_name, self.execute_task) # Wrap in Prefect task + upstream_results = {up: results[up] for up in nodes_dict[node_name]["upstream"]} + futures.append(prefect_task.submit(node_name=node_name, node_dict=nodes_dict[node_name], + upstream_results=upstream_results, lazy=False)) # Submit for parallel execution + submitted.append(node_name) + + wait(futures) # Wait for all tasks in this generation to complete before moving to the next generation + for node_name, future in zip(submitted, futures): + results[node_name] = future.result() + + return results + + def execute_eager_pipeline(self, pipe_name): + """ + Execute a single eager pipeline (non-lazy) given its name. + - This is a wrapper around self.execute_pipeline that retrieves the node order and nodes_dict for the given + pipeline name + - Pipeline is executed as a Prefect flow that calls self.execute_pipeline + - self.results is updated directly + :param pipe_name: str + Name of the pipeline being executed, used for retrieving node order and nodes_dict, and for logging. + :return: None, self.results is updated in place with the results of executing this pipeline. + """ + node_order = self.execution_orders[1][pipe_name] + nodes_dict = self.pipelines[pipe_name]["nodes_dict"] + pipe_prefect_flow = make_prefect_flow(pipe_name, self.execute_pipeline) + self.results = pipe_prefect_flow(pipe_name, node_order, nodes_dict, self.results) + return self.results + + def execute_lazy_pipeline(self, pipe_name): + """ + Execute a single lazy pipeline given its name. + - Lazy pipelines must have a single source node in the first generation of the node order that returns an + iterator when executed with self.execute_task with lazy=True + - Execute the rest of the pipeline (as a Prefect flow) for each item in the source node's iterator, + feeding the result of each iteration as input, storing iteration results in a temp results dict. + - After each iteration, merge the temp results into self.results by combining results of the same nodes across iterations + :param pipe_name: str + Name of the pipeline being executed, used for retrieving node order and nodes_dict, and for logging. + :return: Generator that yields self.results after each iteration of the source node. + """ + node_order = self.execution_orders[1][pipe_name] + nodes_dict = self.pipelines[pipe_name]["nodes_dict"] + source_node = self.pipelines[pipe_name]["source"] + source_node = f"{pipe_name}.{source_node}" if '.' not in source_node else source_node + + # Sanity check: source_node must be in the first generation of the node order + if source_node not in node_order[0]: + raise ValueError(f"Source node '{source_node}' for lazy pipeline '{pipe_name}' must be in the " + f"first generation of the node order") + # Check that rest of the nodes in first generation have been executed and their results are available + for node_name in node_order[0]: + if node_name.split('.')[0] != pipe_name: + if node_name not in self.results: + raise ValueError(f"Upstream task '{node_name}' not found in results") + + # For lazy pipelines, we need to execute the source node using self.execute_task + # that returns an iterator wrapped in TaskResult, and then feed it downstream. + temp_results = deepcopy(self.results) # Use a temporary results dict to store the iterating results + source_res = self.execute_task(nodes_dict[source_node], temp_results, lazy=True) + source_res_iterator = source_res.data + if not isinstance(source_res_iterator, Iterator): + raise ValueError(f"Source node '{source_node}' for lazy pipeline '{pipe_name}' did not return an iterator") + + # Iterate through the source result and feed it downstream to the rest of the nodes in the pipeline according + # to the execution order + for item in source_res_iterator: + temp_results[source_node] = TaskResult(task_name=source_res.task_name, upstream=source_res.upstream, + params=source_res.params, data=item) + pipe_prefect_flow = make_prefect_flow(pipe_name, self.execute_pipeline) + temp_results = pipe_prefect_flow(pipe_name=pipe_name, node_order=node_order[1:], nodes_dict=nodes_dict, + temp_results=temp_results) + + # Merge the temp_results into self.results (a bit convoluted as we need to add this iterations result + # for all nodes into the main results dict) + for node_name in nodes_dict.keys(): + if node_name not in self.results: + self.results[node_name] = temp_results[node_name] + else: + self.results[node_name] = self.results[node_name].combine(temp_results[node_name]) + yield self.results + + ## Execute the pipelines + def run(self, max_workers=4): + pipe_order = self.execution_orders[0] + for pipe_names in pipe_order: + with cf.ThreadPoolExecutor(max_workers=max_workers) as executor: + future_map = {} + # Submit each pipeline in this generation to the executor + for pipe_name in pipe_names: + if self.pipelines[pipe_name]["lazy"]: + # Run the lazy pipeline to completion inside the worker thread by consuming its generator + def _run_lazy(pn): + last_result = None + for iteration_result in self.execute_lazy_pipeline(pn): + last_result = iteration_result + return last_result + + fut = executor.submit(_run_lazy, pipe_name) + else: + # Eager pipeline updates self.results in place + fut = executor.submit(self.execute_eager_pipeline, pipe_name) + + future_map[fut] = pipe_name + + # Wait for all pipelines in this generation to finish + done, _ = cf.wait(future_map.keys(), return_when=cf.ALL_COMPLETED) + + # Check for exceptions and re-raise after logging + for fut in done: + pipe_name = future_map[fut] + try: + fut.result() + logger.info(f"Pipeline '{pipe_name}' complete") + except Exception as exc: + logger.exception(f"Pipeline '{pipe_name}' failed: {exc}") + raise + +################################# Helper functions ################################# +def get_dag_order(deps): + """ + Get the execution order of tasks based on their dependencies using topological sorting. + :param deps: dict + A dictionary where keys are task names and values are sets of task names that the key task depends on + :return: list of sets + A list of generations of task names that can be executed in parallel, ordered by their dependencies + """ + ts = graphlib.TopologicalSorter(deps) + try: + # check for cycles + ts.prepare() + except graphlib.CycleError as e: + raise ValueError(f"Cyclic dependency detected in pipeline: {e}") + # Get generations of tasks that can be executed in parallel + execution_order = [] + while ts.is_active(): + generation = ts.get_ready() + execution_order.append(generation) + ts.done(*generation) + return execution_order + +def make_prefect_task(task_name, task_func, retries=3, retry_delay_seconds=3): + """ + Wrap a task function in a Prefect task with the given name and retry logic. + :param task_name: str + Name of the Prefect task, used for logging and tracking in Prefect. + :param task_func: function + The function that implements the task logic + :param retries: int + Number of times to retry the task in case of failure before giving up. + :param retry_delay_seconds: int + Number of seconds to wait between retries. + :return: Prefect task + A Prefect task that wraps the given task function with the specified name and retry logic. + """ + @task(name=task_name, retries=retries, retry_delay_seconds=retry_delay_seconds) + def prefect_task(*args, **kwargs): + return task_func(*args, **kwargs) + return prefect_task +def make_prefect_flow(flow_name, flow_func, task_runner=ThreadPoolTaskRunner, max_workers=4): + """ + Wrap a flow function in a Prefect flow with the given name and task runner for parallel execution. + :param flow_name: str + Name of the Prefect flow, used for logging and tracking in Prefect. + :param flow_func: function + The function that implements the flow logic. + :param task_runner: Prefect task runner class + The Prefect task runner class to use for parallel execution of tasks within the flow + (e.g., ThreadPoolTaskRunner or ProcessPoolTaskRunner). + :param max_workers: int + The maximum number of worker threads/processes to use for parallel execution of tasks within the flow + :return: Prefect flow + A Prefect flow that wraps the given flow function with the specified name and task runner for parallel execution. + """ + @flow(name=flow_name, task_runner=task_runner(max_workers=max_workers)) + def prefect_flow(*args, **kwargs): + return flow_func(*args, **kwargs) + return prefect_flow \ No newline at end of file diff --git a/eregion/tasks/calibration.py b/eregion/tasks/calibration.py index 30eb394..b1a3b2b 100644 --- a/eregion/tasks/calibration.py +++ b/eregion/tasks/calibration.py @@ -1,9 +1,9 @@ import numpy as np -from eregion.tasks.task import Task -from eregion.datamodels.image import DetImage -from eregion.datamodels.image_utils import ensure_dataarray -from eregion.core.image_operations import median_combine +from tasks.task import Task +from datamodels.image import DetImage +from utils.image_utils import ensure_dataarray +from utils.misc_utils import load_class # Task to generate master bias @@ -22,6 +22,11 @@ def run(self, bias_images: list[DetImage]) -> dict[str,list[DetImage]]: # Check that bias_images are DetImage instances if not isinstance(bias_images, list) or not all(isinstance(img, DetImage) for img in bias_images): raise ValueError("bias_images must be a list of DetImage instances.") + # Verify that all input images are of 'bias' image_type + # TODO: Add a init kwarg to specify the expected image_type of input bias frames, and check against that instead of hardcoding 'bias' + for img in bias_images: + if 'bias' not in img.image_type.lower(): + raise ValueError(f"All input images must have image_type 'bias'. Found '{img.image_type}' in {img.meta.filename}.") # Group bias images by detector name bias_dict = {} @@ -58,12 +63,9 @@ def _create_masterbias(self, biases: list[np.ndarray], method='median')-> np.nda :return: master_bias: numpy array The generated master bias frame. """ - if method == 'median': - return median_combine(biases) - else: - # print available methods - self.print_methods() - raise NotImplementedError + self.set_method(method) + method_cls = load_class(self.methods[method]) + return method_cls(biases) @property def methods(self): @@ -73,10 +75,9 @@ def methods(self): Dictionary with method names as keys and function signatures as values. """ return { - 'median': 'core.image_operations.median_combine(images: list[np.ndarray]) -> np.ndarray', + 'median': 'core.image_operations.median_combine', } - def __call__(self, biases: list[np.ndarray], method='median') -> np.ndarray: return self._create_masterbias(biases, method=method) diff --git a/eregion/tasks/imagegen.py b/eregion/tasks/imagegen.py index 4d0eeb3..91a0a01 100644 --- a/eregion/tasks/imagegen.py +++ b/eregion/tasks/imagegen.py @@ -2,14 +2,14 @@ import glob2 import time import importlib -from typing import Iterator, Generator, Callable, Optional, Iterable, Any +from typing import Iterator, Generator, Callable, Iterable from joblib import Parallel, delayed -from eregion.datamodels.image import * -from eregion.datamodels.image_utils import ensure_dataarray -from eregion.configs.config import DetectorConfig -from eregion.tasks.task import LazyTask -from eregion.core.io_utils import load_image_fits, parse_list_of_files, guess_image_type_from_header +from datamodels.image import * +from utils.image_utils import ensure_dataarray +from configs.config import DetectorConfig +from tasks.task import LazyTask +from utils.io_utils import load_image_fits, parse_list_of_files, guess_image_type_from_header ## Classes to handle image generation from configuration files @@ -275,36 +275,4 @@ def lazy_run(self, image_type = 'unknown' images = self._build_image_objects(input_data_array, image_type, filename=None) images_batch[image_type] = images_batch.get(image_type, []) + images - yield images_batch - - def run(self, - input_source: str | list[str] | Iterable[np.ndarray], - identifier_func: Optional[str | Callable[..., str]] = None, - identifier_kwargs: Optional[dict[str, Any]] = None, - fitsloader_func: Optional[str | Callable[..., str]] = None, - fitsloader_kwargs: Optional[dict[str, Any]] = None, - require_data: bool = True, - ) -> dict[str, list]: - """ - Eager run method to generate image objects from input source. - :param input_source: str or list of str or Iterable of np.ndarray - Input source can be a path to FITS files (file, directory, glob pattern), - a list of FITS file paths, or an iterable of numpy arrays. - :param identifier_func: str (from pipeline config) or Callable (if using directly), optional - Custom image type identification function. - :param identifier_kwargs: dict, optional - Additional keyword arguments for the identifier function. - :param fitsloader_func: str (from pipeline config) or Callable (if using directly), optional - Custom FITS loading function. - :param fitsloader_kwargs: dict, optional - Additional keyword arguments for the FITS loader function. - :param require_data: bool - If True, raises an error if no files are found in the input source. - :return: {"images": list of DetImage} - List of DetImage objects stored under the key 'images' in the returned dict. - """ - all_images = {} - for batch in self.lazy_run(input_source, identifier_func, identifier_kwargs, fitsloader_func, fitsloader_kwargs, require_data): - for image_type, images in batch.items(): - all_images[image_type] = all_images.get(image_type, []) + images - return all_images \ No newline at end of file + yield images_batch \ No newline at end of file diff --git a/eregion/tasks/preprocessing.py b/eregion/tasks/preprocessing.py index 064963e..7d66795 100644 --- a/eregion/tasks/preprocessing.py +++ b/eregion/tasks/preprocessing.py @@ -1,116 +1,154 @@ from copy import deepcopy - +from abc import abstractmethod from typing import Optional, Iterable, Iterator, Any import numpy as np import xarray as xr -from eregion.tasks.task import LazyTask -from eregion.datamodels.image import DetImage, Output -from eregion.datamodels.image_utils import ensure_dataarray -from eregion.core.image_operations import subtract_from_image, sigma_clip_image +from tasks.task import LazyTask +from datamodels.image import DetImage, Output +from utils.image_utils import ensure_dataarray, ensure_numpy +from utils.misc_utils import load_class +from core.image_operations import subtract_from_image, sigma_clip_image from joblib import Parallel, delayed +########### BasePreprocessingTask ########### +class BasePreprocessingTask(LazyTask): + """ + Base class for preproc tasks in this file to avoid repetitive code for lazy_run as most apply some processing + to each image independently and can benefit from parallelization at the image level. Each subclass just needs to + implement _process_single_image which defines how to process a single DetImage, and lazy_run will handle batching + and parallel execution. + """ + def __init__(self, name: Optional[str] = None, **kwargs): + super().__init__(name=name, **kwargs) + + @abstractmethod + def _process_single_image(self, img: DetImage) -> DetImage: + """ + Process a single DetImage and return the processed DetImage. + :param img: DetImage + The image to be processed. + :return: DetImage + The processed image. + """ + pass + + def lazy_run( + self, + images: Iterable[DetImage], + batch_size: int = 1 + ): + """ + Lazy execution: yield processed batches as they complete. + - images: an iterable/stream of DetImage + - batch_size: number of images to process per batch + """ + + def process_batch(): + output = Parallel(n_jobs=self.n_jobs)(delayed(self._process_single_image)(i) for i in batch) + outdict = {} + for out in output: + outdict[out.image_type] = outdict.get(out.image_type, []) + [out] + yield outdict + + batch: list[DetImage] = [] + for img in images: + batch.append(img) + if len(batch) >= batch_size: + yield from process_batch() + batch = [] + if batch: + yield from process_batch() + + ########### Bias Subtraction Task ########### -class BiasSubtraction(LazyTask): +class BiasSubtraction(BasePreprocessingTask): required_keys = [] def __init__(self, - master_bias: DetImage | np.ndarray | xr.DataArray, + master_biases: list[DetImage], name: Optional[str] = "subtract_bias", **kwargs - ): + ): """ - Initialize the BiasSubtraction task. kwargs must include 'master_bias' which can be a DetImage or numpy array. - :param master_bias: DetImage or np.ndarray or xr.DataArray - The master bias frame to subtract from science images. + Initialize the BiasSubtraction task. + :param master_biases: list[DetImage] + The master bias frames to subtract from science images. It can be a list of DetImage objects in case of mosaics :param name: Optional[str] """ - super().__init__(name=name, **kwargs) - if isinstance(master_bias, DetImage): - self.master_bias_data = master_bias.data - elif isinstance(master_bias, np.ndarray) or isinstance(master_bias, xr.DataArray): - self.master_bias_data = ensure_dataarray(master_bias) - else: - raise ValueError("master_bias must be either a DetImage, a numpy array, or an xarray DataArray.") - self.meta.update({"master_bias": master_bias}) - + if not isinstance(master_biases, list) or not all(isinstance(img, DetImage) for img in master_biases): + raise ValueError("master_biases must be a list of DetImage instances.") + # Verify image_type of master_biases is 'master_bias' + if not all(img.image_type.lower()=='master_bias' for img in master_biases): + raise ValueError("All master_biases must have image_type 'master_bias'.") + self.master_biases = master_biases def _process_single_image(self, img: DetImage) -> DetImage: """ - Process a single DetImage by subtracting the master bias. + Process a single DetImage by subtracting the corresponding master bias. :param img: DetImage The science image to be bias-subtracted. :return: DetImage The bias-subtracted science image. """ - # check that dimensions match - if img.data.shape != self.master_bias_data.shape: - raise ValueError(f"Image shape {img.data.shape} does not match master bias shape {self.master_bias_data.shape}.") + # Load the appropriate master bias for this image based on metadata (e.g., detector name) + master_bias = None + for mb in self.master_biases: + if 'name' not in mb.meta.keys() or 'name' not in img.meta.keys(): + raise ValueError(f"Either master bias or input DetImage is missing 'name' in metadata. Cannot match master bias to image.") + if mb.meta.name == img.meta.name: + master_bias = mb + break + if master_bias is None: + raise ValueError(f"No matching master bias found for DetImage with name '{img.meta['name']}'.") + # initialize a new DetImage for the bias-subtracted result as a copy of the input image bias_subtracted = deepcopy(img) - bias_subtracted.data = ensure_dataarray(img.data - self.master_bias_data) + # Update its data, meta + bias_subtracted.data = ensure_dataarray(self.subtract(img.data, master_bias.data)) bias_subtracted.meta["bias_subtracted"] = True - bias_subtracted.image_type = f'bias_sub_{img.image_type}' return bias_subtracted - - def lazy_run( - self, - images: Iterable[DetImage], - batch_size: int = 1, - ) -> Iterator[list[DetImage]]: - """ - Lazy execution: yield processed batches as they complete. - - images: an iterable/stream of DetImage - - batch_size: number of images to process per batch - """ - - batch: list[DetImage] = [] - for img in images: - batch.append(img) - if len(batch) >= batch_size: - yield Parallel(n_jobs=self.n_jobs)(delayed(self._process_single_image)(i) for i in batch) - batch = [] - - if batch: - yield Parallel(n_jobs=self.n_jobs)(delayed(self._process_single_image)(i) for i in batch) - - - def run(self, images: list[DetImage]) -> list[DetImage]: + def lazy_run(self, images: Iterable[DetImage], batch_size: int = 1) -> Iterator[dict[str, list[DetImage]]]: """ - Subtract the master bias from the given images. - :param images: list of DetImage - List of science images to be bias-subtracted. - :return: list of DetImage - List of bias-subtracted science images. + Inherit the lazy_run from BasePreprocessingTask which will handle batching and parallel execution of _process_single_image. + :param images: Iterable of DetImage objects to be processed. + :param batch_size: Number of images to process in each batch. Default is 1 (process images one at a time). + :return: An iterator that yields dictionaries with image_type as keys and lists of processed DetImage objects as values. """ - ## call lazy_run and collect all results - results = [] - for batch in self.lazy_run(images, batch_size=len(images)): - results.extend(batch) - return results + return super().lazy_run(images, batch_size) - def __call__(self, image: np.ndarray) -> np.ndarray: + @staticmethod + def subtract(image: np.ndarray | xr.DataArray, master_bias: np.ndarray | xr.DataArray) -> np.ndarray: """ Convenience non-Prefect path for raw arrays. """ - return image - self.master_bias_data.values + if master_bias.shape != image.shape: + raise ValueError( + f"Master bias array shape {master_bias.shape} does not match image shape {image.shape}.") + bias_subtracted, _ = subtract_from_image(ensure_numpy(image), ensure_numpy(master_bias)) + return bias_subtracted + def __call__(self, image, master_bias): + return self.subtract(image, master_bias) -########### Scan Subtraction Task ########### -class ScanSubtraction(LazyTask): - required_keys = ["which_scan"] - def __init__(self, name: Optional[str] = None, **kwargs): +########### Scan Subtraction Task ########### +class ScanSubtraction(BasePreprocessingTask): + def __init__(self, + which_scan: str, + name: Optional[str] = None, + **kwargs + ): """ - Initialize the ScanSubtraction task. kwargs must include 'which_scan' indicating which scan to subtract. + Initialize the ScanSubtraction task + :param which_scan: str, required + One of 'serial_prescan', 'serial_overscan', 'parallel_prescan', 'parallel_overscan'. :param name: Optional[str] :param kwargs: - which_scan: str, required - One of 'serial_prescan', 'serial_overscan', 'parallel_prescan', 'parallel_overscan'. method: str, optional Method to use for subtraction. Default is 'simple_median'. skip_rows: int, optional @@ -119,35 +157,32 @@ def __init__(self, name: Optional[str] = None, **kwargs): Number of columns to skip from the start of the scan region. Default is 0. """ super().__init__(name=name, **kwargs) - self.which_scan = kwargs["which_scan"] + self.which_scan = which_scan.lower() if self.which_scan not in ["serial_prescan", "serial_overscan", "parallel_prescan", "parallel_overscan"]: raise ValueError(f"Invalid which_scan value: {self.which_scan}") - self.method = kwargs.get("method", "simple_median") - if self.method not in self.methods: - self.print_methods() - raise NotImplementedError(f"Method {self.method} not implemented for ScanSubtraction.") - self.method = globals()[self.method] - + if self.method is None: + self.logger.warning("No method specified for scan subtraction. Defaulting to 'simple_median'.") + self.set_method("simple_median") def _subtract_scan_per_output(self, output: Output, skip_rows: int = 0, skip_cols: int = 0): axis, scan = self.which_scan.split("_") getfunc = getattr(output, "get_" + scan) scan_data = getfunc(kind=axis) # xr.DataArray - trimmed_scan_data = scan_data.isel({'y': slice(skip_rows, None), 'x': slice(skip_cols, None)}) + trimmed_scan_data = scan_data.isel({'y': slice(skip_rows, None, None), 'x': slice(skip_cols, None, None)}) - axisint = 1 if axis == "serial" else 0 + medaxis = getattr(output, axis+"_axis") + axisint = 0 if medaxis == 'y' else 1 subtracted_scan, subtract_value = subtract_from_image( image=output.data.values, subtract_object=trimmed_scan_data.values, - method=self.method, + method=load_class(self.method), axis=axisint ) return subtracted_scan, subtract_value - def _process_single_image(self, image: DetImage) -> DetImage: """ Process a single DetImage by subtracting the specified scan from each output. @@ -169,44 +204,16 @@ def _process_single_image(self, image: DetImage) -> DetImage: setattr(output, f"{self.which_scan}_median", subtract_value) new_image.meta[f"{self.which_scan}_subtracted"] = True - new_image.image_type = f'scan_sub_{image.image_type}' return new_image - - def lazy_run( - self, - images: Iterable[DetImage], - batch_size: int = 1, - ) -> Iterator[list[DetImage]]: + def lazy_run(self, images: Iterable[DetImage], batch_size: int = 1) -> Iterator[dict[str, list[DetImage]]]: """ - Lazy execution: yield processed batches as they complete. - - images: an iterable/stream of DetImage - - batch_size: number of images to process per batch + Inherit the lazy_run from BasePreprocessingTask which will handle batching and parallel execution of _process_single_image. + :param images: Iterable of DetImage objects to be processed. + :param batch_size: Number of images to process in each batch. Default is 1 (process images one at a time). + :return: An iterator that yields dictionaries with image_type as keys and lists of processed DetImage objects as values. """ - batch: list[DetImage] = [] - for img in images: - batch.append(img) - if len(batch) >= batch_size: - yield Parallel(n_jobs=self.n_jobs)(delayed(self._process_single_image)(i) for i in batch) - batch = [] - - if batch: - yield Parallel(n_jobs=self.n_jobs)(delayed(self._process_single_image)(i) for i in batch) - - - def run(self, images: list[DetImage]) -> list[DetImage]: - """ - Subtract the specified scan from the given images. - :param images: list of DetImage - List of images to be processed. - :return: list of DetImage - List of processed images with scans subtracted. - """ - ## call lazy_run and collect all results - results = [] - for batch in self.lazy_run(images, batch_size=len(images)): - results.extend(batch) - return results + return super().lazy_run(images, batch_size) @property def methods(self): @@ -216,55 +223,68 @@ def methods(self): Dictionary with method names as keys and function signatures as values. """ return { - 'simple_median': 'core.image_operations.simple_median(data: np.ndarray) -> float[Any]', - 'median_by_axis': 'core.image_operations.median_by_axis(data: np.ndarray, axis: int) -> np.ndarray', - 'simple_mean': 'core.image_operations.simple_mean(data: np.ndarray) -> float[Any]', + 'simple_median': 'core.image_operations.simple_median', + 'median_by_axis': 'core.image_operations.median_by_axis', + 'simple_mean': 'core.image_operations.simple_mean', } + ########### Cosmic Ray/Bad Pixel Masking ########### -class SigmaClipMasking(LazyTask): +class SigmaClipMasking(BasePreprocessingTask): required_keys = [] - def __init__(self, name: Optional[str] = "sigma_clip_masking", sigma_clip_args: dict[str, Any] = None, **kwargs): + def __init__(self, + name: Optional[str] = "sigma_clip_masking", + sigma_clip_args: Optional[dict[str, Any]] = None, + **kwargs + ): """ Initialize the SigmaClipMasking task. :param name: Optional[str] - :param sigma_clip_args: Dict[str, Any] + :param sigma_clip_args: Optional[dict[str, Any]] Arguments to pass to astropy.stats.sigma_clip function. :param kwargs: """ super().__init__(name=name, **kwargs) self.sigma_clip_args = sigma_clip_args or {"sigma": 5.0, "axis": None, "masked": True, "copy": True, "grow": 10.0} - - def _sigma_clip_per_output(self, output: Output) -> xr.DataArray: + def _sigma_clip_per_output(self, output: Output) -> Output: """ - Apply sigma clipping to a single output to create a mask. + Apply sigma clipping to a single output to create a mask. Masks are saved as attributes of the output for later use. :param output: Output The output to be processed. - :return: xr.DataArray - The mask created from sigma clipping. + :return: Output + The output with an added mask attribute for sigma clipping. """ sigma_clip_args_overscan = deepcopy(self.sigma_clip_args) sigma_clip_args_overscan.pop("grow") + # clip serial overscan serial_overscan_data = output.get_overscan(kind="serial").values serial_overscan_clipped = sigma_clip_image(serial_overscan_data, **sigma_clip_args_overscan) + # clip parallel overscan parallel_overscan_data = output.get_overscan(kind="parallel").values parallel_overscan_clipped = sigma_clip_image(parallel_overscan_data, **sigma_clip_args_overscan) + # clip image data region - im_yslc = slice(None, output.parallel_overscan.start) - im_xslc = slice(None, output.serial_overscan.start) - image_data = output.data.isel(y=im_yslc, x=im_xslc).values + im_slc_parallel = slice(output.parallel_prescan.stop, output.parallel_overscan.start) + im_slc_serial = slice(output.serial_prescan.stop, output.serial_overscan.start) + im_slcs = {output.parallel_axis: im_slc_parallel, output.serial_axis: im_slc_serial} + image_data = output.data.isel(**im_slcs).values image_data_clipped = sigma_clip_image(image_data, **self.sigma_clip_args) + # combine masks - combined_clipped = np.ma.MaskedArray(output.data.values, mask=np.zeros_like(output.data.values)) - combined_clipped.mask[im_yslc, im_xslc] = image_data_clipped.mask - combined_clipped.mask[output.serial_overscan, :] |= serial_overscan_clipped.mask - combined_clipped.mask[:, output.parallel_overscan] |= parallel_overscan_clipped.mask - return xr.DataArray(combined_clipped, dims=["y", "x"]) + combined_mask = xr.zeros_like(output.data).astype(bool) + combined_mask.isel(**im_slcs).values |= image_data_clipped.mask + combined_mask.isel(**{output.serial_axis: output.serial_overscan}).values |= serial_overscan_clipped.mask + combined_mask.isel(**{output.parallel_axis: output.parallel_overscan}).values |= parallel_overscan_clipped.mask + # Set mask as an attribute of output + if not hasattr(output, 'masks'): + output.masks = {} + output.masks['sigma_clip_mask'] = combined_mask + return output def _process_single_image(self, img: DetImage) -> DetImage: """ @@ -278,47 +298,20 @@ def _process_single_image(self, img: DetImage) -> DetImage: results = Parallel(n_jobs=self.n_jobs)( delayed(self._sigma_clip_per_output)(output) for output in new_image.outputs.values() ) - for output, clipped_data in zip(new_image.outputs.values(), results): - output.set_data_in_parent(clipped_data) + for new_output in results: + new_image.add_output(new_output, overwrite=True) + new_image.meta["bad_pixel_masked"] = True - new_image.image_type = f'bpm_{img.image_type}' return new_image - - def lazy_run( - self, - images: Iterable[DetImage], - batch_size: int = 1, - ) -> Iterator[list[DetImage]]: - """ - Lazy execution: yield processed batches as they complete. - - images: an iterable/stream of DetImage - - batch_size: number of images to process per batch - """ - batch: list[DetImage] = [] - for img in images: - batch.append(img) - if len(batch) >= batch_size: - yield Parallel(n_jobs=self.n_jobs)(delayed(self._process_single_image)(i) for i in batch) - batch = [] - - if batch: - yield Parallel(n_jobs=self.n_jobs)(delayed(self._process_single_image)(i) for i in batch) - - - def run(self, images: list[DetImage]) -> list[DetImage]: + def lazy_run(self, images: Iterable[DetImage], batch_size: int = 1) -> Iterator[dict[str, list[DetImage]]]: """ - Apply sigma clipping to create bad pixel masks for the given images. - :param images: list of DetImage - List of images to be processed. - :return: list of DetImage - List of processed images with updated masks. + Inherit the lazy_run from BasePreprocessingTask which will handle batching and parallel execution of _process_single_image. + :param images: Iterable of DetImage objects to be processed. + :param batch_size: Number of images to process in each batch. Default is 1 (process images one at a time). + :return: An iterator that yields dictionaries with image_type as keys and lists of processed DetImage objects as values. """ - ## call lazy_run and collect all results - results = [] - for batch in self.lazy_run(images, batch_size=len(images)): - results.extend(batch) - return results + return super().lazy_run(images, batch_size) diff --git a/eregion/tasks/task.py b/eregion/tasks/task.py index c12d172..3af6d9e 100644 --- a/eregion/tasks/task.py +++ b/eregion/tasks/task.py @@ -3,11 +3,13 @@ from pydantic import BaseModel, Field from typing import Any - import os -import logging import multiprocessing from astropy.time import Time +import inspect + +from utils.io_utils import configure_logger +from utils.misc_utils import load_class # Base abstract class for tasks, should have a call method for direct execution and a run method for pipeline workflows class Task(ABC): @@ -16,18 +18,32 @@ class Task(ABC): def __init__(self, name=None, **kwargs): self.name = name or self.__class__.__name__ self.n_jobs = kwargs.get('n_jobs') or self.get_default_n_jobs() - self.logger = self.configure_logger() + self.logger = configure_logger(self.name) - self.meta = {} - self.meta.update(kwargs) for key in self.required_keys: if key not in kwargs: raise ValueError(f"Missing required keyword argument: {key}") + self.meta = {} + self.meta.update(kwargs) - @abstractmethod - def run(self, *args, **kwargs): - """Run the task in a pipeline workflow.""" - pass + self._method = None + if 'method' in self.meta: + self.set_method(self.meta['method']) + + def set_method(self, method): + self._method = method + self.verify_method() + + def verify_method(self): + if self._method not in self.methods: + self.logger.warning(f"Supplied method '{self._method}' is not implemented for task {self.name}.") + self.print_methods() + raise NotImplementedError + + @property + def method(self): + """Return the currently selected method for this task.""" + return self.methods[self._method] if self._method else None @property def methods(self): @@ -42,25 +58,20 @@ def print_methods(self): """ Print a list available methods for this task and their function signatures. """ - print(f"Available methods for {self.name}:") - for method, signature in self.methods.items(): - print(f"- {method}: {signature}") + self.logger.info(f"Available methods for {self.name}:") + for method, func_path in self.methods.items(): + sig = inspect.signature(load_class(func_path)) + self.logger.info(f"- {method}: {func_path}, {sig}") + + @abstractmethod + def run(self, *args, **kwargs) -> dict[str, Any]: + """Run the task. Output should be a dict with string keys and any type of resulting data.""" + pass def __call__(self, *args, **kwargs): """Directly execute the task.""" return self.run(*args, **kwargs) - def configure_logger(self): - """ - Configure the logger for the task. - """ - logger = logging.getLogger(self.name) - handler = logging.StreamHandler() - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging.INFO) - return logger def set_logging_level(self, level: int): """ @@ -105,14 +116,25 @@ def __init__(self, name=None, watch_mode=False, poll_interval=10, **kwargs): self.poll_interval = poll_interval @abstractmethod - def lazy_run(self, *args, **kwargs) -> Iterator: - """Run the task lazily, yielding results.""" + def lazy_run(self, *args, **kwargs) -> Iterator[dict[str, Any]]: + """Run the task lazily, yielding results (which should be in dict format).""" pass - @abstractmethod - def run(self, *args, **kwargs): + def run(self, *args, **kwargs) -> dict[str, Any]: """Default run executes the lazy_run and collects all results.""" - return list(self.lazy_run(*args, **kwargs)) + res_dict = {} + for batch in self.lazy_run(*args, **kwargs): + for key, value in batch.items(): + if key in res_dict: + # If the key already exists, we merge the values. Extend lists, convert everything else into lists and extend + res_dict[key] = [res_dict[key]] if not isinstance(res_dict[key], list) else res_dict[key] + value_list = [value] if not isinstance(value, list) else value + res_dict[key].extend(value_list) + else: + res_dict[key] = value + return res_dict + + class TaskResult(BaseModel): @@ -121,8 +143,43 @@ class TaskResult(BaseModel): params: dict[str, Any] = Field(default_factory=dict) upstream: list[str] = Field(default_factory=list) - timestamp: Time = Field(default_factory=lambda: Time.now()) + timestamp: Time | list[Time] = Field(default_factory=lambda: Time.now()) model_config = {"arbitrary_types_allowed": True} + def combine(self, other: 'TaskResult') -> 'TaskResult': + """ + Combine this TaskResult with another with the same task_name, merging their data and metadata. (useful for lazy iterations) + :param other: TaskResult + Another TaskResult to combine with. + :return: TaskResult + A new TaskResult containing the combined data and metadata. + """ + if self.task_name != other.task_name: + raise ValueError("Can only combine TaskResults with the same task_name.") + + combined_data = self.data.copy() + for key, value in other.data.items(): + if key in combined_data: + # If the key already exists, we merge the values. Extend lists, convert everything else into lists and extend + combined_data[key] = [combined_data[key]] if not isinstance(combined_data[key], list) else combined_data[key] + value_list = [value] if not isinstance(value, list) else value + combined_data[key].extend(value_list) + else: + combined_data[key] = value + + combined_params = {**self.params, **other.params} + combined_upstream = list(set(self.upstream + other.upstream)) + combined_timestamp = self.timestamp if isinstance(self.timestamp, list) else [self.timestamp] + other_timestamp = other.timestamp if isinstance(other.timestamp, list) else [other.timestamp] + combined_timestamp.extend(other_timestamp) + + return TaskResult( + task_name=self.task_name, + data=combined_data, + params=combined_params, + upstream=combined_upstream, + timestamp=combined_timestamp + ) + TaskResult.model_rebuild() diff --git a/eregion/utils/__init__.py b/eregion/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/eregion/datamodels/image_utils.py b/eregion/utils/image_utils.py similarity index 83% rename from eregion/datamodels/image_utils.py rename to eregion/utils/image_utils.py index ecbb9db..6507687 100644 --- a/eregion/datamodels/image_utils.py +++ b/eregion/utils/image_utils.py @@ -9,7 +9,6 @@ def ensure_dataarray(data: xr.DataArray | np.ndarray) -> xr.DataArray: :raises TypeError: if data is not xarray.DataArray or numpy.ndarray :return: xarray.DataArray with dims ('y','x') or ('y','x','t') """ - if isinstance(data, xr.DataArray): # Ensure dims ordering is ('y','x'); rename if unnamed if data.ndim == 2 and data.dims != ("y", "x"): @@ -50,6 +49,21 @@ def ensure_dataarray(data: xr.DataArray | np.ndarray) -> xr.DataArray: raise TypeError("data must be an xarray.DataArray, or numpy.ndarray") +def ensure_numpy(data: xr.DataArray | np.ndarray) -> np.ndarray: + """ + Coerce xarray.DataArray to numpy.ndarray + :type data: Union[xr.DataArray, np.ndarray] + :rtype: np.ndarray + :raises TypeError: if data is not xarray.DataArray or numpy.ndarray + :return: numpy.ndarray + """ + if isinstance(data, np.ndarray): + return data + elif isinstance(data, xr.DataArray): + return data.values + else: + raise TypeError("data must be an xarray.DataArray, or numpy.ndarray") + def slice_data(data: xr.DataArray, slicer: tuple[slice, ...]) -> xr.DataArray: """ Slice a 2D or 3D DataArray using ('y','x','t) positional slices. diff --git a/eregion/core/io_utils.py b/eregion/utils/io_utils.py similarity index 85% rename from eregion/core/io_utils.py rename to eregion/utils/io_utils.py index c81f2aa..2a70423 100644 --- a/eregion/core/io_utils.py +++ b/eregion/utils/io_utils.py @@ -4,10 +4,12 @@ from astropy.io import fits import shutil +from utils.misc_utils import configure_logger +logger = configure_logger(__name__) def search_directory_for_fits_files(directory: str) -> list[str]: fits_files = glob2.glob(os.path.join(directory, '**/*.fits*'), recursive=True) - print(f"Found {len(fits_files)} FITS files in directory {directory} and its sub-directories.") + logger.info(f"Found {len(fits_files)} FITS files in directory {directory} and its sub-directories.") return fits_files @@ -47,18 +49,18 @@ def parse_list_of_files(items: list[str]) -> list[str]: for item in items: # if item is a compressed archive, unpack it and search for fits files within if is_archive_file(item): - print(f"Found archive file {item}, unpacking and searching for FITS files within.") + logger.info(f"Found archive file {item}, unpacking and searching for FITS files within.") # unpack archive to parent directory and search for fits files within extraction_dir = os.path.join(os.path.dirname(item), str(os.path.basename(item).split('.')[0])) shutil.unpack_archive(item, extraction_dir) new_items.extend(search_directory_for_fits_files(extraction_dir)) elif is_fits_file(item): - print(f"Found FITS file {item}.") + logger.info(f"Found FITS file {item}.") new_items.append(item) elif is_directory(item): new_items.extend(search_directory_for_fits_files(item)) else: - print(f"Unrecognized item: {item}, not a FITS file, archive or directory, skipping.") + logger.info(f"Unrecognized item: {item}, not a FITS file, archive or directory, skipping.") return sorted(new_items) @@ -71,7 +73,7 @@ def load_image_fits(filename: str) -> tuple[list[Any], list[fits.Header]]: A tuple containing a list of data arrays (image/table/...) for each HDU and a list of corresponding FITS headers. """ if ".fits.fz" in filename: - print(f"Loading compressed FITS file {filename} using fits.open() with in-memory decompression enabled.") + logger.info(f"Loading compressed FITS file {filename} using fits.open() with in-memory decompression enabled.") input_data_array, input_headers = [], [] try: with fits.open(filename, decompress_in_memory=True) as hdulist: @@ -79,7 +81,7 @@ def load_image_fits(filename: str) -> tuple[list[Any], list[fits.Header]]: input_data_array.append(hdu.data) input_headers.append(hdu.header) except Exception as e: - print(f"Error loading FITS file {filename}: {e}") + logger.info(f"Error loading FITS file {filename}: {e}") return input_data_array, input_headers diff --git a/eregion/utils/misc_utils.py b/eregion/utils/misc_utils.py new file mode 100644 index 0000000..d37f50a --- /dev/null +++ b/eregion/utils/misc_utils.py @@ -0,0 +1,28 @@ +import logging +import importlib + + +def configure_logger(name): + """ + Configure a logger + """ + logger = logging.getLogger(name) + handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.setLevel(logging.INFO) + logger.propagate = False + return logger + + +def load_class(path: str): + """ + Dynamically load a class from a given path. Has to be in eregion package, or importable from the current environment. + :param path: str + The full path to the class, e.g. "module.submodule.ClassName". + :return: class + The loaded class call. + """ + module, cls = path.rsplit(".", 1) + return getattr(importlib.import_module(module), cls) \ No newline at end of file diff --git a/playground/test.ipynb b/playground/Example_image_loading.ipynb similarity index 98% rename from playground/test.ipynb rename to playground/Example_image_loading.ipynb index 035ba20..74bee1f 100644 --- a/playground/test.ipynb +++ b/playground/Example_image_loading.ipynb @@ -3,14 +3,61 @@ { "metadata": {}, "cell_type": "markdown", - "source": "## Image Creation Example with DEIMOS detector configuration", - "id": "ba52ca83c18b38fa" + "source": [ + "# Image loading and visualization\n", + "\n", + "This notebook demonstrates how to create, inspect, and visualize detector images using the `ImageCreator`\n", + "task and the `DetImage` datamodel. It shows typical workflows: loading raw FITS, inspecting outputs,\n", + "combining detectors into a focal plane, and creating a master bias. Each section contains a brief\n", + "explanation and the minimal example code you need to reproduce the step." + ], + "id": "67f63ebd0bd5e7d0" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-09T17:55:45.398348Z", + "start_time": "2026-03-09T17:55:45.391223Z" + } + }, + "cell_type": "markdown", + "source": [ + "Imports and setup\n", + " - Import only what is needed for the examples below.\n", + " - Ensure your `PYTHONPATH` includes the project root (or install the package) so imports resolve.\n", + " - Replace the example file paths with paths valid on your machine." + ], + "id": "d0ff2b3d37d8ae0c" + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "## Example 1: Create images from raw files\n", + "\n", + "Use `ImageCreator` task to convert raw FITS input into high-level `DetImage` objects.\n", + "\n", + "Initializing `ImageCreator` requires a path to a detector configuration file that defines the structure of the raw data and how to parse it.\n", + "- Pass a `detector_config` to `ImageCreator` (path to a YAML/JSON file or a Python `dict`) to describe detector geometry and per-output metadata.\n", + "- The detector config controls:\n", + " - mapping of data in FITS extensions to detector/outputs of a detector,\n", + " - per-output information and metadata depending on the detector type, e.g. prescan/overscan slices,\n", + " - focal-plane placement\n", + "\n", + "The `run` method of `ImageCreator` accepts the raw data input and produces a dictionary of `DetImage` objects.\n", + "- `input_source` accepts a single file, list of files, directory, or glob pattern, OR arrays from buffer\n", + "- Optionally provide `identifier_func` to control how files are classified (bias, object, flat, ...)\n", + "- Optionally provide `fitsloader_func` to control how raw FITS files are read-in and add custom preprocessing logic\n", + "- The return value is a dictionary keyed by image type (e.g. `'object'`, `'bias'`) whose values are lists\n", + " of `DetImage` objects." + ], + "id": "b0de6835f8677212" }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:42:00.764649Z", - "start_time": "2026-02-24T00:41:56.737750Z" + "end_time": "2026-03-10T20:30:58.806243Z", + "start_time": "2026-03-10T20:30:54.998768Z" } }, "cell_type": "code", @@ -18,7 +65,6 @@ "from eregion.tasks.imagegen import ImageCreator\n", "# Initialize the ImageCreator with the detector configuration.\n", "creator = ImageCreator(detector_config='../eregion/configs/detectors/deimos.yaml')\n", - "# Run the image creation process. The input path can be a single FITS file, a list of FITS files, a directory containing FITS files, or a glob pattern.\n", "res = creator.run(input_source='../data/deimos_raw/d0223_0107_cti.fits.gz')" ], "id": "5ae90c36955d4557", @@ -27,38 +73,32 @@ "name": "stderr", "output_type": "stream", "text": [ - "2026-02-23 16:41:57,242 - image_creator - INFO - Item ../data/deimos_raw/d0223_0107_cti.fits.gz is a regular path string\n", - "2026-02-23 16:41:57,243 - image_creator - INFO - Processing file ../data/deimos_raw/d0223_0107_cti.fits.gz\n" - ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Found FITS file ../data/deimos_raw/d0223_0107_cti.fits.gz.\n" - ] - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ + "2026-03-10 13:30:55,600 - DetectorConfig - INFO - Config loaded from file '../eregion/configs/detectors/deimos.yaml'\n", + "2026-03-10 13:30:55,600 - image_creator - INFO - Item ../data/deimos_raw/d0223_0107_cti.fits.gz is a regular path string\n", + "2026-03-10 13:30:55,601 - utils.io_utils - INFO - Found FITS file ../data/deimos_raw/d0223_0107_cti.fits.gz.\n", + "2026-03-10 13:30:55,601 - image_creator - INFO - Processing file ../data/deimos_raw/d0223_0107_cti.fits.gz\n", "WARNING: VerifyWarning: Invalid 'BLANK' keyword in header. The 'BLANK' keyword is only applicable to integer data, and will be ignored in this HDU. [astropy.io.fits.hdu.image]\n" ] } ], - "execution_count": 1 + "execution_count": 2 }, { "metadata": {}, "cell_type": "markdown", - "source": "Output of creator.run() is a dict containing image_type as keys and corresponding list of DetImage objects (or whatever image class is specified in the detector configuration) as values. The image_type is determined by the identifier function specified in the run parameters of the task (see task documentation for details).", + "source": [ + "`res` maps image type -> list of `DetImage`. Typical checks:\n", + "- Print available keys with `res.keys()`.\n", + "- Use `res['object'][0]` (or the appropriate key) to retrieve the first image.\n", + "- Each `DetImage` exposes `.data` (full image array) and `.outputs` (per-amplifier subframes)." + ], "id": "ceeebc91485589fa" }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:42:00.774898Z", - "start_time": "2026-02-24T00:42:00.770053Z" + "end_time": "2026-03-10T20:31:44.787128Z", + "start_time": "2026-03-10T20:31:44.782685Z" } }, "cell_type": "code", @@ -68,36 +108,37 @@ { "data": { "text/plain": [ - "{'object': [,\n", - " ,\n", - " ,\n", - " ,\n", - " ,\n", - " ,\n", - " ,\n", - " ]}" + "{'object': [,\n", + " ,\n", + " ,\n", + " ,\n", + " ,\n", + " ,\n", + " ,\n", + " ]}" ] }, - "execution_count": 2, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], - "execution_count": 2 + "execution_count": 8 }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:44:14.921258Z", - "start_time": "2026-02-24T00:44:13.962948Z" + "end_time": "2026-03-10T00:16:57.843108Z", + "start_time": "2026-03-10T00:16:57.070090Z" } }, "cell_type": "code", "source": [ "# Display the first DetImage from an input file\n", "det_image = res['object'][0]\n", + "\n", "# Show the DetImage using its built-in show method that takes kwargs for plt.imshow\n", - "## For example, to set the colormap to 'gray', and scale the values using vmin, vmax from ZScaleInterval:\n", + "# For example, to set the colormap to 'gray', and scale the values using vmin, vmax from ZScaleInterval:\n", "from astropy.visualization import ZScaleInterval\n", "interval = ZScaleInterval()\n", "vmin, vmax = interval.get_limits(det_image.data)\n", @@ -119,7 +160,7 @@ } } ], - "execution_count": 3 + "execution_count": 8 }, { "metadata": { @@ -130,17 +171,18 @@ }, "cell_type": "markdown", "source": [ - "Each DetImage object contains a dictionary of \"Output\" class objects in DetImage.outputs that correspond to different amplifier outputs. Output class contain attributes and methods specific to the Detector type.\n", - "\n", - "The keys of the outputs dictionary are the output \"ids\" defined in the detector configuration file." + "Each `DetImage` contains an `outputs` dict keyed by amplifier id (strings from the detector config).\n", + "An `Output` object typically contains:\n", + "- `.data`: the amplifier sub-image.\n", + "- Detector-type specific attributes like `.serial_prescan`, `.serial_overscan`, `.parallel_prescan`, `.parallel_overscan`: slices you can set or read in `CCDOutput`" ], "id": "d37d14cec7edc706" }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:45:29.317232Z", - "start_time": "2026-02-24T00:45:29.312909Z" + "end_time": "2026-03-10T20:30:42.493817Z", + "start_time": "2026-03-10T20:30:36.594528Z" } }, "cell_type": "code", @@ -148,23 +190,24 @@ "id": "a1d5bd390f4f5df6", "outputs": [ { - "data": { - "text/plain": [ - "dict_keys(['chan_1', 'chan_2'])" - ] - }, - "execution_count": 4, - "metadata": {}, - "output_type": "execute_result" + "ename": "NameError", + "evalue": "name 'det_image' is not defined", + "output_type": "error", + "traceback": [ + "\u001B[31m---------------------------------------------------------------------------\u001B[39m", + "\u001B[31mNameError\u001B[39m Traceback (most recent call last)", + "\u001B[36mCell\u001B[39m\u001B[36m \u001B[39m\u001B[32mIn[1]\u001B[39m\u001B[32m, line 1\u001B[39m\n\u001B[32m----> \u001B[39m\u001B[32m1\u001B[39m \u001B[43mdet_image\u001B[49m.outputs.keys()\n", + "\u001B[31mNameError\u001B[39m: name 'det_image' is not defined" + ] } ], - "execution_count": 4 + "execution_count": 1 }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:46:05.843306Z", - "start_time": "2026-02-24T00:46:05.400289Z" + "end_time": "2026-03-10T00:17:00.204200Z", + "start_time": "2026-03-10T00:16:59.759497Z" } }, "cell_type": "code", @@ -172,7 +215,9 @@ "# Plotting the first Output\n", "output = det_image.outputs['chan_1']\n", "vmin, vmax = interval.get_limits(output.data)\n", - "## Let's say the serial prescan and overscan regions are first and last 100 columns respectively (these are set in the detector config file and can be accessed as attributes of the Output object)\n", + "\n", + "# Let's say the serial prescan and overscan regions are first and last 100 columns respectively (only for demo)\n", + "# (these are set in the detector config file and can be accessed as attributes of the Output object)\n", "output.serial_prescan = slice(0, 100)\n", "output.serial_overscan = slice(1024-100, 1024)\n", "## Same for parallel prescan and overscan regions, first and last 100 rows respectively\n", @@ -196,7 +241,7 @@ } } ], - "execution_count": 5 + "execution_count": 10 }, { "metadata": { @@ -207,9 +252,13 @@ }, "cell_type": "markdown", "source": [ - "The DEIMOS raw FITS used above contains 8 extensions, corresponding to 8 detectors in the focal plane, each detector with two outputs.\n", + "## Building a focal plane image\n", + "\n", + "`FocalPlaneImage` combines multiple `DetImage` instances into a single array representing the focal plane.\n", + "- Provide: number of detectors, full dimension, and the list of `DetImage`s.\n", + "- The resulting object contains a consolidated `.data` array and a `frames_df` describing detector placement.\n", "\n", - "FocalPlaneImage() class can be used to combine multiple DetImage objects into a single focal plane representation.\n", + "The DEIMOS raw FITS used above contains 8 extensions, corresponding to 8 detectors in the focal plane, each detector with two outputs.\n", "\n", "The focal plane position information needs to be specified in the detector configuration file." ], @@ -218,8 +267,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:46:31.348708Z", - "start_time": "2026-02-24T00:46:31.263432Z" + "end_time": "2026-03-09T21:39:27.770699Z", + "start_time": "2026-03-09T21:39:27.679065Z" } }, "cell_type": "code", @@ -229,17 +278,22 @@ ], "id": "2b5cab4588ea587c", "outputs": [], - "execution_count": 7 + "execution_count": 6 }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:46:33.321298Z", - "start_time": "2026-02-24T00:46:33.309749Z" + "end_time": "2026-03-09T21:39:28.338307Z", + "start_time": "2026-03-09T21:39:28.325529Z" } }, "cell_type": "code", - "source": "fpimage.frames_df", + "source": [ + "# Focal plane usage tips\n", + "# - Ensure `num_detectors` and `dim` reflect your detector layout.\n", + "# - Use `frames_df` to verify that detector positions were applied correctly.\n", + "fpimage.frames_df" + ], "id": "35a9895070353a09", "outputs": [ { @@ -397,18 +451,18 @@ "" ] }, - "execution_count": 8, + "execution_count": 7, "metadata": {}, "output_type": "execute_result" } ], - "execution_count": 8 + "execution_count": 7 }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:46:42.684682Z", - "start_time": "2026-02-24T00:46:36.342601Z" + "end_time": "2026-03-09T21:39:37.002223Z", + "start_time": "2026-03-09T21:39:29.393824Z" } }, "cell_type": "code", @@ -424,7 +478,7 @@ "" ] }, - "execution_count": 9, + "execution_count": 8, "metadata": {}, "output_type": "execute_result" }, @@ -442,31 +496,30 @@ } } ], - "execution_count": 9 + "execution_count": 8 }, { "metadata": {}, "cell_type": "markdown", - "source": "## Image creation example with single detector DEIMOS configuration", + "source": "## Example 2: Image loading with custom identifier function", "id": "f3849f9a6dbb6e9e" }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:48:32.724307Z", - "start_time": "2026-02-24T00:48:31.870473Z" + "end_time": "2026-03-09T21:39:51.080354Z", + "start_time": "2026-03-09T21:39:50.624258Z" } }, "cell_type": "code", "source": [ "from eregion.tasks.imagegen import ImageCreator\n", "creator = ImageCreator(detector_config='../eregion/configs/detectors/deimos_singledet.yaml')\n", - "# load custom identifier function to identify bias frames based on filename\n", - "from eregion.tasks.custom import guess_image_type_from_filename_DEIMOS\n", "\n", + "# load custom identifier function to identify bias frames based on filename, supply as identifier_func to ImageCreator.run\n", + "from eregion.tasks.custom import guess_image_type_from_filename_DEIMOS\n", "res = creator.run(input_source='/Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_0*.fits',\n", - " identifier_func=guess_image_type_from_filename_DEIMOS)\n", - "res" + " identifier_func=guess_image_type_from_filename_DEIMOS)" ], "id": "5edf36e4d279bb64", "outputs": [ @@ -474,33 +527,20 @@ "name": "stderr", "output_type": "stream", "text": [ - "2026-02-23 16:48:31,878 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_0*.fits is a glob pattern\n", - "2026-02-23 16:48:31,878 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_0*.fits is a glob pattern\n", - "2026-02-23 16:48:31,878 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_0*.fits is a glob pattern\n", - "2026-02-23 16:48:31,880 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", - "2026-02-23 16:48:31,880 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", - "2026-02-23 16:48:31,880 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n" + "2026-03-09 14:39:50,632 - DetectorConfig - INFO - Config loaded from file '../eregion/configs/detectors/deimos_singledet.yaml'\n", + "2026-03-09 14:39:50,632 - DetectorConfig - INFO - Config loaded from file '../eregion/configs/detectors/deimos_singledet.yaml'\n", + "2026-03-09 14:39:50,632 - DetectorConfig - INFO - Config loaded from file '../eregion/configs/detectors/deimos_singledet.yaml'\n", + "2026-03-09 14:39:50,634 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_0*.fits is a glob pattern\n", + "2026-03-09 14:39:50,634 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_0*.fits is a glob pattern\n", + "2026-03-09 14:39:50,634 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_0*.fits is a glob pattern\n", + "2026-03-09 14:39:50,636 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits.\n", + "2026-03-09 14:39:50,637 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", + "2026-03-09 14:39:50,637 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", + "2026-03-09 14:39:50,637 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n" ] - }, - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits.\n" - ] - }, - { - "data": { - "text/plain": [ - "{'bias': []}" - ] - }, - "execution_count": 11, - "metadata": {}, - "output_type": "execute_result" } ], - "execution_count": 11 + "execution_count": 10 }, { "metadata": { @@ -510,14 +550,38 @@ } }, "cell_type": "markdown", - "source": "ImageCreator's output is a dict with image types as keys and list of DetImage objects as values. In this case, the identifier function identifies the input files as bias frames and assigns them the image type 'bias'.", + "source": "Check the keys in the result to confirm bias images were identified and loaded correctly.", "id": "68dca1f460fc11ec" }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:49:17.914560Z", - "start_time": "2026-02-24T00:49:17.132488Z" + "end_time": "2026-03-09T21:39:55.524750Z", + "start_time": "2026-03-09T21:39:55.522178Z" + } + }, + "cell_type": "code", + "source": "res.keys(), len(res['bias'])", + "id": "5c714e16e6d1d41f", + "outputs": [ + { + "data": { + "text/plain": [ + "(dict_keys(['bias']), 1)" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 11 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-09T21:39:57.091114Z", + "start_time": "2026-03-09T21:39:56.320513Z" } }, "cell_type": "code", @@ -550,45 +614,29 @@ ], "execution_count": 12 }, - { - "metadata": { - "ExecuteTime": { - "end_time": "2026-02-24T00:49:25.476060Z", - "start_time": "2026-02-24T00:49:25.474330Z" - } - }, - "cell_type": "code", - "source": "", - "id": "67320cac943e25f1", - "outputs": [], - "execution_count": null - }, { "metadata": {}, "cell_type": "markdown", - "source": "### Master Bias Creation Example using tasks directly", + "source": "## Example 3: Create master bias using tasks directly in the notebook", "id": "83482c51ffcfc722" }, { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:50:44.474318Z", - "start_time": "2026-02-24T00:50:37.124332Z" + "end_time": "2026-03-09T21:49:39.958212Z", + "start_time": "2026-03-09T21:49:33.068998Z" } }, "cell_type": "code", "source": [ "from eregion.tasks.imagegen import ImageCreator\n", "from eregion.tasks.calibration import MasterBias\n", + "from eregion.tasks.custom import guess_image_type_from_filename_DEIMOS\n", "\n", "# Create image creator and run it to get bias images\n", "creator = ImageCreator(detector_config='../eregion/configs/detectors/deimos_singledet.yaml')\n", "res = creator.run(input_source='/Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits',\n", - " identifier_func=guess_image_type_from_filename_DEIMOS)\n", - "\n", - "# Create master bias task and run it\n", - "master_bias_task = MasterBias(method='median')\n", - "mb_res = master_bias_task.run(bias_images=res['bias'])" + " identifier_func=guess_image_type_from_filename_DEIMOS)" ], "id": "8273699a2ddbca87", "outputs": [ @@ -596,76 +644,110 @@ "name": "stderr", "output_type": "stream", "text": [ - "2026-02-23 16:50:37,133 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits is a glob pattern\n", - "2026-02-23 16:50:37,133 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits is a glob pattern\n", - "2026-02-23 16:50:37,133 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits is a glob pattern\n", - "2026-02-23 16:50:37,133 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits is a glob pattern\n", - "2026-02-23 16:50:37,139 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", - "2026-02-23 16:50:37,139 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", - "2026-02-23 16:50:37,139 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", - "2026-02-23 16:50:37,139 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n" + "2026-03-09 14:49:33,565 - DetectorConfig - INFO - Config loaded from file '../eregion/configs/detectors/deimos_singledet.yaml'\n", + "2026-03-09 14:49:33,566 - image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits is a glob pattern\n", + "2026-03-09 14:49:33,567 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits.\n", + "2026-03-09 14:49:33,567 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits.\n", + "2026-03-09 14:49:33,567 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits.\n", + "2026-03-09 14:49:33,568 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits.\n", + "2026-03-09 14:49:33,569 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits.\n", + "2026-03-09 14:49:33,570 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits.\n", + "2026-03-09 14:49:33,570 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits.\n", + "2026-03-09 14:49:33,571 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits.\n", + "2026-03-09 14:49:33,572 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits.\n", + "2026-03-09 14:49:33,572 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits.\n", + "2026-03-09 14:49:33,573 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", + "2026-03-09 14:49:34,493 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits\n", + "2026-03-09 14:49:35,344 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits\n", + "2026-03-09 14:49:36,163 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits\n", + "2026-03-09 14:49:36,587 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits\n", + "2026-03-09 14:49:37,006 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits\n", + "2026-03-09 14:49:37,817 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits\n", + "2026-03-09 14:49:38,257 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits\n", + "2026-03-09 14:49:39,094 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits\n", + "2026-03-09 14:49:39,529 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits\n" ] - }, + } + ], + "execution_count": 1 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-09T21:49:39.968707Z", + "start_time": "2026-03-09T21:49:39.964368Z" + } + }, + "cell_type": "code", + "source": "res.keys(), len(res['bias'])", + "id": "74e15828aa01fb76", + "outputs": [ { - "name": "stdout", - "output_type": "stream", - "text": [ - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits.\n", - "Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits.\n" - ] - }, + "data": { + "text/plain": [ + "(dict_keys(['bias']), 10)" + ] + }, + "execution_count": 2, + "metadata": {}, + "output_type": "execute_result" + } + ], + "execution_count": 2 + }, + { + "metadata": {}, + "cell_type": "markdown", + "source": [ + "Use the `MasterBias` task to combine many bias frames into a single master bias frame.\n", + "- Typical method: `'median'` or `'mean'`.\n", + "- Input: a list of bias `DetImage` objects, e.g. `res['bias']`.\n", + "- Output: a dict (e.g. key `'master_biases'`) containing `DetImage` objects.\n", + "- Inspect the result and visualize with the same `show()` helpers." + ], + "id": "49ce336dbc65b64d" + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-09T21:49:39.987532Z", + "start_time": "2026-03-09T21:49:39.975404Z" + } + }, + "cell_type": "code", + "source": [ + "# To get the list of available methods for any task, use Task.print_methods()\n", + "MasterBias().print_methods()" + ], + "id": "a2991aa80780bd25", + "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ - "2026-02-23 16:50:37,987 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits\n", - "2026-02-23 16:50:37,987 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits\n", - "2026-02-23 16:50:37,987 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits\n", - "2026-02-23 16:50:37,987 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits\n", - "2026-02-23 16:50:38,419 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits\n", - "2026-02-23 16:50:38,419 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits\n", - "2026-02-23 16:50:38,419 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits\n", - "2026-02-23 16:50:38,419 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits\n", - "2026-02-23 16:50:38,850 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits\n", - "2026-02-23 16:50:38,850 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits\n", - "2026-02-23 16:50:38,850 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits\n", - "2026-02-23 16:50:38,850 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits\n", - "2026-02-23 16:50:39,668 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits\n", - "2026-02-23 16:50:39,668 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits\n", - "2026-02-23 16:50:39,668 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits\n", - "2026-02-23 16:50:39,668 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits\n", - "2026-02-23 16:50:40,476 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits\n", - "2026-02-23 16:50:40,476 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits\n", - "2026-02-23 16:50:40,476 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits\n", - "2026-02-23 16:50:40,476 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits\n", - "2026-02-23 16:50:40,903 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits\n", - "2026-02-23 16:50:40,903 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits\n", - "2026-02-23 16:50:40,903 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits\n", - "2026-02-23 16:50:40,903 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits\n", - "2026-02-23 16:50:41,316 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits\n", - "2026-02-23 16:50:41,316 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits\n", - "2026-02-23 16:50:41,316 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits\n", - "2026-02-23 16:50:41,316 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits\n", - "2026-02-23 16:50:42,192 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits\n", - "2026-02-23 16:50:42,192 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits\n", - "2026-02-23 16:50:42,192 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits\n", - "2026-02-23 16:50:42,192 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits\n", - "2026-02-23 16:50:42,658 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits\n", - "2026-02-23 16:50:42,658 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits\n", - "2026-02-23 16:50:42,658 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits\n", - "2026-02-23 16:50:42,658 - image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits\n" + "2026-03-09 14:49:39,976 - MasterBias - INFO - Available methods for MasterBias:\n", + "2026-03-09 14:49:39,986 - MasterBias - INFO - - median: core.image_operations.median_combine, (images: list[numpy.ndarray]) -> numpy.ndarray\n" ] } ], - "execution_count": 13 + "execution_count": 3 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-09T21:49:41.156168Z", + "start_time": "2026-03-09T21:49:40.013020Z" + } + }, + "cell_type": "code", + "source": [ + "# Create master bias task and run it\n", + "master_bias_task = MasterBias(method='median')\n", + "mb_res = master_bias_task.run(bias_images=res['bias'])" + ], + "id": "921c48190c2ae567", + "outputs": [], + "execution_count": 4 }, { "metadata": { @@ -694,8 +776,8 @@ { "metadata": { "ExecuteTime": { - "end_time": "2026-02-24T00:51:12.064165Z", - "start_time": "2026-02-24T00:51:11.295752Z" + "end_time": "2026-03-09T22:05:42.170370Z", + "start_time": "2026-03-09T22:05:41.282413Z" } }, "cell_type": "code", @@ -708,7 +790,7 @@ "" ] }, - "execution_count": 15, + "execution_count": 5, "metadata": {}, "output_type": "execute_result" }, @@ -726,7 +808,7 @@ } } ], - "execution_count": 15 + "execution_count": 5 }, { "metadata": {}, diff --git a/playground/Example_pipeline_flows.ipynb b/playground/Example_pipeline_flows.ipynb new file mode 100644 index 0000000..562270e --- /dev/null +++ b/playground/Example_pipeline_flows.ipynb @@ -0,0 +1,368 @@ +{ + "cells": [ + { + "cell_type": "code", + "id": "initial_id", + "metadata": { + "collapsed": true, + "ExecuteTime": { + "end_time": "2026-03-11T00:40:57.497533Z", + "start_time": "2026-03-11T00:40:55.959788Z" + } + }, + "source": "from pipeline.engine import PipelineEngine", + "outputs": [], + "execution_count": 1 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-11T00:40:57.895771Z", + "start_time": "2026-03-11T00:40:57.507592Z" + } + }, + "cell_type": "code", + "source": "test_eng = PipelineEngine(pipeline_config_input='../eregion/configs/pipeline_flows/masterbias_example.yaml')", + "id": "20948681f2506757", + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-10 17:40:57,510 - PipelineConfig - INFO - Config loaded from file '../eregion/configs/pipeline_flows/masterbias_example.yaml'\n", + "2026-03-10 17:40:57,511 - pipeline.engine - INFO - Number of pipelines defined: 2\n", + "2026-03-10 17:40:57,893 - pipeline.engine - INFO - Pipeline order: [('calib_flow',), ('preproc_flow',)]\n", + "2026-03-10 17:40:57,894 - pipeline.engine - INFO - Pipeline calib_flow order: [('calib_flow.image_creator',), ('calib_flow.master_bias',)]\n", + "2026-03-10 17:40:57,894 - pipeline.engine - INFO - Pipeline preproc_flow order: [('preproc_flow.image_creator', 'calib_flow.master_bias'), ('preproc_flow.bias_subtraction',), ('preproc_flow.overscan_subtraction',), ('preproc_flow.badpixel_masking',)]\n" + ] + } + ], + "execution_count": 2 + }, + { + "metadata": { + "ExecuteTime": { + "end_time": "2026-03-11T00:41:23.004102Z", + "start_time": "2026-03-11T00:40:57.906266Z" + } + }, + "cell_type": "code", + "source": "test_eng.run()", + "id": "e556540665a3cc46", + "outputs": [ + { + "data": { + "text/plain": [ + "17:40:58.639 | \u001B[36mINFO\u001B[0m | prefect - Starting temporary server on \u001B[94mhttp://127.0.0.1:8485\u001B[0m\n", + "See \u001B[94mhttps://docs.prefect.io/v3/concepts/server#how-to-guides\u001B[0m for more information on running a dedicated Prefect server.\n" + ], + "text/html": [ + "
17:40:58.639 | INFO    | prefect - Starting temporary server on http://127.0.0.1:8485\n",
+       "See https://docs.prefect.io/v3/concepts/server#how-to-guides for more information on running a dedicated Prefect server.\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "data": { + "text/plain": [ + "17:41:00.506 | \u001B[36mINFO\u001B[0m | Flow run\u001B[35m 'unbiased-ara'\u001B[0m - Beginning flow run\u001B[35m 'unbiased-ara'\u001B[0m for flow\u001B[1;35m 'calib_flow'\u001B[0m\n" + ], + "text/html": [ + "
17:41:00.506 | INFO    | Flow run 'unbiased-ara' - Beginning flow run 'unbiased-ara' for flow 'calib_flow'\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-10 17:41:00,538 - DetectorConfig - INFO - Config loaded from file '/Users/yashvi/Desktop/Detector Characterization Tools/eregion/eregion/configs/detectors/deimos_singledet.yaml'\n", + "2026-03-10 17:41:00,539 - calib_flow.image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits is a glob pattern\n", + "2026-03-10 17:41:00,540 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits.\n", + "2026-03-10 17:41:00,541 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits.\n", + "2026-03-10 17:41:00,541 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits.\n", + "2026-03-10 17:41:00,542 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits.\n", + "2026-03-10 17:41:00,543 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits.\n", + "2026-03-10 17:41:00,543 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits.\n", + "2026-03-10 17:41:00,543 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits.\n", + "2026-03-10 17:41:00,544 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits.\n", + "2026-03-10 17:41:00,544 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits.\n", + "2026-03-10 17:41:00,544 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits.\n", + "2026-03-10 17:41:00,545 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_0_2025-08-12T101355.400.fits\n", + "2026-03-10 17:41:01,522 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_1_2025-08-12T101425.647.fits\n", + "2026-03-10 17:41:02,369 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_2_2025-08-12T101455.893.fits\n", + "2026-03-10 17:41:03,217 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_3_2025-08-12T101526.139.fits\n", + "2026-03-10 17:41:04,074 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_4_2025-08-12T101556.386.fits\n", + "2026-03-10 17:41:04,486 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_5_2025-08-12T101626.632.fits\n", + "2026-03-10 17:41:04,914 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_6_2025-08-12T101656.879.fits\n", + "2026-03-10 17:41:05,352 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_7_2025-08-12T101727.125.fits\n", + "2026-03-10 17:41:05,798 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_8_2025-08-12T101757.371.fits\n", + "2026-03-10 17:41:06,672 - calib_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_bias_9_2025-08-12T101827.618.fits\n" + ] + }, + { + "data": { + "text/plain": [ + "17:41:07.067 | \u001B[36mINFO\u001B[0m | Task run 'calib_flow.image_creator-9fe' - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:07.067 | INFO    | Task run 'calib_flow.image_creator-9fe' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "data": { + "text/plain": [ + "17:41:09.365 | \u001B[36mINFO\u001B[0m | Task run 'calib_flow.master_bias-17d' - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:09.365 | INFO    | Task run 'calib_flow.master_bias-17d' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "data": { + "text/plain": [ + "17:41:09.382 | \u001B[36mINFO\u001B[0m | Flow run\u001B[35m 'unbiased-ara'\u001B[0m - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:09.382 | INFO    | Flow run 'unbiased-ara' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-10 17:41:09,384 - pipeline.engine - INFO - Pipeline 'calib_flow' complete\n" + ] + }, + { + "data": { + "text/plain": [ + "17:41:09.427 | \u001B[36mINFO\u001B[0m | Flow run\u001B[35m 'awesome-eel'\u001B[0m - Beginning flow run\u001B[35m 'awesome-eel'\u001B[0m for flow\u001B[1;35m 'preproc_flow'\u001B[0m\n" + ], + "text/html": [ + "
17:41:09.427 | INFO    | Flow run 'awesome-eel' - Beginning flow run 'awesome-eel' for flow 'preproc_flow'\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-10 17:41:09,450 - DetectorConfig - INFO - Config loaded from file '/Users/yashvi/Desktop/Detector Characterization Tools/eregion/eregion/configs/detectors/deimos_singledet.yaml'\n", + "2026-03-10 17:41:09,450 - DetectorConfig - INFO - Config loaded from file '/Users/yashvi/Desktop/Detector Characterization Tools/eregion/eregion/configs/detectors/deimos_singledet.yaml'\n", + "2026-03-10 17:41:09,451 - preproc_flow.image_creator - INFO - Item /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*flat_0.000*.fits is a glob pattern\n", + "2026-03-10 17:41:09,452 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_flat_0.000_0_2025-08-12T102203.192.fits.\n", + "2026-03-10 17:41:09,454 - utils.io_utils - INFO - Found FITS file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_flat_0.000_1_2025-08-12T102233.989.fits.\n", + "2026-03-10 17:41:09,454 - preproc_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_flat_0.000_0_2025-08-12T102203.192.fits\n", + "2026-03-10 17:41:10,302 - preproc_flow.image_creator - INFO - Processing file /Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/DTU_DT-SCI_2_flat_0.000_1_2025-08-12T102233.989.fits\n" + ] + }, + { + "data": { + "text/plain": [ + "17:41:10.711 | \u001B[36mINFO\u001B[0m | Task run 'preproc_flow.image_creator-532' - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:10.711 | INFO    | Task run 'preproc_flow.image_creator-532' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "data": { + "text/plain": [ + "17:41:13.745 | \u001B[36mINFO\u001B[0m | Task run 'preproc_flow.bias_subtraction-d2a' - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:13.745 | INFO    | Task run 'preproc_flow.bias_subtraction-d2a' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/yashvi/Desktop/Detector Characterization Tools/eregion/.venv/lib/python3.13/site-packages/numpy/_core/fromnumeric.py:3860: RuntimeWarning: Mean of empty slice.\n", + " return _methods._mean(a, axis=axis, dtype=dtype,\n", + "/Users/yashvi/Desktop/Detector Characterization Tools/eregion/.venv/lib/python3.13/site-packages/numpy/_core/_methods.py:136: RuntimeWarning: invalid value encountered in divide\n", + " ret = um.true_divide(\n", + "/Users/yashvi/Desktop/Detector Characterization Tools/eregion/.venv/lib/python3.13/site-packages/numpy/_core/fromnumeric.py:3860: RuntimeWarning: Mean of empty slice.\n", + " return _methods._mean(a, axis=axis, dtype=dtype,\n", + "/Users/yashvi/Desktop/Detector Characterization Tools/eregion/.venv/lib/python3.13/site-packages/numpy/_core/_methods.py:136: RuntimeWarning: invalid value encountered in divide\n", + " ret = um.true_divide(\n" + ] + }, + { + "data": { + "text/plain": [ + "17:41:16.396 | \u001B[36mINFO\u001B[0m | Task run 'preproc_flow.overscan_subtraction-3d0' - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:16.396 | INFO    | Task run 'preproc_flow.overscan_subtraction-3d0' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "WARNING: Input data contains invalid values (NaNs or infs), which were automatically clipped. [astropy.stats.sigma_clipping]\n", + "2026-03-10 17:41:18,317 - datamodels.image - WARNING - Output with id chan_1 already exists, overwrite is set to True.\n", + "2026-03-10 17:41:18,317 - datamodels.image - WARNING - Output with id chan_2 already exists, overwrite is set to True.\n", + "WARNING: Input data contains invalid values (NaNs or infs), which were automatically clipped. [astropy.stats.sigma_clipping]\n", + "2026-03-10 17:41:21,860 - datamodels.image - WARNING - Output with id chan_1 already exists, overwrite is set to True.\n", + "2026-03-10 17:41:21,860 - datamodels.image - WARNING - Output with id chan_2 already exists, overwrite is set to True.\n" + ] + }, + { + "data": { + "text/plain": [ + "17:41:22.968 | \u001B[36mINFO\u001B[0m | Task run 'preproc_flow.badpixel_masking-59b' - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:22.968 | INFO    | Task run 'preproc_flow.badpixel_masking-59b' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "data": { + "text/plain": [ + "17:41:22.998 | \u001B[36mINFO\u001B[0m | Flow run\u001B[35m 'awesome-eel'\u001B[0m - Finished in state \u001B[32mCompleted\u001B[0m()\n" + ], + "text/html": [ + "
17:41:22.998 | INFO    | Flow run 'awesome-eel' - Finished in state Completed()\n",
+       "
\n" + ] + }, + "metadata": {}, + "output_type": "display_data", + "jetTransient": { + "display_id": null + } + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2026-03-10 17:41:23,000 - pipeline.engine - INFO - Pipeline 'preproc_flow' complete\n" + ] + }, + { + "data": { + "text/plain": [ + "{'calib_flow.image_creator': TaskResult(task_name='calib_flow.image_creator', data={'bias': [, , , , , , , , , ]}, params={'init': {'detector_config': '/Users/yashvi/Desktop/Detector Characterization Tools/eregion/eregion/configs/detectors/deimos_singledet.yaml'}, 'run': {'input_source': '/Users/yashvi/Desktop/Detector Characterization Tools/DTU_dettest/DTU_singledet_acceptance/PTC/SCI/20250812-101350/*_bias_*.fits', 'identifier_func': 'tasks.custom.guess_image_type_from_filename_DEIMOS'}}, upstream=[], timestamp=