diff --git a/python/lsst/drp/tasks/build_warp_diff_matrix.py b/python/lsst/drp/tasks/build_warp_diff_matrix.py new file mode 100644 index 00000000..4a9ed88d --- /dev/null +++ b/python/lsst/drp/tasks/build_warp_diff_matrix.py @@ -0,0 +1,288 @@ +# This file is part of drp_tasks. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = () + +import dataclasses +from collections.abc import Mapping +from typing import ClassVar + +import astropy.io.fits +import astropy.table +import numpy as np +import scipy.linalg + +from lsst.afw.cameraGeom import FOCAL_PLANE, Camera +from lsst.afw.math import ChebyshevBoundedField, ChebyshevBoundedFieldControl +from lsst.geom import Box2D, Box2I +from lsst.pex.config import Field +from lsst.pipe.base import ( + PipelineTask, + PipelineTaskConfig, + PipelineTaskConnections, + Struct, +) +from lsst.pipe.base import connectionTypes as cT + + +@dataclasses.dataclass +class WarpDiffMatrixProblem: + chebyshev_order: int + visit_ids: np.ndarray + matrix: np.ndarray + vector: np.ndarray + + @classmethod + def readFits(cls, filename: str) -> WarpDiffMatrixProblem: + with astropy.io.fits.open(filename) as fits: + chebyshev_order = fits["MODEL"].header["CHBORDER"] + visit_ids = np.asarray(fits["MODEL"].data["visit_id"], dtype=np.uint64) + matrix = np.asarray(fits["MATRIX"].data, dtype=np.float64) + vector = np.asarray(fits["VECTOR"].data, dtype=np.float64) + return cls( + chebyshev_order=chebyshev_order, + visit_ids=visit_ids, + matrix=matrix, + vector=vector, + ) + + def writeFits(self, filename: str) -> None: + fits = astropy.io.fits.HDUList() + visit_id_hdu = astropy.io.fits.BinTableHDU.from_columns( + [astropy.io.fits.Column("visit_id", format="K", array=self.visit_ids)] + ) + visit_id_hdu.header["EXTNAME"] = "MODEL" + visit_id_hdu.header["CHBORDER"] = self.chebyshev_order + fits.append(visit_id_hdu) + fits.append(astropy.io.fits.ImageHDU(self.matrix, name="MATRIX")) + fits.append(astropy.io.fits.ImageHDU(self.vector, name="VECTOR")) + fits.writeto(filename) + + +class BuildWarpDiffMatrixConnections(PipelineTaskConnections, dimensions=["patch", "band"]): + diff_table = cT.Input("warp_diff_binned", storageClass="ArrowAstropy", dimensions=["patch", "band"]) + camera = cT.PrerequisiteInput( + "camera", + storageClass="Camera", + dimensions=["instrument"], + isCalibration=True, + ) + matrix_problem = cT.Output( + "warp_diff_matrix_problem", storageClass="WarpDiffMatrixProblem", dimensions=["patch", "band"] + ) + + +class BuildWarpDiffMatrixConfig(PipelineTaskConfig, pipelineConnections=BuildWarpDiffMatrixConnections): + covariance_rcond = Field[float]( + "Relative condition number for constructing a basis with nonsingular covariance for each bin.", + dtype=float, + default=1e-8, + ) + chebyshev_order = Field[int]( + "Order of the Chebyshev polynomial fit across the focal-plane for each visit.", + dtype=int, + default=4, + check=lambda v: v >= 0, + ) + + +class BuildWarpDiffMatrixTask(PipelineTask): + _DefaultName: ClassVar[str] = "buildWarpDiffMatrix" + ConfigClass: ClassVar[type[BuildWarpDiffMatrixConfig]] = BuildWarpDiffMatrixConfig + config: BuildWarpDiffMatrixConfig + + def __init__(self, *, config=None, log=None, initInputs=None, **kwargs): + super().__init__(config=config, log=log, initInputs=initInputs, **kwargs) + # We always drop the zeroth-order Chebyshev term since it's degenerate + # with detector pisons. + self.n_chebyshev = ((self.config.chebyshev_order + 2) * (self.config.chebyshev_order + 1)) // 2 - 1 + + def run( + self, + *, + camera: Camera, + diff_table: astropy.table.Table, + ): + visit_indices = {visit_id: index for index, visit_id in enumerate(self.unique_visits(diff_table))} + n_visits = len(visit_indices) + n_parameters = (len(camera) + self.n_chebyshev) * n_visits + F = np.zeros((n_parameters, n_parameters), dtype=float) + g = np.zeros(n_parameters, dtype=float) + diff_table = diff_table.group_by(["bin_row", "bin_column"]) + camera_bbox = self._compute_camera_bbox(camera) + detector_indices = {detector_id: index for index, detector_id in enumerate(camera)} + for begin, end in zip(diff_table.groups.indices[:-1], diff_table.groups.indices[1:]): + bin_diff_table = diff_table[begin:end] + # The bins that have the same (row, column) in patch coordinates + # are correlated, because many of them involved the same warp and + # hence the same pixel values. We need to incorporate these + # correlations in our fitting. + # We start by constructing a 'projection matrix' A that maps the + # warps to their differences. It has shape (n_diffs, n_visits), + # and each row has a 1 for the positive visit and -1 for the + # negative visit. + n_diffs = end - begin + A = np.zeros((n_diffs, n_visits), dtype=float) + for i in range(n_diffs): + A[i, visit_indices[bin_diff_table["positive_visit_id"][i]]] = 1.0 + A[i, visit_indices[bin_diff_table["negative_visit_id"][i]]] = -1.0 + # If we had computed the original variances of the visit warps in + # the same bins V with the same clipping, we could compute the + # covariance matrix of the diffs via + # + # C = A V A^T + # + # We didn't do that, but we did save something almost as good (and + # a lot more efficient to gather): the variances of the diffs + # we actually measured, i.e. the diagonal of C. And thanks to the + # structure of A, we know those diagonal values are just the sums + # of the variances of the two visits that went into the diff + # (because when you subtract values, you add their variances). + # That means that as long as n_diffs >= n_visits, we can recover + # V from the diagonal of C, and we can do it by solving: + # + # diag(C) = abs(A) diag(V) + # + # Because different diffs involving the same visit might have + # different clipping, this won't factor exactly, so we use least + # squares to effectively average over different combinations of + # diffs giving slightly inconsistent estimates of the visit-level + # variances. + V, _, _, _ = scipy.linalg.lstsq(np.abs(A), bin_diff_table["bin_variance"]) + C = np.dot(np.dot(A, V), A.transpose()) + # We can't use our new covariance directly, however, because it's + # almost certainly singular: we've fundamentally only got at most + # (n_visits - 1) independent sets of pixels, but we've constructed + # n_diffs data points from those. To fix this, we compute the + # spectral decomposition of C: + # + # Q S Q^T + # + # and select out the significantly positive eigenvalues and + # corresponding columns of Q: + S, Q = scipy.linalg.eigh(C) + keep = S > np.max(S) * self.config.covariance_rcond + S = S[keep] + Q = Q[:, keep] + # Q still satisfies Q Q^T = I, but not Q^T Q = I. We can still use + # it to project our diffs vector 'z' to a smaller-dimensional space + # with a nonsingular (and diagonal) covariance matrix S: + # + # z -> Q^T z = v + # + # C -> Q^T C Q = Q^T (Q S Q^T) Q = S + # + v = np.dot(Q.transpose(), bin_diff_table["bin_value"]) + # Our projected data points are now independent, and they're + # independent from the data points from any bin (row, column), + # as well as independent of any the data points from any other + # patch. + # The ultimate goal is to find the minimum-norm solution to the the + # normal equations: + # + # (B^T Q S^{-1} Q^T B) \alpha = B^T Q S^{-1} v + # + # F \alpha = g + # + # where \alpha are the background model parameters (for all visits) + # and B is the matrix that evaluates and subtracts the models at + # bin centers. Because S is diagonal, we can construct the matrix + # and right-hand side vector by summing the contributions from each + # bin 'i': + # + # F = \sum_i F_i = \sum_i B_i^T Q_i S_i^{-1} Q_i^T B_i + # b = \sum_i g_i = \sum_i B_i^T Q_i S_i^{-1} v + # + S_inv_sqrt = S ** (-0.5) + B = self.make_model_matrix(diff_table[begin:end], visit_indices, detector_indices, camera_bbox) + F_i_sqrt = np.dot(S_inv_sqrt[:, np.newaxis] * Q.transpose(), B) + F += np.dot(F_i_sqrt.transpose(), F_i_sqrt) + g += np.dot(F_i_sqrt, S_inv_sqrt * v) + return Struct( + matrix_problem=WarpDiffMatrixProblem( + chebyshev_order=self.config.chebyshev_order, + visit_ids=np.array(visit_indices.keys(), dtype=np.uint64), + matrix=F, + vector=g, + ) + ) + + def make_model_matrix( + self, + bin_diff_table: astropy.table.Table, + all_visits: Mapping[int, int], + all_detectors: Mapping[int, int], + camera_bbox: Box2I, + ) -> np.ndarray: + visits_in_bin = self.unique_visits(bin_diff_table) + n_visit_parameters = self.n_chebyshev + len(all_detectors) + n_parameters = n_visit_parameters * len(all_visits) + ctrl = ChebyshevBoundedFieldControl() + ctrl.orderX = self.chebyshev_order + ctrl.orderY = self.chebyshev_order + B = np.zeros((len(bin_diff_table), n_parameters), dtype=float) + positive_chebyshev_matrix = ChebyshevBoundedField.makeFitMatrix( + camera_bbox, bin_diff_table["positive_x"], bin_diff_table["positive_y"], ctrl + )[:, 1:] + positive_chebyshev_matrix *= bin_diff_table["positive_scaling"][:, np.newaxis] + negative_chebyshev_matrix = ChebyshevBoundedField.makeFitMatrix( + camera_bbox, bin_diff_table["negative_x"], bin_diff_table["negative_y"], ctrl + )[:, 1:] + negative_chebyshev_matrix *= bin_diff_table["negative_scaling"][:, np.newaxis] + for visit_id in visits_in_bin: + positive_mask = bin_diff_table["positive_visit_id"] == visit_id + negative_mask = bin_diff_table["negative_visit_id"] == visit_id + j = all_visits[visit_id] * n_visit_parameters + B[positive_mask, j : j + self.n_chebyshev] += positive_chebyshev_matrix[positive_mask, :] + B[negative_mask, j : j + self.n_chebyshev] += negative_chebyshev_matrix[negative_mask, :] + for i in range(len(bin_diff_table)): + j = ( + all_visits[bin_diff_table["positive_visit_id"][i]] * n_visit_parameters + + self.n_chebyshev + + all_detectors[bin_diff_table["positive_detector_id"][i]] + ) + B[i, j] = bin_diff_table["positive_scaling"].data[i] + j = ( + all_visits[bin_diff_table["negative_visit_id"][i]] * n_visit_parameters + + self.n_chebyshev + + all_detectors[bin_diff_table["negative_detector_id"][i]] + ) + B[i, j] = -bin_diff_table["negative_scaling"].data[i] + return B + + @staticmethod + def unique_visits(diff_table: astropy.table.Table) -> list[int]: + return sorted(set(diff_table["positive_visit_id"]) | set(diff_table["negative_visit_id"])) + + @staticmethod + def unique_detectors(diff_table: astropy.table.Table) -> list[int]: + return sorted(set(diff_table["positive_detector_id"]) | set(diff_table["negative_detector_id"])) + + @staticmethod + def _compute_camera_bbox(camera: Camera) -> Box2I: + """Compute the bounding box of a camera in focal-plane coordinates.""" + bbox = Box2D() + for detector in camera: + for corner in detector.getCorners(FOCAL_PLANE): + bbox.include(corner) + return Box2I(bbox) diff --git a/python/lsst/drp/tasks/diff_warp_background.py b/python/lsst/drp/tasks/diff_warp_background.py new file mode 100644 index 00000000..de2074f0 --- /dev/null +++ b/python/lsst/drp/tasks/diff_warp_background.py @@ -0,0 +1,523 @@ +# This file is part of drp_tasks. +# +# Developed for the LSST Data Management System. +# This product includes software developed by the LSST Project +# (https://www.lsst.org). +# See the COPYRIGHT file at the top-level directory of this distribution +# for details of code ownership. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +from __future__ import annotations + +__all__ = () + +import dataclasses +import itertools +import math +from collections.abc import Mapping +from typing import ClassVar + +import astropy.table +import numpy as np + +import lsst.afw.math +from lsst.afw.cameraGeom import FOCAL_PLANE, PIXELS, Camera, Detector +from lsst.afw.geom import Polygon, SinglePolygonException, TransformPoint2ToPoint2, makeWcsPairTransform +from lsst.afw.image import ExposureF, Mask, PhotoCalib +from lsst.afw.table import ExposureCatalog, ExposureRecord +from lsst.daf.butler import DeferredDatasetHandle +from lsst.geom import Box2D +from lsst.pex.config import ChoiceField, Field, ListField +from lsst.pipe.base import ( + InputQuantizedConnection, + OutputQuantizedConnection, + PipelineTask, + PipelineTaskConfig, + PipelineTaskConnections, + QuantumContext, + Struct, +) +from lsst.pipe.base import connectionTypes as cT +from lsst.skymap import BaseSkyMap, PatchInfo + + +@dataclasses.dataclass +class WarpDetectorInfo: + polygon: Polygon + detector: Detector + patch_to_detector: TransformPoint2ToPoint2 + photo_calib: PhotoCalib + + +@dataclasses.dataclass +class WarpData: + visit_id: int + detectors: Mapping[int, WarpDetectorInfo] + exposure: ExposureF + + @classmethod + def load_and_crop( + cls, + warp_handle: DeferredDatasetHandle, + correction_warp_handle: DeferredDatasetHandle, + patch_info: PatchInfo, + detectors: Mapping[int, WarpDetectorInfo], + ) -> WarpData: + bbox = patch_info.getInnerBBox() + exposure = warp_handle.get()[bbox] + exposure.maskedImage /= correction_warp_handle.get()[bbox] + return WarpData( + visit_id=warp_handle.dataId["visit"], + exposure=exposure, + detectors=detectors, + ) + + +@dataclasses.dataclass +class DiffDetectorArrays: + detector_id: np.ndarray + x_camera: np.ndarray + y_camera: np.ndarray + x_detector: np.ndarray + y_detector: np.ndarray + scaling: np.ndarray + mask: np.ndarray + + +class DiffWarpBackgroundsConnections(PipelineTaskConnections, dimensions=["patch", "band"]): + warps = cT.Input( + "direct_matched_warp", + storageClass="ExposureF", + multiple=True, + deferLoad=True, + dimensions=["patch", "visit"], + ) + correction_warps = cT.Input( + # TODO: nothing makes this dataset right now; need to modify + # MakeDirectWarpTask to optionally produce it (by warping ones if it + # doesn't get a background_to_photometric_ratio input). + "background_to_photometric_ratio_warped", + storageClass="ExposureF", + multiple=True, + deferLoad=True, + dimensions=["patch", "visit"], + ) + visit_summaries = cT.Input( + "visit_summary", + storageClass="ExposureCatalog", + multiple=True, + dimensions=["visit"], + ) + sky_map = cT.Input( + BaseSkyMap.SKYMAP_DATASET_TYPE_NAME, + doc="Input definition of geometry/bbox and projection/wcs for warps.", + storageClass="SkyMap", + dimensions=["skymap"], + ) + camera = cT.PrerequisiteInput( + "camera", + storageClass="Camera", + dimensions=["instrument"], + isCalibration=True, + ) + diff_table = cT.Output("warp_diff_binned", storageClass="ArrowAstropy", dimensions=["patch", "band"]) + + def adjustQuantum(self, inputs, outputs, label, data_id): + # Trim out inputs for which we don't have some other input for a visit. + warps_connection, warp_refs = inputs["warps"] + correction_warps_connection, correction_warp_refs = inputs["correction_warps"] + visit_summaries_connection, visit_summary_refs = inputs["visit_summaries"] + warp_refs_by_visit = {ref.dataId["visit"]: ref for ref in warp_refs} + correction_warp_refs_by_visit = {ref.dataId["visit"]: ref for ref in correction_warp_refs} + visit_summary_refs_by_visit = {ref.dataId["visit"]: ref for ref in visit_summary_refs} + visits = sorted( + warp_refs_by_visit.keys() + & visit_summary_refs_by_visit.keys() + & correction_warp_refs_by_visit.keys() + ) + inputs["warps"] = (warps_connection, [warp_refs_by_visit[v] for v in visits]) + inputs["correction_warps"] = ( + correction_warps_connection, + [correction_warp_refs_by_visit[v] for v in visits], + ) + inputs["visit_summaries"] = ( + visit_summaries_connection, + [visit_summary_refs_by_visit[v] for v in visits], + ) + return super().adjustQuantum(inputs, outputs, label, data_id) + + +class DiffWarpBackgroundsConfig(PipelineTaskConfig, pipelineConnections=DiffWarpBackgroundsConnections): + bin_size = Field[int]("Size of each bin (isotropic in x and y) in patch pixels.", dtype=int, default=128) + bin_overlap_erosion = Field[int]( + "How much to shrink a bin's bounding box by (on each side) when testing whether " + "it overlaps a detector. Must be at least 1 to avoid floating-point round-off problems.", + dtype=int, + default=16, + check=lambda v: v > 0, + ) + n_reference_visits = Field("Maximum of visits that are diffed against all others.", dtype=int, default=4) + bad_mask_planes = ListField( + doc="Names of mask planes to ignore while estimating the background", + dtype=str, + default=["EDGE", "DETECTED", "DETECTED_NEGATIVE", "SAT", "BAD", "INTRP", "CR"], + itemCheck=lambda x: x in Mask().getMaskPlaneDict().keys(), + ) + statistic = ChoiceField( + dtype=str, + doc="Type of statistic to estimate pixel value for each bin", + default="MEDIAN", + allowed={"MEAN": "mean", "MEDIAN": "median", "MEANCLIP": "clipped mean"}, + ) + + +class DiffWarpBackgroundsTask(PipelineTask): + _DefaultName: ClassVar[str] = "diffWarpBackgrounds" + ConfigClass: ClassVar[type[DiffWarpBackgroundsConfig]] = DiffWarpBackgroundsConfig + config: DiffWarpBackgroundsConfig + + @staticmethod + def make_diff_table_dtype() -> np.dtype: + return np.dtype( + [ + ("positive_visit_id", np.uint64), + ("negative_visit_id", np.uint64), + ("positive_detector_id", np.uint8), + ("negative_detector_id", np.uint8), + ("positive_camera_x", np.float64), + ("positive_camera_y", np.float64), + ("negative_camera_x", np.float64), + ("negative_camera_y", np.float64), + ("positive_detector_x", np.float64), + ("positive_detector_y", np.float64), + ("negative_detector_x", np.float64), + ("negative_detector_y", np.float64), + ("positive_scaling", np.float64), + ("negative_scaling", np.float64), + ("tract_x", np.float64), + ("tract_y", np.float64), + ("bin_row", np.uint16), + ("bin_column", np.uint16), + ("bin_value", np.float32), + ("bin_variance", np.float32), + ] + ) + + def __init__(self, *, config=None, log=None, initInputs=None, **kwargs): + super().__init__(config=config, log=log, initInputs=initInputs, **kwargs) + self.diff_table_dtype = self.make_diff_table_dtype() + self.stats_flag = getattr(lsst.afw.math, self.config.statistic) + self.stats_ctrl = lsst.afw.math.StatisticsControl() + self.stats_ctrl.setAndMask(Mask.getPlaneBitMask(self.config.bad_mask_planes)) + self.stats_ctrl.setNanSafe(True) + + def runQuantum( + self, + butlerQC: QuantumContext, + inputRefs: InputQuantizedConnection, + outputRefs: OutputQuantizedConnection, + ) -> None: + sky_map = butlerQC.get(inputRefs.sky_map) + patch_info = sky_map[butlerQC.dataId["tract"]][butlerQC.dataId["patch"]] + warp_handles = {handle.dataId["visit"]: handle for handle in butlerQC.get(inputRefs.warps)} + correction_warp_handles = { + handle.dataId["visit"]: handle for handle in butlerQC.get(inputRefs.correction_warps) + } + visit_summaries = {ref.dataId["visit"]: butlerQC.get(ref) for ref in inputRefs.visit_summaries} + camera = butlerQC.get(inputRefs.camera) + outputs = self.run( + patch_info=patch_info, + warp_handles=warp_handles, + correction_warp_handles=correction_warp_handles, + visit_summaries=visit_summaries, + camera=camera, + ) + butlerQC.put(outputs, outputRefs) + + def run( + self, + *, + patch_info: PatchInfo, + warp_handles: Mapping[int, DeferredDatasetHandle], + correction_warp_handles: Mapping[int, DeferredDatasetHandle], + visit_summaries: Mapping[int, ExposureCatalog], + camera: Camera, + ) -> Struct: + self.log.info("Computing detector polygons for %s visits.", len(visit_summaries)) + # A dictionary mapping visit ID to a nested mapping from detector ID to + # polygon, with only detectors that overlap the patch and only visits + # that have at least one overlapping detector. We'll pop visits from + # this dictionary as we process them. + to_do: dict[int, dict[int, Polygon]] = {} + for visit_id, visit_summary in visit_summaries.items(): + visit_detector_info = { + record.getId(): detector_info + for record in visit_summary + if (detector_info := self._make_detector_info(record, patch_info, camera[record.getId()])) + is not None + } + if visit_detector_info: + to_do[visit_id] = visit_detector_info + self.log.info("Selecting up to %s reference visits.", self.config.n_reference_visits) + reference_visits = self._select_reference_visits(to_do) + self.log.info("Loading %s reference visit warps.", len(reference_visits)) + reference_warps = [ + WarpData.load_and_crop( + warp_handles[visit_id], + correction_warp_handles[visit_id], + patch_info, + to_do.pop(visit_id), + ) + for visit_id in reference_visits + ] + self.log.info( + "Diffing %s reference visit pairs.", len(reference_visits) * (len(reference_visits) - 1) / 2 + ) + diff_table_chunks: list[np.ndarray] = [] + for positive, negative in itertools.combinations(reference_warps.items(), r=2): + diff_table_chunks.append(self._compute_bin_diffs(positive, negative)) + self.log.info("Diffing reference visits with %s other visits.", len(to_do)) + for visit_id in sorted(to_do.keys()): # sort just for determinism + positive = WarpData.load_and_crop( + warp_handles[visit_id], + correction_warp_handles[visit_id], + patch_info, + to_do.pop(visit_id), + ) + for negative in reference_warps: + diff_table_chunks.append(self._compute_bin_diffs(positive, negative)) + return Struct( + diff_table=astropy.table.Table(np.vstack(diff_table_chunks)), + ) + + def _make_detector_info( + self, record: ExposureRecord, patch_info: PatchInfo, detector: Detector + ) -> WarpDetectorInfo | None: + detector_to_patch = makeWcsPairTransform(record.wcs, patch_info.getWcs()) + if (polygon := record.validPolygon) is None: + polygon = Polygon(Box2D(record.getBBox())) + try: + patch_polygon = polygon.transform(detector_to_patch).intersectionSingle( + Box2D(patch_info.getInnerBBox()) + ) + except SinglePolygonException: + return None + photo_calib = record.getPhotoCalib() + if photo_calib is None: + return None + patch_to_detector = detector_to_patch.inverted() + return WarpDetectorInfo( + polygon=patch_polygon, + detector=detector, + patch_to_detector=patch_to_detector, + photo_calib=photo_calib, + ) + + def _select_reference_visits(self, visits: Mapping[int, Mapping[int, WarpDetectorInfo]]) -> list[int]: + if len(visits) <= self.config.n_reference_visits: + return list(visits.keys()) + coverage = { + visit_id: { + detector_id: detector_info.calculateArea() + for detector_id, detector_info in visit_detector_info.items() + } + for visit_id, visit_detector_info in visits.items() + } + target_area = 4 * self.config.bin_size**2 + + def score(visit_id: int) -> tuple[int, float]: + return ( + # Prefer visits with more detectors that have at least the + # target area in the patch. + sum(area > target_area for area in coverage[visit_id].values()), + # To break ties rank by the contribution from the least-covered + # detector. + min(coverage[visit_id].values()), + ) + + reference_visits: list[int] = [] + while len(reference_visits) < self.config.n_reference_visits: + best = max(coverage, key=score) + reference_visits.append(best) + detectors_added = coverage.pop(best) + # In the next iteration, don't consider detectors that area already + # in the reference list towards coverage. + for other_visit_coverage in coverage.values(): + for detector_id in detectors_added: + other_visit_coverage.pop(detector_id, None) + return reference_visits + + def _compute_bin_diffs(self, positive: WarpData, negative: WarpData) -> np.ndarray: + """Compute the piece of the diff table that corresponds on one pair of + warps. + """ + diff = positive.exposure.maskedImage.clone() + diff -= negative.exposure.maskedImage + width = diff.getWidth() + height = diff.getHeight() + nx = math.ceil(width / self.config.binSize) + ny = math.ceil(height / self.config.binSize) + bctrl = lsst.afw.math.BackgroundControl(nx, ny, self.stats_ctrl, self.stats_flag) + # BackgroundControl wants to know how we'll interterpolate, but we're + # not going to use it to interpolate, so it doesn't matter. + bctrl.setUndersampleStyle("REDUCE_INTERP_ORDER") + bctrl.setInterpStyle("AKIMA_SPLINE") + bkgd = lsst.afw.math.makeBackground(diff, bctrl) + stats_image = bkgd.getStatsImage() + result = np.zeros(nx * ny, dtype=self.diff_table_dtype) + result["positive_visit_id"] = positive.visit_id + result["negative_visit_id"] = negative.visit_id + result["bin_row"] = np.arange(0, ny, dtype=result["bin_row"].dtype)[:, np.newaxis] + result["bin_column"] = np.arange(0, nx, dtype=result["bin_row"].dtype)[np.newaxis, :] + result["bin_value"] = stats_image.image.array.ravel() + result["bin_variance"] = stats_image.variance.array.ravel() + x_centers = bkgd.getBinCentersX() + y_centers = bkgd.getBinCentersY() + result["tract_x"], result["tract_y"] = np.meshgrid(x_centers, y_centers) + corners = self._make_bin_corners_array(Box2D(diff.getBBox()), x_centers, y_centers) + positive_detector = self._make_detector_arrays( + corners, result["tract_x"], result["tract_y"], positive.detectors + ) + result["positive_detector_id"] = positive_detector.detector_id + result["positive_camera_x"] = positive_detector.x_camera + result["positive_camera_y"] = positive_detector.y_camera + result["positive_detector_x"] = positive_detector.x_detector + result["positive_detector_y"] = positive_detector.y_detector + result["positive_scaling"] = positive_detector.scaling + negative_detector = self._make_detector_arrays(corners, x_centers, y_centers, negative.detectors) + result["negative_detector_id"] = negative_detector.detector_id + result["negative_camera_x"] = negative_detector.x_camera + result["negative_camera_y"] = negative_detector.y_camera + result["negative_detector_x"] = negative_detector.x_detector + result["negative_detector_y"] = negative_detector.y_detector + result["negative_scaling"] = negative_detector.scaling + mask = np.all( + [ + positive_detector.mask, + negative_detector.mask, + np.isfinite(result["bin_value"]), + result["bin_variance"] > 0, + ], + axis=0, + ) + return result[mask] + + def _make_bin_corners_array( + self, bbox: Box2D, x_centers: np.ndarray, y_centers: np.ndarray + ) -> np.ndarray: + """Construct a 3-d array with the corners of the bins in a single + warp. Shape is ``(n_corners=4, y, x)``. + """ + x_internal_edges = 0.5 * (x_centers[1:] + x_centers[:-1]) + y_internal_edges = 0.5 * (y_centers[1:] + y_centers[:-1]) + x_min_bounds = np.concatenate( + [ + [bbox.x.min + self.config.bin_overlap_erosion], + x_internal_edges - self.config.bin_overlap_erosion, + ] + ) + x_max_bounds = np.concatenate( + [ + x_internal_edges + self.config.bin_overlap_erosion, + [bbox.x.max - self.config.bin_overlap_erosion], + ] + ) + y_min_bounds = np.concatenate( + [ + [bbox.y.min + self.config.bin_overlap_erosion], + y_internal_edges - self.config.bin_overlap_erosion, + ] + ) + y_max_bounds = np.concatenate( + [ + y_internal_edges + self.config.bin_overlap_erosion, + [bbox.y.max - self.config.bin_overlap_erosion], + ] + ) + return np.array( + [ + np.meshgrid(x_min_bounds, y_min_bounds), + np.meshgrid(x_min_bounds, y_max_bounds), + np.meshgrid(x_max_bounds, y_max_bounds), + np.meshgrid(x_max_bounds, y_min_bounds), + ] + ) + + def _make_detector_arrays( + self, + corners: np.ndarray, + tract_x: np.ndarray, + tract_y: np.ndarray, + detectors: Mapping[int, WarpDetectorInfo], + ) -> DiffDetectorArrays: + """Make arrays for columns in the diff table that must be computed + detector-by-detector. + + Parameters + ---------- + corners : `numpy.ndarray` + ``(4, ny, nx)`` array of bin corners. + TODO + detectors : `~collections.abc.Mapping` [ `int`, `WarpDetectorData` ] + Mapping from detector ID to struct that includes the polygon and + coordinate transfrom from patch to focal-plane coordinates. + + Returns + ------- + detector_arrays : `DiffDetectorArrays` + Struct of per-detector arrays. + + Notes + ----- + This assumes that the detector polygons are accurate to less than the + gaps between detectors. This may not be true if chip gaps are tiny + and optical distortion is very large (i.e. so detector boundaries + are not straight lines in tract coordinates). + """ + id_array = np.zeros(corners.shape[1:], dtype=np.uint8) + x_camera = np.zeros(id_array.shape, dtype=np.float64) + y_camera = np.zeros(id_array.shape, dtype=np.float64) + x_detector = np.zeros(id_array.shape, dtype=np.float64) + y_detector = np.zeros(id_array.shape, dtype=np.float64) + scaling = np.zeros(id_array.shape, dtype=np.float64) + mask = np.zeros(id_array.shape, dtype=bool) + for detector_id, detector_info in detectors.items(): + detector_mask = np.all(detector_info.polygon.contains(corners), axis=0) + id_array[detector_mask] = detector_id + xy_detector = detector_info.patch_to_detector.getMapping().applyForward( + np.vstack((tract_x[detector_mask], tract_y[detector_mask])) + ) + scaling[detector_mask] = detector_info.photo_calib.getLocalCalibrationArray( + xy_detector[0], xy_detector[1] + ) + x_detector[detector_mask] = xy_detector[0] + y_detector[detector_mask] = xy_detector[1] + xy_camera = ( + detector_info.detector.getTransform(PIXELS, FOCAL_PLANE) + .getMapping() + .applyForward(xy_detector) + ) + x_camera[detector_mask] = xy_camera[0] + y_camera[detector_mask] = xy_camera[1] + mask = np.logical_or(mask, detector_mask) + return DiffDetectorArrays( + detector_id=id_array, + x_camera=x_camera, + y_camera=y_camera, + x_detector=x_detector, + y_detector=y_detector, + scaling=scaling, + mask=mask, + ) diff --git a/python/lsst/drp/tasks/make_direct_warp.py b/python/lsst/drp/tasks/make_direct_warp.py index b0719ea1..77c3b4bd 100644 --- a/python/lsst/drp/tasks/make_direct_warp.py +++ b/python/lsst/drp/tasks/make_direct_warp.py @@ -91,7 +91,7 @@ def exposure(self) -> ExposureF: return self.exposure_or_handle @property - def background_to_photometric_ratio(self) -> Image: + def background_to_photometric_ratio(self) -> Image | None: """Get the background_to_photometric object, loading if necessary.""" if isinstance(self.background_ratio_or_handle, (DeferredDatasetHandle, InMemoryDatasetHandle)): self.background_ratio_or_handle = self.background_ratio_or_handle.get() @@ -206,6 +206,12 @@ class MakeDirectWarpConnections( storageClass="ImageF", dimensions=("tract", "patch", "skymap", "instrument", "visit"), ) + photometric_to_background_ratio_warp = Output( + doc="Warped version of the inverse of the background_to_photometric_ratio exposure.", + name="photometric_to_background_ratio_warp", + storageClass="ImageF", + dimensions=("tract", "patch", "skymap", "instrument", "visit"), + ) def __init__(self, *, config=None): super().__init__(config=config) @@ -221,6 +227,8 @@ def __init__(self, *, config=None): if not config.doWarpMaskedFraction: del self.masked_fraction_warp + if not config.doWarpPhotometricToBackgroundRatio: + del self.photometric_to_background_ratio_warp # Dynamically set output connections for noise images, depending on the # number of noise realization specified in the config. @@ -336,6 +344,18 @@ class MakeDirectWarpConfig( doc="Configuration for the warp that warps the mask fraction image", dtype=Warper.ConfigClass, ) + doWarpPhotometricToBackgroundRatio = Field[bool]( + doc=( + "Warp the inverse of the background_to_photometric_ratio image? " + "If this is True but doApplyFlatBackgroundRatio is False, an image " + "of ones is warped instead." + ), + default=False, + ) + backgroundToPhotometricRatioWarper = ConfigField( + doc="Configuration for the warp that warps the background_to_photometric ratio", + dtype=Warper.ConfigClass, + ) coaddPsf = ConfigField( doc="Configuration for CoaddPsf", dtype=CoaddPsfConfig, @@ -365,6 +385,7 @@ def setDefaults(self) -> None: self.warper.warpingKernelName = "lanczos3" self.warper.cacheSize = 0 self.maskedFractionWarper.warpingKernelName = "bilinear" + self.backgroundToPhotometricRatioWarper.warpingKernelName = "bilinear" class MakeDirectWarpTask(PipelineTask): @@ -400,6 +421,10 @@ def __init__(self, **kwargs): self.warper = Warper.fromConfig(self.config.warper) if self.config.doWarpMaskedFraction: self.maskedFractionWarper = Warper.fromConfig(self.config.maskedFractionWarper) + if self.config.doWarpPhotometricToBackgroundRatio: + self.backgroundToPhotometricRatioWarper = Warper.fromConfig( + self.config.backgroundToPhotometricRatioWarper + ) def runQuantum(self, butlerQC, inputRefs, outputRefs): # Docstring inherited. @@ -528,7 +553,10 @@ def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) # So we create empty exposure(s) to accumulate the warps of each type, # and then process each detector serially. final_warp = self._prepareEmptyExposure(sky_info) - final_masked_fraction_warp = self._prepareEmptyExposure(sky_info) + if self.config.doWarpMaskedFraction: + final_masked_fraction_warp = self._prepareEmptyExposure(sky_info) + if self.config.doWarpPhotometricToBackgroundRatio: + final_photometric_to_background_ratio_warp = self._prepareEmptyExposure(sky_info) final_noise_warps = { n_noise: self._prepareEmptyExposure(sky_info) for n_noise in range(self.config.numberOfNoiseRealizations) @@ -603,6 +631,22 @@ def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) final_masked_fraction_warp.mask.getPlaneBitMask(["NO_DATA"]), ) + if self.config.doWarpPhotometricToBackgroundRatio: + ptbr_exp = input_exposure.clone() + ptbr_exp.image.array[:, :] = 1.0 + ptbr_exp.mask.array[:, :] = 0 + ptbr_exp.variance.array[:, :] = 1.0 + if detector_inputs.background_to_photometric_ratio is not None: + ptbr_exp.image.array[:, :] /= detector_inputs.background_to_photometric_ratio.array[:, :] + btpr_warp = self.backgroundToPhotometricRatioWarper.warpExposure( + target_wcs, ptbr_exp, destBBox=target_bbox + ) + copyGoodPixels( + final_photometric_to_background_ratio_warp.maskedImage, + btpr_warp.maskedImage, + final_photometric_to_background_ratio_warp.mask.getPlaneBitMask(["NO_DATA"]), + ) + # Process and accumulate noise images. for n_noise in range(self.config.numberOfNoiseRealizations): noise_calexp = noise_calexps[n_noise] @@ -659,6 +703,9 @@ def run(self, inputs: Mapping[int, WarpDetectorInputs], sky_info, visit_summary) if self.config.doWarpMaskedFraction: results.masked_fraction_warp = final_masked_fraction_warp.image + if self.config.doWarpPhotometricToBackgroundRatio: + results.photometric_to_background_ratio_warp = final_photometric_to_background_ratio_warp.image + for noise_index, noise_exposure in final_noise_warps.items(): setattr(results, f"noise_warp{noise_index}", noise_exposure.maskedImage)