diff --git a/cosipy/event_selection/good_time_interval.py b/cosipy/event_selection/good_time_interval.py index e135e1e4..ef42a65a 100644 --- a/cosipy/event_selection/good_time_interval.py +++ b/cosipy/event_selection/good_time_interval.py @@ -231,6 +231,82 @@ def from_pointing_cut(cls, return cls(sc_history.intervals_tstart[start_idx], sc_history.intervals_tstop[stop_idx]) + @classmethod + def from_region_cut(cls, + region: 'Histogram', + sc_history: 'SpacecraftHistory', + earth_occ: bool = False, + earth_occ_mode: str = 'all'): + """ + Build a GTI where the spacecraft z-pointing is within a given sky region. + + Parameters + ---------- + region : histpy.Histogram + 1-D boolean Histogram with a single HealpixAxis defining the on-region. + Pixels whose value is True are considered "on-region". + sc_history : cosipy.spacecraftfile.SpacecraftHistory + Spacecraft pointinghistory to evaluate (.ori file). + earth_occ : bool, optional + If True, exclude time bins in which the target is occulted + by the Earth. Default is False. + earth_occ_mode : {'all', 'any'}, optional + Controls how Earth occultation is evaluated across on-region pixels. + 'all' : *all* on-region pixels must be unoccluded (stricter). + 'any' : *at least one* on-region pixel must be unoccluded (looser). + Only used when earth_occ=True. Default is 'all'. + + Returns + ------- + GoodTimeInterval + GTI containing time ranges where the z-pointing condition is satisfied. + """ + + if earth_occ and earth_occ_mode not in ('all', 'any'): + raise ValueError(f"earth_occ_mode must be 'all' or 'any', got '{earth_occ_mode}'") + + _, _, z_gal = sc_history.attitude[:-1].transform_to('galactic').as_axes() + + z_l = z_gal.l.deg + z_b = z_gal.b.deg + + z_hp_idx = region.axis.ang2pix(z_l, z_b, lonlat=True) + + # Bool mask: is the z-pointing pixel inside the on-region? + in_region = np.asarray(region[z_hp_idx], dtype=bool) + + if earth_occ: + on_pixels = np.flatnonzero(np.asarray(region[:], dtype=bool)) + on_region_coords = region.axis.pix2skycoord(on_pixels) # shape (n_on_pix,) + + occulted = np.array([sc_history.get_earth_occ(coord) + for coord in on_region_coords]) + + # Align with time bins: use left-edge + occulted_bins = occulted[:, :-1] + + if earth_occ_mode == 'all': + earth_ok = np.all(~occulted_bins, axis=0) + else: + earth_ok = np.any(~occulted_bins, axis=0) + + in_region = in_region & earth_ok + + if not np.any(in_region): + empty_time = Time([], + format=sc_history.intervals_tstart.format, + scale=sc_history.intervals_tstart.scale) + return cls(empty_time, empty_time.copy()) + + edges = np.flatnonzero( + np.diff(np.concatenate(([False], in_region, [False]))) + ) + start_idx = edges[::2] + stop_idx = edges[1::2] - 1 + + return cls(sc_history.intervals_tstart[start_idx], + sc_history.intervals_tstop[stop_idx]) + @classmethod def intersection(cls, *gti_list): """ diff --git a/cosipy/spacecraftfile/spacecraft_file.py b/cosipy/spacecraftfile/spacecraft_file.py index d6054171..1eeee9cb 100644 --- a/cosipy/spacecraftfile/spacecraft_file.py +++ b/cosipy/spacecraftfile/spacecraft_file.py @@ -879,10 +879,7 @@ def apply_gti(self, gti: GoodTimeInterval) -> "SpacecraftHistory": new_location = None new_livetime = None - for i, (start, stop) in enumerate(zip(gti.tstart_list, gti.tstop_list)): - # TODO: this line can be replaced with the following line - # after the PR in the develop branch is merged. - #for i, (start, stop) in enumerate(gti): + for i, (start, stop) in enumerate(gti): _sph = self.select_interval(start, stop) _obstime = _sph.obstime diff --git a/tests/event_selection/test_GTI.py b/tests/event_selection/test_GTI.py index 1d96c71e..5d965ed8 100644 --- a/tests/event_selection/test_GTI.py +++ b/tests/event_selection/test_GTI.py @@ -4,8 +4,10 @@ from astropy.coordinates import Galactic, SkyCoord from astropy.time import Time from types import SimpleNamespace +import healpy as hp -from scoords import SpacecraftFrame +from scoords import Attitude, SpacecraftFrame +from histpy import HealpixAxis, Histogram from cosipy import SpacecraftHistory from cosipy.event_selection import GoodTimeInterval @@ -13,25 +15,29 @@ class DummySpacecraftHistory(SpacecraftHistory): - def __init__(self, colatitude, earth_occ=None): + def __init__(self, colatitude, earth_occ=None, attitude=None): self._colatitude = np.asarray(colatitude, dtype=float) if earth_occ is None: earth_occ = np.zeros_like(self._colatitude, dtype=bool) self._earth_occ = np.asarray(earth_occ, dtype=bool) + self._attitude = attitude # optional: scoords.Attitude for from_region_cut + self._occ_call_idx = 0 # row counter for 2D earth_occ (from_region_cut) @property def intervals_tstart(self): return Time([60970.0, 60971.0, 60972.0, 60973.0, 60974.0], - format='mjd', scale='utc') + format='mjd', scale='utc') @property def intervals_tstop(self): return Time([60971.0, 60972.0, 60973.0, 60974.0, 60975.0], - format='mjd', scale='utc') + format='mjd', scale='utc') @property def attitude(self): + if self._attitude is not None: + return self._attitude return SimpleNamespace(frame=Galactic()) @property @@ -39,27 +45,78 @@ def location(self): return Galactic() def get_target_in_sc_frame(self, source): - return SkyCoord(lon = np.zeros_like(self._colatitude), lat = np.pi/2 - self._colatitude, unit = 'rad', frame = SpacecraftFrame()) + return SkyCoord(lon=np.zeros_like(self._colatitude), + lat=np.pi/2 - self._colatitude, + unit='rad', frame=SpacecraftFrame()) def get_earth_occ(self, source): + # 2D (n_on_pix, npoints): return one row per call (from_region_cut) + if self._earth_occ.ndim == 2: + row = self._earth_occ[self._occ_call_idx] + self._occ_call_idx += 1 + return row + # 1D (npoints,): return as-is (from_pointing_cut) return self._earth_occ + +def _make_region(nside=4): + """Boolean Histogram covering the GC pixel and its 8 neighbours.""" + ax = HealpixAxis(nside=nside, scheme='ring', coordsys='galactic', label='lb') + gc_pix = ax.ang2pix(0.0, 0.0, lonlat=True) + neighbors = hp.get_all_neighbours(nside, gc_pix, nest=False) + on_pixels = np.append(neighbors[neighbors >= 0], gc_pix) + region = Histogram(ax, contents=np.zeros(ax.npix, dtype=bool)) + region.contents[on_pixels] = True + return region + + +def _make_attitude(lons_deg, lats_deg): + """Minimal Attitude array from Galactic z-pointing directions.""" + lons = np.asarray(lons_deg, dtype=float) + lats = np.asarray(lats_deg, dtype=float) + z_pts = SkyCoord(l=lons * u.deg, b=lats * u.deg, frame=Galactic()) + x_pts = SkyCoord(l=(lons + 90) * u.deg, b=np.zeros_like(lons) * u.deg, + frame=Galactic()) + return Attitude.from_axes(x=x_pts, z=z_pts, frame=Galactic()) + + +def _make_sc_region(region, pattern): + """ + Build a DummySpacecraftHistory whose z-pointing follows `pattern`. + + Parameters + ---------- + pattern : list of bool, length 5 + True = pointing inside the region, False = outside. + Internally padded to 6 edge-points (one extra 'out' at the end). + """ + ax = region.axis + gc_pix = ax.ang2pix(0.0, 0.0, lonlat=True) + lon_in, lat_in = ax.pix2ang(int(gc_pix), lonlat=True) + lon_out, lat_out = ax.pix2ang(0, lonlat=True) + + # attitude has npoints = nintervals + 1 edges; pad one extra point at the end + lons = [lon_in if p else lon_out for p in pattern] + [lon_out] + lats = [lat_in if p else lat_out for p in pattern] + [lat_out] + return DummySpacecraftHistory(colatitude=[], attitude=_make_attitude(lons, lats)) + + def test_GTI(tmp_path): # test with 1 range - gti = GoodTimeInterval(Time(60970.0, format='mjd', scale = 'utc'), - Time(60975.0, format='mjd', scale = 'utc')) + gti = GoodTimeInterval(Time(60970.0, format='mjd', scale='utc'), + Time(60975.0, format='mjd', scale='utc')) # A single start/stop pair should produce exactly one GTI interval. assert len(gti) == 1 - + # test with 2 ranges - tstarts = Time([60970.0, 60980.0], format='mjd', scale = 'utc') - tstops = Time([60975.0, 60985.0], format='mjd', scale = 'utc') + tstarts = Time([60970.0, 60980.0], format='mjd', scale='utc') + tstops = Time([60975.0, 60985.0], format='mjd', scale='utc') gti = GoodTimeInterval(tstarts, tstops) assert len(gti) == 2 - + # check the values tstarts = gti.tstart_list tstops = gti.tstop_list @@ -67,7 +124,7 @@ def test_GTI(tmp_path): for i, (tstart, tstop) in enumerate(gti): # Iteration, list access, and direct indexing should all agree. assert tstart == tstarts[i] == gti[i][0] - assert tstop == tstops[i] == gti[i][1] + assert tstop == tstops[i] == gti[i][1] # save file gti.save_as_fits(tmp_path / 'gti.fits') @@ -79,36 +136,32 @@ def test_GTI(tmp_path): # intersection - #GTI1 - tstarts_1 = Time([60970.0, 60980.0], format='mjd', scale = 'utc') - tstops_1 = Time([60975.0, 60985.0], format='mjd', scale = 'utc') - + # GTI1 + tstarts_1 = Time([60970.0, 60980.0], format='mjd', scale='utc') + tstops_1 = Time([60975.0, 60985.0], format='mjd', scale='utc') gti1 = GoodTimeInterval(tstarts_1, tstops_1) - - #GTI2 - tstarts_2 = Time([60972.0, 60979.0], format='mjd', scale = 'utc') - tstops_2 = Time([60977.0, 60983.0], format='mjd', scale = 'utc') - + + # GTI2 + tstarts_2 = Time([60972.0, 60979.0], format='mjd', scale='utc') + tstops_2 = Time([60977.0, 60983.0], format='mjd', scale='utc') gti2 = GoodTimeInterval(tstarts_2, tstops_2) - - #GTI3 - tstarts_3 = Time([60970.0], format='mjd', scale = 'utc') - tstops_3 = Time([60990.0], format='mjd', scale = 'utc') - + + # GTI3 + tstarts_3 = Time([60970.0], format='mjd', scale='utc') + tstops_3 = Time([60990.0], format='mjd', scale='utc') gti3 = GoodTimeInterval(tstarts_3, tstops_3) - - #Intersection + + # Intersection gti_intersection = GoodTimeInterval.intersection(gti1, gti2, gti3) # Intersections should keep only the time ranges shared by all GTIs. - assert np.all(gti_intersection.tstart_list == Time([60972.0, 60980.0], format='mjd', scale = 'utc')) - assert np.all(gti_intersection.tstop_list == Time([60975.0, 60983.0], format='mjd', scale = 'utc')) - - #Intersection with no components - #GTI4 - tstarts_4 = Time([60950.0], format='mjd', scale = 'utc') - tstops_4 = Time([60960.0], format='mjd', scale = 'utc') - + assert np.all(gti_intersection.tstart_list == Time([60972.0, 60980.0], format='mjd', scale='utc')) + assert np.all(gti_intersection.tstop_list == Time([60975.0, 60983.0], format='mjd', scale='utc')) + + # Intersection with no components + # GTI4 + tstarts_4 = Time([60950.0], format='mjd', scale='utc') + tstops_4 = Time([60960.0], format='mjd', scale='utc') gti4 = GoodTimeInterval(tstarts_4, tstops_4) gti_intersection_no_components = GoodTimeInterval.intersection(gti1, gti4) @@ -127,7 +180,7 @@ def test_gti_from_pointing_cut(): # Adjacent in-FoV bins should be merged into GTI ranges. assert np.all(gti.tstart_list == Time([60971.0, 60974.0], format='mjd', scale='utc')) - assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) def test_gti_from_pointing_cut_with_earth_occultation(): @@ -141,7 +194,7 @@ def test_gti_from_pointing_cut_with_earth_occultation(): # Earth-occulted bins should be removed before GTI ranges are built. assert np.all(gti.tstart_list == Time([60972.0, 60974.0], format='mjd', scale='utc')) - assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) def test_gti_from_pointing_cut_empty(): @@ -154,3 +207,86 @@ def test_gti_from_pointing_cut_empty(): # If no bin passes the pointing cut, the GTI should be empty. assert len(gti) == 0 + + +def test_from_region_cut_basic(): + """Bins [F,T,T,F,T] produce two GTI intervals.""" + region = _make_region() + sc = _make_sc_region(region, [False, True, True, False, True]) + + gti = GoodTimeInterval.from_region_cut(region, sc) + + assert len(gti) == 2 + assert np.all(gti.tstart_list == Time([60971., 60974.], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973., 60975.], format='mjd', scale='utc')) + + +def test_from_region_cut_empty(): + """No bin inside the region -> empty GTI.""" + region = _make_region() + sc = _make_sc_region(region, [False, False, False, False, False]) + + gti = GoodTimeInterval.from_region_cut(region, sc) + + assert len(gti) == 0 + + +def test_from_region_cut_all_in(): + """All bins inside the region -> single GTI covering the full range.""" + region = _make_region() + sc = _make_sc_region(region, [True, True, True, True, True]) + + gti = GoodTimeInterval.from_region_cut(region, sc) + + assert len(gti) == 1 + assert gti.tstart_list[0] == Time(60970., format='mjd', scale='utc') + assert gti.tstop_list[0] == Time(60975., format='mjd', scale='utc') + + +def test_from_region_cut_earth_occ_all_vs_any(): + """ + Verify that earth_occ_mode='all' and 'any' produce different results. + + Setup: all 5 bins point into the region. + - bin 1: all on-region pixels occulted -> excluded by both 'all' and 'any' + - bin 3: only pixel 0 occulted -> excluded by 'all', kept by 'any' + + Expected: + mode='all': [T,F,T,F,T] -> 3 intervals + mode='any': [T,F,T,T,T] -> bin 2,3,4 merge -> 2 intervals + """ + region = _make_region() + n_on = int(np.sum(np.asarray(region[:], dtype=bool))) + + # earth_occ 2D: shape (n_on, npoints=6) + earth_occ = np.zeros((n_on, 6), dtype=bool) + earth_occ[:, 1] = True # bin 1: all pixels occulted + earth_occ[0, 3] = True # bin 3: only pixel 0 occulted + + sc_all = _make_sc_region(region, [True, True, True, True, True]) + sc_all._earth_occ = earth_occ + + sc_any = _make_sc_region(region, [True, True, True, True, True]) + sc_any._earth_occ = earth_occ + + gti_all = GoodTimeInterval.from_region_cut(region, sc_all, earth_occ=True, earth_occ_mode='all') + gti_any = GoodTimeInterval.from_region_cut(region, sc_any, earth_occ=True, earth_occ_mode='any') + + # mode='all': bin 1 and bin 3 both excluded -> 3 separate intervals + assert len(gti_all) == 3 + assert np.all(gti_all.tstart_list == Time([60970., 60972., 60974.], format='mjd', scale='utc')) + assert np.all(gti_all.tstop_list == Time([60971., 60973., 60975.], format='mjd', scale='utc')) + + # mode='any': only bin 1 excluded; bin 3 kept -> bin 2,3,4 merge into one interval + assert len(gti_any) == 2 + assert np.all(gti_any.tstart_list == Time([60970., 60972.], format='mjd', scale='utc')) + assert np.all(gti_any.tstop_list == Time([60971., 60975.], format='mjd', scale='utc')) + + +def test_from_region_cut_invalid_earth_occ_mode(): + """Invalid earth_occ_mode raises ValueError.""" + region = _make_region() + sc = _make_sc_region(region, [True, True, True, True, True]) + + with pytest.raises(ValueError, match="earth_occ_mode"): + GoodTimeInterval.from_region_cut(region, sc, earth_occ=True, earth_occ_mode='invalid')