From 36ee305fcc777fdf40cf4f3c0f80c2cdaef8279a Mon Sep 17 00:00:00 2001 From: Hiroki Yoneda Date: Wed, 25 Mar 2026 19:09:22 +0900 Subject: [PATCH 1/4] Added from_region_cut to GTI --- cosipy/event_selection/good_time_interval.py | 76 ++++++++++++++++++++ 1 file changed, 76 insertions(+) diff --git a/cosipy/event_selection/good_time_interval.py b/cosipy/event_selection/good_time_interval.py index e135e1e4..ef42a65a 100644 --- a/cosipy/event_selection/good_time_interval.py +++ b/cosipy/event_selection/good_time_interval.py @@ -231,6 +231,82 @@ def from_pointing_cut(cls, return cls(sc_history.intervals_tstart[start_idx], sc_history.intervals_tstop[stop_idx]) + @classmethod + def from_region_cut(cls, + region: 'Histogram', + sc_history: 'SpacecraftHistory', + earth_occ: bool = False, + earth_occ_mode: str = 'all'): + """ + Build a GTI where the spacecraft z-pointing is within a given sky region. + + Parameters + ---------- + region : histpy.Histogram + 1-D boolean Histogram with a single HealpixAxis defining the on-region. + Pixels whose value is True are considered "on-region". + sc_history : cosipy.spacecraftfile.SpacecraftHistory + Spacecraft pointinghistory to evaluate (.ori file). + earth_occ : bool, optional + If True, exclude time bins in which the target is occulted + by the Earth. Default is False. + earth_occ_mode : {'all', 'any'}, optional + Controls how Earth occultation is evaluated across on-region pixels. + 'all' : *all* on-region pixels must be unoccluded (stricter). + 'any' : *at least one* on-region pixel must be unoccluded (looser). + Only used when earth_occ=True. Default is 'all'. + + Returns + ------- + GoodTimeInterval + GTI containing time ranges where the z-pointing condition is satisfied. + """ + + if earth_occ and earth_occ_mode not in ('all', 'any'): + raise ValueError(f"earth_occ_mode must be 'all' or 'any', got '{earth_occ_mode}'") + + _, _, z_gal = sc_history.attitude[:-1].transform_to('galactic').as_axes() + + z_l = z_gal.l.deg + z_b = z_gal.b.deg + + z_hp_idx = region.axis.ang2pix(z_l, z_b, lonlat=True) + + # Bool mask: is the z-pointing pixel inside the on-region? + in_region = np.asarray(region[z_hp_idx], dtype=bool) + + if earth_occ: + on_pixels = np.flatnonzero(np.asarray(region[:], dtype=bool)) + on_region_coords = region.axis.pix2skycoord(on_pixels) # shape (n_on_pix,) + + occulted = np.array([sc_history.get_earth_occ(coord) + for coord in on_region_coords]) + + # Align with time bins: use left-edge + occulted_bins = occulted[:, :-1] + + if earth_occ_mode == 'all': + earth_ok = np.all(~occulted_bins, axis=0) + else: + earth_ok = np.any(~occulted_bins, axis=0) + + in_region = in_region & earth_ok + + if not np.any(in_region): + empty_time = Time([], + format=sc_history.intervals_tstart.format, + scale=sc_history.intervals_tstart.scale) + return cls(empty_time, empty_time.copy()) + + edges = np.flatnonzero( + np.diff(np.concatenate(([False], in_region, [False]))) + ) + start_idx = edges[::2] + stop_idx = edges[1::2] - 1 + + return cls(sc_history.intervals_tstart[start_idx], + sc_history.intervals_tstop[stop_idx]) + @classmethod def intersection(cls, *gti_list): """ From a686a989f67b74df42cfe1d390c568cb236b92ec Mon Sep 17 00:00:00 2001 From: Hiroki Yoneda Date: Wed, 25 Mar 2026 19:10:07 +0900 Subject: [PATCH 2/4] Minor modification of SpacecraftHistory --- cosipy/spacecraftfile/spacecraft_file.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/cosipy/spacecraftfile/spacecraft_file.py b/cosipy/spacecraftfile/spacecraft_file.py index d6054171..1eeee9cb 100644 --- a/cosipy/spacecraftfile/spacecraft_file.py +++ b/cosipy/spacecraftfile/spacecraft_file.py @@ -879,10 +879,7 @@ def apply_gti(self, gti: GoodTimeInterval) -> "SpacecraftHistory": new_location = None new_livetime = None - for i, (start, stop) in enumerate(zip(gti.tstart_list, gti.tstop_list)): - # TODO: this line can be replaced with the following line - # after the PR in the develop branch is merged. - #for i, (start, stop) in enumerate(gti): + for i, (start, stop) in enumerate(gti): _sph = self.select_interval(start, stop) _obstime = _sph.obstime From e95db339fbe36291ba6f2e59d84de118155d605d Mon Sep 17 00:00:00 2001 From: Hiroki Yoneda Date: Wed, 25 Mar 2026 19:10:39 +0900 Subject: [PATCH 3/4] Added unittest for from_region_cut --- tests/event_selection/test_GTI.py | 200 ++++++++++++++++++++++++------ 1 file changed, 162 insertions(+), 38 deletions(-) diff --git a/tests/event_selection/test_GTI.py b/tests/event_selection/test_GTI.py index 1d96c71e..1763c028 100644 --- a/tests/event_selection/test_GTI.py +++ b/tests/event_selection/test_GTI.py @@ -4,8 +4,10 @@ from astropy.coordinates import Galactic, SkyCoord from astropy.time import Time from types import SimpleNamespace +import healpy as hp -from scoords import SpacecraftFrame +from scoords import Attitude, SpacecraftFrame +from histpy import HealpixAxis, Histogram from cosipy import SpacecraftHistory from cosipy.event_selection import GoodTimeInterval @@ -13,25 +15,28 @@ class DummySpacecraftHistory(SpacecraftHistory): - def __init__(self, colatitude, earth_occ=None): + def __init__(self, colatitude, earth_occ=None, attitude=None): self._colatitude = np.asarray(colatitude, dtype=float) if earth_occ is None: earth_occ = np.zeros_like(self._colatitude, dtype=bool) self._earth_occ = np.asarray(earth_occ, dtype=bool) + self._attitude = attitude # optional: scoords.Attitude for from_region_cut @property def intervals_tstart(self): return Time([60970.0, 60971.0, 60972.0, 60973.0, 60974.0], - format='mjd', scale='utc') + format='mjd', scale='utc') @property def intervals_tstop(self): return Time([60971.0, 60972.0, 60973.0, 60974.0, 60975.0], - format='mjd', scale='utc') + format='mjd', scale='utc') @property def attitude(self): + if self._attitude is not None: + return self._attitude return SimpleNamespace(frame=Galactic()) @property @@ -39,27 +44,72 @@ def location(self): return Galactic() def get_target_in_sc_frame(self, source): - return SkyCoord(lon = np.zeros_like(self._colatitude), lat = np.pi/2 - self._colatitude, unit = 'rad', frame = SpacecraftFrame()) + return SkyCoord(lon=np.zeros_like(self._colatitude), + lat=np.pi/2 - self._colatitude, + unit='rad', frame=SpacecraftFrame()) def get_earth_occ(self, source): return self._earth_occ + +def _make_region(nside=4): + """Boolean Histogram covering the GC pixel and its 8 neighbours.""" + ax = HealpixAxis(nside=nside, scheme='ring', coordsys='galactic', label='lb') + gc_pix = ax.ang2pix(0.0, 0.0, lonlat=True) + neighbors = hp.get_all_neighbours(nside, gc_pix, nest=False) + on_pixels = np.append(neighbors[neighbors >= 0], gc_pix) + region = Histogram(ax, contents=np.zeros(ax.npix, dtype=bool)) + region.contents[on_pixels] = True + return region + + +def _make_attitude(lons_deg, lats_deg): + """Minimal Attitude array from Galactic z-pointing directions.""" + lons = np.asarray(lons_deg, dtype=float) + lats = np.asarray(lats_deg, dtype=float) + z_pts = SkyCoord(l=lons * u.deg, b=lats * u.deg, frame=Galactic()) + x_pts = SkyCoord(l=(lons + 90) * u.deg, b=np.zeros_like(lons) * u.deg, + frame=Galactic()) + return Attitude.from_axes(x=x_pts, z=z_pts, frame=Galactic()) + + +def _make_sc_region(region, pattern): + """ + Build a DummySpacecraftHistory whose z-pointing follows `pattern`. + + Parameters + ---------- + pattern : list of bool, length 5 + True = pointing inside the region, False = outside. + Internally padded to 6 edge-points (one extra 'out' at the end). + """ + ax = region.axis + gc_pix = ax.ang2pix(0.0, 0.0, lonlat=True) + lon_in, lat_in = ax.pix2ang(int(gc_pix), lonlat=True) + lon_out, lat_out = ax.pix2ang(0, lonlat=True) + + # attitude has npoints = nintervals + 1 edges; pad one extra point at the end + lons = [lon_in if p else lon_out for p in pattern] + [lon_out] + lats = [lat_in if p else lat_out for p in pattern] + [lat_out] + return DummySpacecraftHistory(colatitude=[], attitude=_make_attitude(lons, lats)) + + def test_GTI(tmp_path): # test with 1 range - gti = GoodTimeInterval(Time(60970.0, format='mjd', scale = 'utc'), - Time(60975.0, format='mjd', scale = 'utc')) + gti = GoodTimeInterval(Time(60970.0, format='mjd', scale='utc'), + Time(60975.0, format='mjd', scale='utc')) # A single start/stop pair should produce exactly one GTI interval. assert len(gti) == 1 - + # test with 2 ranges - tstarts = Time([60970.0, 60980.0], format='mjd', scale = 'utc') - tstops = Time([60975.0, 60985.0], format='mjd', scale = 'utc') + tstarts = Time([60970.0, 60980.0], format='mjd', scale='utc') + tstops = Time([60975.0, 60985.0], format='mjd', scale='utc') gti = GoodTimeInterval(tstarts, tstops) assert len(gti) == 2 - + # check the values tstarts = gti.tstart_list tstops = gti.tstop_list @@ -67,7 +117,7 @@ def test_GTI(tmp_path): for i, (tstart, tstop) in enumerate(gti): # Iteration, list access, and direct indexing should all agree. assert tstart == tstarts[i] == gti[i][0] - assert tstop == tstops[i] == gti[i][1] + assert tstop == tstops[i] == gti[i][1] # save file gti.save_as_fits(tmp_path / 'gti.fits') @@ -79,36 +129,32 @@ def test_GTI(tmp_path): # intersection - #GTI1 - tstarts_1 = Time([60970.0, 60980.0], format='mjd', scale = 'utc') - tstops_1 = Time([60975.0, 60985.0], format='mjd', scale = 'utc') - + # GTI1 + tstarts_1 = Time([60970.0, 60980.0], format='mjd', scale='utc') + tstops_1 = Time([60975.0, 60985.0], format='mjd', scale='utc') gti1 = GoodTimeInterval(tstarts_1, tstops_1) - - #GTI2 - tstarts_2 = Time([60972.0, 60979.0], format='mjd', scale = 'utc') - tstops_2 = Time([60977.0, 60983.0], format='mjd', scale = 'utc') - + + # GTI2 + tstarts_2 = Time([60972.0, 60979.0], format='mjd', scale='utc') + tstops_2 = Time([60977.0, 60983.0], format='mjd', scale='utc') gti2 = GoodTimeInterval(tstarts_2, tstops_2) - - #GTI3 - tstarts_3 = Time([60970.0], format='mjd', scale = 'utc') - tstops_3 = Time([60990.0], format='mjd', scale = 'utc') - + + # GTI3 + tstarts_3 = Time([60970.0], format='mjd', scale='utc') + tstops_3 = Time([60990.0], format='mjd', scale='utc') gti3 = GoodTimeInterval(tstarts_3, tstops_3) - - #Intersection + + # Intersection gti_intersection = GoodTimeInterval.intersection(gti1, gti2, gti3) # Intersections should keep only the time ranges shared by all GTIs. - assert np.all(gti_intersection.tstart_list == Time([60972.0, 60980.0], format='mjd', scale = 'utc')) - assert np.all(gti_intersection.tstop_list == Time([60975.0, 60983.0], format='mjd', scale = 'utc')) - - #Intersection with no components - #GTI4 - tstarts_4 = Time([60950.0], format='mjd', scale = 'utc') - tstops_4 = Time([60960.0], format='mjd', scale = 'utc') - + assert np.all(gti_intersection.tstart_list == Time([60972.0, 60980.0], format='mjd', scale='utc')) + assert np.all(gti_intersection.tstop_list == Time([60975.0, 60983.0], format='mjd', scale='utc')) + + # Intersection with no components + # GTI4 + tstarts_4 = Time([60950.0], format='mjd', scale='utc') + tstops_4 = Time([60960.0], format='mjd', scale='utc') gti4 = GoodTimeInterval(tstarts_4, tstops_4) gti_intersection_no_components = GoodTimeInterval.intersection(gti1, gti4) @@ -127,7 +173,7 @@ def test_gti_from_pointing_cut(): # Adjacent in-FoV bins should be merged into GTI ranges. assert np.all(gti.tstart_list == Time([60971.0, 60974.0], format='mjd', scale='utc')) - assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) def test_gti_from_pointing_cut_with_earth_occultation(): @@ -141,7 +187,7 @@ def test_gti_from_pointing_cut_with_earth_occultation(): # Earth-occulted bins should be removed before GTI ranges are built. assert np.all(gti.tstart_list == Time([60972.0, 60974.0], format='mjd', scale='utc')) - assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973.0, 60975.0], format='mjd', scale='utc')) def test_gti_from_pointing_cut_empty(): @@ -154,3 +200,81 @@ def test_gti_from_pointing_cut_empty(): # If no bin passes the pointing cut, the GTI should be empty. assert len(gti) == 0 + + +def test_from_region_cut_basic(): + """Bins [F,T,T,F,T] produce two GTI intervals.""" + region = _make_region() + sc = _make_sc_region(region, [False, True, True, False, True]) + + gti = GoodTimeInterval.from_region_cut(region, sc) + + assert len(gti) == 2 + assert np.all(gti.tstart_list == Time([60971., 60974.], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973., 60975.], format='mjd', scale='utc')) + + +def test_from_region_cut_empty(): + """No bin inside the region -> empty GTI.""" + region = _make_region() + sc = _make_sc_region(region, [False, False, False, False, False]) + + gti = GoodTimeInterval.from_region_cut(region, sc) + + assert len(gti) == 0 + + +def test_from_region_cut_all_in(): + """All bins inside the region -> single GTI covering the full range.""" + region = _make_region() + sc = _make_sc_region(region, [True, True, True, True, True]) + + gti = GoodTimeInterval.from_region_cut(region, sc) + + assert len(gti) == 1 + assert gti.tstart_list[0] == Time(60970., format='mjd', scale='utc') + assert gti.tstop_list[0] == Time(60975., format='mjd', scale='utc') + + +def test_from_region_cut_earth_occ_all(): + """ + mode='all': bin 4 is occulted for one on-region pixel -> excluded. + Result: only the first interval [60971, 60973] survives. + """ + region = _make_region() + sc = _make_sc_region(region, [False, True, True, False, True]) + + # earth_occ is 1D (npoints=6); get_earth_occ is called once per on-region pixel. + # Occulted at edge-point 4 for the first pixel only. + sc._earth_occ = np.array([False, False, False, False, True, False]) + + gti = GoodTimeInterval.from_region_cut(region, sc, earth_occ=True, earth_occ_mode='all') + + assert len(gti) == 1 + assert np.all(gti.tstart_list == Time([60971.], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973.], format='mjd', scale='utc')) + + +def test_from_region_cut_earth_occ_any(): + """ + mode='any': bin 4 has one pixel occulted but others are fine -> kept. + Result: both intervals survive unchanged. + """ + region = _make_region() + sc = _make_sc_region(region, [False, True, True, False, True]) + sc._earth_occ = np.array([False, False, False, False, True, False]) + + gti = GoodTimeInterval.from_region_cut(region, sc, earth_occ=True, earth_occ_mode='any') + + assert len(gti) == 1 + assert np.all(gti.tstart_list == Time([60971.], format='mjd', scale='utc')) + assert np.all(gti.tstop_list == Time([60973.], format='mjd', scale='utc')) + + +def test_from_region_cut_invalid_earth_occ_mode(): + """Invalid earth_occ_mode raises ValueError.""" + region = _make_region() + sc = _make_sc_region(region, [True, True, True, True, True]) + + with pytest.raises(ValueError, match="earth_occ_mode"): + GoodTimeInterval.from_region_cut(region, sc, earth_occ=True, earth_occ_mode='invalid') From bda2988679b46b22a043287d665fb1de40394e0a Mon Sep 17 00:00:00 2001 From: Hiroki Yoneda Date: Wed, 25 Mar 2026 19:19:47 +0900 Subject: [PATCH 4/4] Modified unittest for from_region_cut --- tests/event_selection/test_GTI.py | 72 ++++++++++++++++++------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/tests/event_selection/test_GTI.py b/tests/event_selection/test_GTI.py index 1763c028..5d965ed8 100644 --- a/tests/event_selection/test_GTI.py +++ b/tests/event_selection/test_GTI.py @@ -22,6 +22,7 @@ def __init__(self, colatitude, earth_occ=None, attitude=None): earth_occ = np.zeros_like(self._colatitude, dtype=bool) self._earth_occ = np.asarray(earth_occ, dtype=bool) self._attitude = attitude # optional: scoords.Attitude for from_region_cut + self._occ_call_idx = 0 # row counter for 2D earth_occ (from_region_cut) @property def intervals_tstart(self): @@ -49,6 +50,12 @@ def get_target_in_sc_frame(self, source): unit='rad', frame=SpacecraftFrame()) def get_earth_occ(self, source): + # 2D (n_on_pix, npoints): return one row per call (from_region_cut) + if self._earth_occ.ndim == 2: + row = self._earth_occ[self._occ_call_idx] + self._occ_call_idx += 1 + return row + # 1D (npoints,): return as-is (from_pointing_cut) return self._earth_occ @@ -236,39 +243,44 @@ def test_from_region_cut_all_in(): assert gti.tstop_list[0] == Time(60975., format='mjd', scale='utc') -def test_from_region_cut_earth_occ_all(): +def test_from_region_cut_earth_occ_all_vs_any(): """ - mode='all': bin 4 is occulted for one on-region pixel -> excluded. - Result: only the first interval [60971, 60973] survives. + Verify that earth_occ_mode='all' and 'any' produce different results. + + Setup: all 5 bins point into the region. + - bin 1: all on-region pixels occulted -> excluded by both 'all' and 'any' + - bin 3: only pixel 0 occulted -> excluded by 'all', kept by 'any' + + Expected: + mode='all': [T,F,T,F,T] -> 3 intervals + mode='any': [T,F,T,T,T] -> bin 2,3,4 merge -> 2 intervals """ region = _make_region() - sc = _make_sc_region(region, [False, True, True, False, True]) - - # earth_occ is 1D (npoints=6); get_earth_occ is called once per on-region pixel. - # Occulted at edge-point 4 for the first pixel only. - sc._earth_occ = np.array([False, False, False, False, True, False]) - - gti = GoodTimeInterval.from_region_cut(region, sc, earth_occ=True, earth_occ_mode='all') - - assert len(gti) == 1 - assert np.all(gti.tstart_list == Time([60971.], format='mjd', scale='utc')) - assert np.all(gti.tstop_list == Time([60973.], format='mjd', scale='utc')) - - -def test_from_region_cut_earth_occ_any(): - """ - mode='any': bin 4 has one pixel occulted but others are fine -> kept. - Result: both intervals survive unchanged. - """ - region = _make_region() - sc = _make_sc_region(region, [False, True, True, False, True]) - sc._earth_occ = np.array([False, False, False, False, True, False]) - - gti = GoodTimeInterval.from_region_cut(region, sc, earth_occ=True, earth_occ_mode='any') - - assert len(gti) == 1 - assert np.all(gti.tstart_list == Time([60971.], format='mjd', scale='utc')) - assert np.all(gti.tstop_list == Time([60973.], format='mjd', scale='utc')) + n_on = int(np.sum(np.asarray(region[:], dtype=bool))) + + # earth_occ 2D: shape (n_on, npoints=6) + earth_occ = np.zeros((n_on, 6), dtype=bool) + earth_occ[:, 1] = True # bin 1: all pixels occulted + earth_occ[0, 3] = True # bin 3: only pixel 0 occulted + + sc_all = _make_sc_region(region, [True, True, True, True, True]) + sc_all._earth_occ = earth_occ + + sc_any = _make_sc_region(region, [True, True, True, True, True]) + sc_any._earth_occ = earth_occ + + gti_all = GoodTimeInterval.from_region_cut(region, sc_all, earth_occ=True, earth_occ_mode='all') + gti_any = GoodTimeInterval.from_region_cut(region, sc_any, earth_occ=True, earth_occ_mode='any') + + # mode='all': bin 1 and bin 3 both excluded -> 3 separate intervals + assert len(gti_all) == 3 + assert np.all(gti_all.tstart_list == Time([60970., 60972., 60974.], format='mjd', scale='utc')) + assert np.all(gti_all.tstop_list == Time([60971., 60973., 60975.], format='mjd', scale='utc')) + + # mode='any': only bin 1 excluded; bin 3 kept -> bin 2,3,4 merge into one interval + assert len(gti_any) == 2 + assert np.all(gti_any.tstart_list == Time([60970., 60972.], format='mjd', scale='utc')) + assert np.all(gti_any.tstop_list == Time([60971., 60975.], format='mjd', scale='utc')) def test_from_region_cut_invalid_earth_occ_mode():