stixcore/io/product_processors/fits/processors.py (25 changes: 12 additions & 13 deletions)
@@ -723,22 +723,23 @@ def generate_primary_header(cls, filename, product, *, version=0):
# if not isinstance(product.obt_beg, SCETime):
# raise ValueError("Expected SCETime as time format")

+ scet_timerange = product.scet_timerange
headers = FitsProcessor.generate_common_header(filename, product, version=version) + (
# Name, Value, Comment
# ('MJDREF', product.obs_beg.mjd),
# ('DATEREF', product.obs_beg.fits),
("OBT_BEG", product.scet_timerange.start.as_float().value, "Start acquisition time in OBT"),
("OBT_END", product.scet_timerange.end.as_float().value, "End acquisition time in OBT"),
("OBT_BEG", scet_timerange.start.as_float().value, "Start acquisition time in OBT"),
("OBT_END", scet_timerange.end.as_float().value, "End acquisition time in OBT"),
("TIMESYS", "OBT", "System used for time keywords"),
("LEVEL", "L0", "Processing level of the data"),
("DATE-OBS", product.scet_timerange.start.to_string(), "Depreciated, same as DATE-BEG"),
("DATE-BEG", product.scet_timerange.start.to_string(), "Start time of observation"),
("DATE-AVG", product.scet_timerange.avg.to_string(), "Average time of observation"),
("DATE-END", product.scet_timerange.end.to_string(), "End time of observation"),
("DATE-OBS", scet_timerange.start.to_string(), "Depreciated, same as DATE-BEG"),
("DATE-BEG", scet_timerange.start.to_string(), "Start time of observation"),
("DATE-AVG", scet_timerange.avg.to_string(), "Average time of observation"),
("DATE-END", scet_timerange.end.to_string(), "End time of observation"),
("DATAMIN", product.dmin, "Minimum valid physical value"),
("DATAMAX", product.dmax, "Maximum valid physical value"),
("BUNIT", product.bunit, "Units of physical value, after application of BSCALE, BZERO"),
("XPOSURE", product.exposure, "[s] shortest exposure time"),
("XPOSURE", product.min_exposure, "[s] shortest exposure time"),
("XPOMAX", product.max_exposure, "[s] maximum exposure time"),
)

@@ -780,7 +781,7 @@ def generate_primary_header(self, filename, product, *, version=0):
("DATAMIN", empty_if_nan(product.dmin), "Minimum valid physical value"),
("DATAMAX", empty_if_nan(product.dmax), "Maximum valid physical value"),
("BUNIT", product.bunit, "Units of physical value, after application of BSCALE, BZERO"),
("XPOSURE", empty_if_nan(product.exposure), "[s] shortest exposure time"),
("XPOSURE", empty_if_nan(product.min_exposure), "[s] shortest exposure time"),
("XPOMAX", empty_if_nan(product.max_exposure), "[s] maximum exposure time"),
)

@@ -897,10 +898,8 @@ def write_fits(self, product, *, version=0):

# In TM sent as uint in units of 0.1 so convert to cs as the time center
# can be on 0.5ds points
data["time"] = np.atleast_1d(
np.around((data["time"] - prod.scet_timerange.start).as_float().to(u.cs)).astype("uint32")
)
data["timedel"] = np.atleast_1d(np.uint32(np.around(data["timedel"].as_float().to(u.cs))))
data["time"] = np.atleast_1d(np.around((data["time"] - prod.utc_timerange.start).to(u.cs)).astype("uint32"))
data["timedel"] = np.atleast_1d(np.uint32(np.around(data["timedel"].to(u.cs))))

try:
control["time_stamp"] = control["time_stamp"].as_float()
@@ -998,7 +997,7 @@ def generate_primary_header(self, filename, product, *, version=0):
("DATAMIN", empty_if_nan(product.dmin), "Minimum valid physical value"),
("DATAMAX", empty_if_nan(product.dmax), "Maximum valid physical value"),
("BUNIT", product.bunit, "Units of physical value, after application of BSCALE, BZERO"),
("XPOSURE", empty_if_nan(product.exposure), "[s] shortest exposure time"),
("XPOSURE", empty_if_nan(product.min_exposure), "[s] shortest exposure time"),
("XPOMAX", empty_if_nan(product.max_exposure), "[s] maximum exposure time"),
)

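Side note on the header changes above: `XPOSURE`/`XPOMAX` now read from `product.min_exposure` and `product.max_exposure` instead of the old `product.exposure`. A minimal sketch of how such (name, value, comment) tuples end up in a FITS header via `astropy.io.fits`; the `SimpleNamespace` product stand-in and its values are hypothetical, not part of this PR:

```python
from types import SimpleNamespace

from astropy.io import fits

# Hypothetical product stand-in exposing the renamed exposure attributes.
product = SimpleNamespace(min_exposure=0.5, max_exposure=4.0, dmin=0.0, dmax=1234.0, bunit="counts")

header_entries = (
    # Name, Value, Comment -- mirrors the tuples built in generate_primary_header
    ("DATAMIN", product.dmin, "Minimum valid physical value"),
    ("DATAMAX", product.dmax, "Maximum valid physical value"),
    ("BUNIT", product.bunit, "Units of physical value, after application of BSCALE, BZERO"),
    ("XPOSURE", product.min_exposure, "[s] shortest exposure time"),
    ("XPOMAX", product.max_exposure, "[s] maximum exposure time"),
)

header = fits.Header()
for name, value, comment in header_entries:
    header[name] = (value, comment)

print(repr(header))
```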
stixcore/io/product_processors/tests/test_processors.py (15 changes: 11 additions & 4 deletions)
@@ -15,6 +15,7 @@
from stixcore.products.product import Product
from stixcore.soop.manager import SOOPManager
from stixcore.time import SCETime, SCETimeRange
+ from stixcore.time.datetime import SCETimeDelta


@pytest.fixture
@@ -189,15 +190,21 @@ def test_level0_processor_generate_primary_header(datetime, product):
def test_count_data_mixin(p_file):
processor = FitsL0Processor("some/path")
p = Product(p_file)

if isinstance(p.data["timedel"], SCETimeDelta):
assert p.min_exposure == p.data["timedel"].as_float().min().to_value("s")
assert p.max_exposure == p.data["timedel"].as_float().max().to_value("s")
else:
assert p.min_exposure == p.data["timedel"].min().to_value("s")
assert p.max_exposure == p.data["timedel"].max().to_value("s")

assert p.dmin == p.data["counts"].min().value
assert p.dmax == p.data["counts"].max().value
- assert p.exposure == p.data["timedel"].min().as_float().to_value()
- assert p.max_exposure == p.data["timedel"].max().as_float().to_value()

test_data = {
"DATAMAX": p.dmax,
"DATAMIN": p.dmin,
"XPOSURE": p.exposure,
"XPOSURE": p.min_exposure,
"XPOMAX": p.max_exposure,
"BUNIT": "counts",
}
@@ -257,7 +264,7 @@ def test_level1_processor_generate_primary_header(product, soop_manager):
product.dmax = 1
product.dunit = ""
product.max_exposure = 1
- product.exposure = 1
+ product.min_exposure = 1
product.service_type = 1
product.service_subtype = 2
product.ssid = 3
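The branching in `test_count_data_mixin` reflects that `timedel` may still be an `SCETimeDelta` (needing `.as_float()`, as shown in the diff) or already a plain astropy quantity. A small stand-alone sketch of the plain-quantity branch only; the table and its values are made up for illustration and are not the test fixtures:

```python
import astropy.units as u
from astropy.table import QTable

# Hypothetical data table where "timedel" is already a plain Quantity column
# (the else-branch of the test above); SCETimeDelta columns need .as_float() first.
data = QTable({"timedel": [0.5, 1.0, 4.0] * u.s, "counts": [3, 7, 5] * u.ct})

min_exposure = data["timedel"].min().to_value("s")  # 0.5
max_exposure = data["timedel"].max().to_value("s")  # 4.0
dmin = data["counts"].min().value                   # 3
dmax = data["counts"].max().value                   # 7

print(min_exposure, max_exposure, dmin, dmax)
```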
stixcore/processing/tests/test_publish.py (45 changes: 29 additions & 16 deletions)
@@ -121,7 +121,15 @@ def test_publish_fits_to_esa_incomplete(product, out_dir):
)

t = SCETime(coarse=[beg.coarse, end.coarse])
product.data = QTable({"time": t, "timedel": t - beg, "fcounts": np.array([1, 2]), "control_index": [1, 1]})
t_utc = t.to_time()
product.data = QTable(
{
"time": t_utc,
"timedel": (t - beg).as_float().to("cs"),
"fcounts": np.array([1, 2]),
"control_index": [1, 1],
}
)
product.raw = ["packet1.xml", "packet2.xml"]
product.parent = ["packet1.xml", "packet2.xml"]
product.level = "L1"
@@ -135,10 +143,10 @@ def test_publish_fits_to_esa_incomplete(product, out_dir):
product.NAME = "background"
product.obt_beg = beg
product.obt_end = end
- product.date_obs = beg
- product.date_beg = beg
- product.date_end = end
- product.exposure = 2
+ product.date_obs = beg.to_datetime()
+ product.date_beg = beg.to_datetime()
+ product.date_end = end.to_datetime()
+ product.min_exposure = 2
product.max_exposure = 3
product.dmin = 2
product.dmax = 3
@@ -222,10 +230,12 @@ def test_fits_incomplete_switch_over(out_dir):
)

t = SCETime(coarse=[beg.coarse, end.coarse])
+ t_utc = t.to_time()

product.data = QTable(
{
"time": t,
"timedel": t - beg,
"time": t_utc,
"timedel": (t - beg).as_float().to("cs"),
"fcounts": np.array([1, 2]),
"counts": np.array([1, 2]) * u.deg_C,
"control_index": [1, 1],
@@ -244,10 +254,10 @@
product.name = "background"
product.obt_beg = beg
product.obt_end = end
- product.date_obs = beg
- product.date_beg = beg
- product.date_end = end
- product.exposure = 2
+ product.date_obs = beg.to_datetime()
+ product.date_beg = beg.to_datetime()
+ product.date_end = end.to_datetime()
+ product.min_exposure = 2
product.max_exposure = 3
product.dmin = 2
product.dmax = 3
@@ -371,7 +381,10 @@ def test_publish_fits_to_esa(product, out_dir):
)

t = SCETime(coarse=[beg.coarse, end.coarse])
product.data = QTable({"time": t, "timedel": t - beg, "fcounts": np.array([1, 2]), "control_index": [1, 1]})
t_utc = t.to_time()
product.data = QTable(
{"time": t_utc, "timedel": (t - beg).as_float().to("cs"), "fcounts": np.array([1, 2]), "control_index": [1, 1]}
)
product.raw = ["packet1.xml", "packet2.xml"]
product.parent = ["packet1.xml", "packet2.xml"]
product.level = "L1"
@@ -382,10 +395,10 @@
product.name = "xray-spec"
product.obt_beg = beg
product.obt_end = end
- product.date_obs = beg
- product.date_beg = beg
- product.date_end = end
- product.exposure = 2
+ product.date_obs = beg.to_datetime()
+ product.date_beg = beg.to_datetime()
+ product.date_end = end.to_datetime()
+ product.min_exposure = 2
product.max_exposure = 3
product.dmin = 2
product.dmax = 3
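All three tests now build the data table with UTC times (`SCETime.to_time()`) and centisecond `timedel` quantities (`(t - beg).as_float().to("cs")`). A rough stand-alone approximation with plain astropy types; the start time and offsets are invented for illustration, and the stixcore SCETime arithmetic is only mimicked here:

```python
import astropy.units as u
import numpy as np
from astropy.table import QTable
from astropy.time import Time

# Invented start time; in the tests this comes from SCETime.to_time().
beg = Time("2022-01-01T00:00:00", scale="utc")
t_utc = beg + [0, 2] * u.s  # two sample times

data = QTable(
    {
        "time": t_utc,                      # UTC Time column
        "timedel": (t_utc - beg).to(u.cs),  # elapsed time in centiseconds
        "fcounts": np.array([1, 2]),
        "control_index": [1, 1],
    }
)
print(data)
```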
stixcore/products/level1/quicklookL1.py (14 changes: 14 additions & 0 deletions)
@@ -11,6 +11,7 @@
from stixcore.products.level0.quicklookL0 import QLProduct
from stixcore.products.product import L1Mixin
from stixcore.time import SCETimeRange
+ from stixcore.time.datetime import SCETime, SCETimeDelta
from stixcore.util.logging import get_logger

__all__ = ["LightCurve", "Background", "Spectra", "Variance", "FlareFlag", "EnergyCalibration", "TMStatusFlareList"]
@@ -242,6 +243,19 @@ def from_level0(cls, l0product, parent=""):
l1.level = "L1"
engineering.raw_to_engineering_product(l1, IDBManager.instance)

+ # convert SCETimes to UTC Time
+ if "time" in l1.data.colnames and isinstance(l1.data["time"], SCETime):
+     l1.data.replace_column(
+         "time",
+         l1.data["time"].to_time(),
+     )
+ # convert SCETimesDelta to Quantity (s)
+ if "timedel" in l1.data.colnames and isinstance(l1.data["timedel"], SCETimeDelta):
+     l1.data.replace_column(
+         "timedel",
+         l1.data["timedel"].as_float(),
+     )

# fix for wrong calibration in IDB https://github.com/i4Ds/STIXCore/issues/432
# nix00122 was wrong assumed to be in ds but it is plain s
l1.control["integration_time"] = l1.control["integration_time"] * 10
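The new block swaps the `time` column from `SCETime` to UTC `Time` and the `timedel` column from `SCETimeDelta` to a plain quantity. A generic illustration of the same `replace_column` pattern with standard astropy types standing in for the stixcore classes (whose `to_time()`/`as_float()` conversions are specific to stixcore); the table contents are invented:

```python
import astropy.units as u
from astropy.table import QTable
from astropy.time import Time

# Toy table with a Time column and a duration column in seconds.
data = QTable(
    {
        "time": Time(["2022-01-01T00:00:00", "2022-01-01T00:00:04"]),
        "timedel": [4.0, 4.0] * u.s,
    }
)

# replace_column swaps a column in place, keeping its name and position.
data.replace_column("timedel", data["timedel"].to(u.cs))
print(data["timedel"])  # [400., 400.] cs
```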
stixcore/products/level3/flarelist.py (4 changes: 2 additions & 2 deletions)
@@ -515,7 +515,7 @@ def dmax(self):
return (self.data["lc_peak"].sum(axis=1)).max().value if len(self.data) > 0 else np.nan

@property
- def exposure(self):
+ def min_exposure(self):
return self.data["duration"].min().to_value("s") if len(self.data) > 0 else np.nan

@property
@@ -649,7 +649,7 @@ def dmax(self):
return (self.data["lc_peak"].sum(axis=1)).max().value if len(self.data) > 0 else np.nan

@property
- def exposure(self):
+ def min_exposure(self):
return self.data["duration"].min().to_value("s") if len(self.data) > 0 else np.nan

@property
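For the two renamed flarelist properties, only the name changes: the value is still the shortest duration in seconds, or NaN for an empty list. A toy class showing that pattern; `FlareListLike` and its `duration` values are made up for illustration and are not the actual flarelist products:

```python
import astropy.units as u
import numpy as np
from astropy.table import QTable


class FlareListLike:
    """Toy stand-in illustrating the min_exposure property pattern."""

    def __init__(self, data: QTable):
        self.data = data

    @property
    def min_exposure(self):
        # Shortest duration in seconds, or NaN when the table is empty.
        return self.data["duration"].min().to_value("s") if len(self.data) > 0 else np.nan


empty = FlareListLike(QTable({"duration": [] * u.s}))
full = FlareListLike(QTable({"duration": [12.0, 30.0] * u.s}))
print(empty.min_exposure, full.min_exposure)  # nan 12.0
```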