Aggregate results of FISPACT runs to efficient datasets.
Note
This document is in progress.
The module loads FISPACT JSON output files and converts to Polars dataframes with minor data normalization. The dataframes can be stored either to parquet files or DuckDb database. This allows efficient data aggregation and analysis. Multiple JSON files for different FISPACT runs can be combined using simple additional identification. So far, we use just two-dimensional identification by material and case. The case usually identifies certain neutron flux energy distribution.
- export to DuckDB
- export to parquet files
- neutron flux presentation conversion
From PyPI
pip install xpypact
As dependency
poetry add xpypact
From source
pip install htpps://github.com/MC-kit/xpypact.git
from xpypact import FullDataCollector, Inventory
def get_material_id(p: Path) -> int:
...
def get_case_id(p: Path) -> int:
...
jsons = [path1, path2, ...]
material_ids = {p: get_material_id(p) for p in jsons }
case_ids = {c: get_case_id(p) for p in jsons }
collector = FullDataCollector()
if sequential_load:
for json in jsons:
inventory = Inventory.from_json(json)
collector.append(inventory, material_id=material_ids[json], case_id=case_ids[json])
else: # multithreading is allowed for collector as well
task_list = ... # list of tuples[directory, case_id, tasks_sequence]
threads = 16 # whatever
def _find_path(arg) -> tuple[int, int, Path]:
_case, path, inventory = arg
json_path: Path = (Path(path) / inventory).with_suffix(".json")
if not json_path.exists():
msg = f"Cannot find file {json_path}"
raise FindPathError(msg)
try:
material_id = int(inventory[_LEN_INVENTORY:])
case_str = json_path.parent.parts[-1]
case_id = int(case_str[_LEN_CASE:])
except (ValueError, IndexError) as x:
msg = f"Cannot define material_id and case_id from {json_path}"
raise FindPathError(msg) from x
if case_id != _case:
msg = f"Contradicting values of case_id in case path and database: {case_id} != {_case}"
raise FindPathError(msg)
return material_id, case_id, json_path
with futures.ThreadPoolExecutor(max_workers=threads) as executor:
mcp_futures = [
executor.submit(_find_path, arg)
for arg in (
(task_case[0], task_case[1], task)
for task_case in task_list
for task in task_case[2].split(",")
if task.startswith("inventory-")
)
]
mips = [x.result() for x in futures.as_completed(mcp_futures)]
mips.sort(key=lambda x: x[0:2]) # sort by material_id, case_id
def _load_json(arg) -> None:
collector, material_id, case_id, json_path = arg
collector.append(from_json(json_path.read_text(encoding="utf8")), material_id, case_id)
with futures.ThreadPoolExecutor(max_workers=threads) as executor:
executor.map(_load_json, ((collector, *mip) for mip in mips))
collected = collector.get_result()
# save to parquet files
collected.save_to_parquets(Path.cwd() / "parquets")
# or use DuckDB database
import from xpypact.dao save
import duckdb as db
con = db.connect()
save(con, collected)
gamma_from_db = con.sql(
"""
select
g, rate
from timestep_gamma
where material_id = 1 and case_id = 54 and time_step_number = 7
order by g
""",
).fetchall()
Just follow ordinary practice: