Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 332 additions & 0 deletions cosipy/response/ideal_response.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@
from cosipy.util.iterables import itertools_batched
from scoords import SpacecraftFrame

from astromodels.functions.function import Function1D
from functools import cached_property
from scipy.integrate import trapezoid, cumulative_trapezoid
from scipy.interpolate import interp1d
from dataclasses import dataclass
import numpy.typing as npt
from tqdm import tqdm


def _to_rad(angle):
if isinstance(angle, (Quantity, Angle)):
Expand Down Expand Up @@ -1079,3 +1087,327 @@ def event_probability(self) -> Iterable[float]:

return self._cached_event_probability

class RandomEventDataFromContinuumInSCFrame(EmCDSEventDataInSCFrameInterface):

"""
Generate unbinned events from a continuum photon flux spectrum in the
spacecraft frame.

It involves four steps:

1. Compute counts information, see method `unpol_counts`;

2. Compute inverse CDF, see method `construct_inverse_cdf`;

3. Sample incident photon energies, see method `incident_photons`;

4. Simulate sampled incident photons, see method `simulate_events`.

Parameters
----------
irf: FarFieldInstrumentResponseFunctionInterface
Must handle PhotonWithDirectionAndEnergyInSCFrameInterface
spectrum : astromodels.functions.function.Function1D
The input spectrum must be differential photon spectrum.
direction : astropy.coordinates.SkyCoord
The location of the point source.
duration : astropy.units.Quantity
The duration of the point source.
energy_min, energy_max : astropy.units.Quantity
The energy range of simulation.
energy_grid_number : int, optional
The number of energy grids to compute the counts information.
More grids will give more accurate count estimates.
rng : numpy.random.Generator, optional
The random generator to Poisson the total indicent photons.

"""

def __init__(
self,
irf: FarFieldInstrumentResponseFunctionInterface,
spectrum: Function1D,
direction: SkyCoord,
duration: Quantity,
energy_min: Quantity,
energy_max: Quantity,
energy_grid_number: int = 1_000_000,
rng: np.random.Generator | None = None,
):

self.unpolarized_irf = irf

self.spectrum = spectrum

self.rng = np.random.default_rng() if rng is None else rng

if not energy_min < energy_max:
raise ValueError("energy_max must be greater than energy_min!")

# unify energy unit to keV
self.energy_min: Quantity = energy_min.to(u.keV)
self.energy_max: Quantity = energy_max.to(u.keV)

self.energy_grid_number = energy_grid_number

# unify duration unit to second
# self.duration_s: np.float64 = duration.to_value(u.s)
self.duration_s: Quantity = duration.to(u.s)

# get source longitude and latitude
direction = direction.transform_to('spacecraftframe')
self.source_direction_lon_rad: np.float64 = direction.lon.rad
self.source_direction_lat_rad: np.float64 = direction.lat.rad


@cached_property
def energy_grid(self) -> Quantity:
"""Energy grid spanning [energy_min, energy_max] with unit keV."""
return np.linspace(self.energy_min, self.energy_max, self.energy_grid_number)

@cached_property
def unpolarized_aeff(self) -> Quantity:

r"""
Compute the unpolarized effective area on the energy grid.

This method queries the instrument response for the unpolarized
effective area $A_{\rm eff}(E)$ at each grid energy.

Returns
-------
effective_area : astropy.units.Quantity
Unpolarized effective area evaluated on `self.energy_grid`.
The returned array has shape `(energy_grid_number,)` and unit ${\text cm}^2$.
"""


# get unpolarized photon at each energy grid
unpolarized_photon = PhotonWithDirectionAndEnergyInSCFrame(
self.source_direction_lon_rad,
self.source_direction_lat_rad,
self.energy_grid.value,
)

aeff_cm2 = np.asarray(
self.unpolarized_irf.effective_area_cm2(unpolarized_photon)
)

return aeff_cm2 * (u.cm**2)

@dataclass(frozen=True)
class UnpolCountsSummary:
"""
The container to store three important values:
- rate density (ph/s/keV)
- rate (ph/s)
- total expected counts: (ph)

"""
rate_density: Quantity
rate: Quantity
total_expected_counts: Quantity

@cached_property
def unpol_counts(self) -> UnpolCountsSummary:

r"""
Sample the total number of detected photons from a continuum source.

Model the detected continuum photon as a Poisson process. The detected
rate density is the source differential photon spectrum convolved with
the effective area.

Notes
-----
- $S(E)$ is the differential photon flux spectrum.
- $w(E) \equiv S(E)\,A_{\mathrm{eff}}(E)$
- Units: $[w(E)] = \dfrac{\mathrm{ph}}{\mathrm{s}\times \mathrm{keV}}$

Integrated detected photon rate

$$I_0 = \int_{E_{\mathrm{min}}}^{E_{\mathrm{max}}} w(E)\,\mathrm{d}E,$$

with units

$$[I_0] = \dfrac{\mathrm{ph}}{\mathrm{s}}.$$

Expected detected counts and Poisson sampling

For an exposure time $T$, the expected detected counts are

$$\mu = T\,I_0,$$

and the realized detected counts are sampled as

$$N \sim \mathrm{Poisson}(\mu).$$

Numerical integration (trapezoids)

In practice, $S(E)$ may not be analytical, and $A_{\mathrm{eff}}(E)$ is
usually not analytical either. Therefore we compute $I_0$ numerically on
a fine energy grid using the trapezoidal rule.

- Choose an energy grid $E_j \in [E_{\mathrm{min}}, E_{\mathrm{max}}]$
- Evaluate $S(E_j)$ and $A_{\mathrm{eff}}(E_j)$ and form
$w(E_j)=S(E_j)A_{\mathrm{eff}}(E_j)$
- Approximate the integral with trapezoids:
$$I_0 \approx \mathrm{trapz}\bigl(w(E_j), E_j\bigr).$$

Returns
-------
N : astropy.units.Quantity
Realized total detected counts sampled from the Poisson process.
This is a scalar count, optionally tagged with ``u.ph``.

"""

# find the differential flux at each energy grid
s: Quantity = self.spectrum(self.energy_grid.value) * u.ph / u.cm**2 / u.s / u.keV

# find the photon rate density (ph/s/keV)
w: Quantity = s * self.unpolarized_aeff

# find the photon rate (ph/s)
I_0: Quantity = trapezoid(w, self.energy_grid)

# get the expected total counts (ph) and poisson it
mu: Quantity = I_0 * self.duration_s
# here transform mu to dimensionless expected counts
# before poissoning it.
unpolarized_expected_counts: Quantity = poisson(mu.to_value(u.ph)).rvs() * u.ph

return RandomEventDataFromContinuumInSCFrame.UnpolCountsSummary(
rate_density=w,
rate=I_0,
total_expected_counts=unpolarized_expected_counts
)

def construct_inverse_cdf(self) -> interp1d:

r"""
Construct an inverse-CDF interpolator for sampling photon energies.

Compute the normalized CDF from $w(E)=S(E)A_{\mathrm{eff}}(E)$ using cumulative
trapezoids and normalize by $I_0$:

$$\mathrm{CDF}(E) = \frac{\int_{E_{\mathrm{min}}}^{E} w(E'),\mathrm{d}E'}{I_0}.$$

Returns
-------
inverse_cdf : scipy.interpolate.interp1d
The inverse CDF that can be used to map u ~ Unif(0,1) to energies.

"""

# get counts rate density as array
w_val: npt.NDArray[np.float64] = self.unpol_counts.rate_density.value

# get enegyr grid as array
E_val: npt.NDArray[np.float64] = self.energy_grid.value

cdf_raw: npt.NDArray[np.float64] = cumulative_trapezoid(
x=E_val,
y=w_val,
initial=0)

# normalize cdf
I0_val = self.unpol_counts.rate.value
cdf: npt.NDArray[np.float64] = cdf_raw / I0_val

# quality check
if not np.all(np.diff(cdf)>0):
raise ValueError("CDF is not strictly increasing (plateau or decrease).")

if not np.allclose(cdf[-1], 1.0, rtol=0.0, atol=1e-5):
raise ValueError(f"CDF[-1]={cdf[-1]} deviates from 1 beyond tolerance (1e-5).")


inverse_cdf = interp1d(
x=cdf,
y=self.energy_grid.value,
kind="linear",
)

return inverse_cdf

@cached_property
def incident_photons(self) -> tuple[Quantity, npt.NDArray[np.float64]]:

"""
Sample detected photon energies using inverse-CDF sampling.

Returns
-------
E_i_unique : astropy.units.Quantity
Sorted unique sampled energies (keV).
counts : numpy.ndarray
Number of each sampled energy.
"""

inverse_cdf: interp1d = self.construct_inverse_cdf()

# get a uniform distribution from [0,1)
# u ~ Unif(0,1)
uniform = self.rng.random(
int(self.unpol_counts.total_expected_counts.value),
)

E_i: Quantity = inverse_cdf(uniform) * u.keV

E_i_unique, counts = np.unique(E_i.value, return_counts = True)

return E_i_unique * u.keV, counts

def simulate_events(self):

"""
Generate detected events by passing sampled photons.
"""

self._events = []

E_unique, counts = self.incident_photons

pbar = tqdm(enumerate(E_unique), total=len(E_unique))
for idx, energy in pbar:
pbar.set_description_str(f"Simulating energy {energy:<9.3f}")

unpolarized_photon = PhotonWithDirectionAndEnergyInSCFrame(
self.source_direction_lon_rad,
self.source_direction_lat_rad,
energy.value,
)

unpolarized_photons = self.unpolarized_irf.photon_list_type.from_photon(
unpolarized_photon,
repeat = counts[idx]
)

unpolarized_events = iter(self.unpolarized_irf.random_events(unpolarized_photons))

nthrown_unpolarized = 0

while nthrown_unpolarized < counts[idx]:

# Unpolarized component
if nthrown_unpolarized < counts[idx]:
self._events.append(next(unpolarized_events))
nthrown_unpolarized += 1

return

def __iter__(self) -> Iterator[EmCDSEventInSCFrameInterface]:

"""
Iterate over realized events stored in `self._events`.
"""
yield from self._events


@property
def nevents(self) -> int:
"""
Return the number of realized events stored in `self._events`.
"""
return len(self._events)
Loading