|
| 1 | +import pytest |
| 2 | +import numpy as np |
| 3 | +import awkward as ak |
| 4 | + |
| 5 | +from imas.wrangler import wrangle, unwrangle |
| 6 | +from imas.ids_factory import IDSFactory |
| 7 | +from imas.util import idsdiffgen |
| 8 | + |
| 9 | +@pytest.fixture |
| 10 | +def test_data(): |
| 11 | + data = {"equilibrium": {}} |
| 12 | + data["equilibrium"]["N_time"] = 100 |
| 13 | + data["equilibrium"]["N_radial"] = 100 |
| 14 | + data["equilibrium"]["N_grid"] = 1 |
| 15 | + data["equilibrium"]["time"] = np.linspace(0.0, 5.0, data["equilibrium"]["N_time"]) |
| 16 | + data["equilibrium"]["psi_1d"] = np.linspace(0.0, 1.0, data["equilibrium"]["N_radial"]) |
| 17 | + data["equilibrium"]["r"] = np.linspace(1.0, 2.0, data["equilibrium"]["N_radial"]) |
| 18 | + data["equilibrium"]["z"] = np.linspace(-1.0, 1.0, data["equilibrium"]["N_radial"]) |
| 19 | + r_grid, z_grid = np.meshgrid(data["equilibrium"]["r"], |
| 20 | + data["equilibrium"]["z"], indexing="ij") |
| 21 | + data["equilibrium"]["psi_2d"] = (r_grid - 1.5) ** 2 + z_grid**2 |
| 22 | + |
| 23 | + data["thomson_scattering"] = {} |
| 24 | + data["thomson_scattering"]["N_ch"] = (20,10) |
| 25 | + data["thomson_scattering"]["N_time"] = (100, 300) |
| 26 | + data["thomson_scattering"]["r"] = np.concatenate([np.ones(data["thomson_scattering"]["N_ch"][0])*1.6, |
| 27 | + np.ones(data["thomson_scattering"]["N_ch"][1])*1.7]) |
| 28 | + data["thomson_scattering"]["z"] = np.concatenate([np.linspace(-1.0, 1.0, data["thomson_scattering"]["N_ch"][0]), |
| 29 | + np.linspace(-1.0, 1.0, data["thomson_scattering"]["N_ch"][1])]) |
| 30 | + data["thomson_scattering"]["t_e"] = data["thomson_scattering"]["z"]**2 * 5.e3 |
| 31 | + data["thomson_scattering"]["n_e"] = data["thomson_scattering"]["z"]**2 * 5.e19 |
| 32 | + data["thomson_scattering"]["time"] = (np.linspace(0,5.0, data["thomson_scattering"]["N_time"][0]), |
| 33 | + np.linspace(0,5.0, data["thomson_scattering"]["N_time"][1])) |
| 34 | + return data |
| 35 | + |
| 36 | +@pytest.fixture |
| 37 | +def flat(test_data): |
| 38 | + flat = {} |
| 39 | + # Equilibrium test data |
| 40 | + flat["equilibrium.time"] = test_data["equilibrium"]["time"] |
| 41 | + flat["equilibrium.time_slice.time"] = test_data["equilibrium"]["time"] |
| 42 | + flat["equilibrium.ids_properties.homogeneous_time"] = 1 |
| 43 | + flat["equilibrium.time_slice.profiles_1d.psi"] = np.zeros( |
| 44 | + (test_data["equilibrium"]["N_time"], test_data["equilibrium"]["N_radial"]) |
| 45 | + ) |
| 46 | + flat["equilibrium.time_slice.profiles_1d.psi"][:] = test_data["equilibrium"]["psi_1d"] |
| 47 | + flat["equilibrium.time_slice.profiles_2d.grid.dim1"] = np.zeros( |
| 48 | + (test_data["equilibrium"]["N_time"], |
| 49 | + test_data["equilibrium"]["N_grid"], |
| 50 | + test_data["equilibrium"]["N_radial"]) |
| 51 | + ) |
| 52 | + flat["equilibrium.time_slice.profiles_2d.grid.dim1"][:] = test_data["equilibrium"]["r"][None, :] |
| 53 | + flat["equilibrium.time_slice.profiles_2d.grid.dim2"] = np.zeros( |
| 54 | + (test_data["equilibrium"]["N_time"], |
| 55 | + test_data["equilibrium"]["N_grid"], |
| 56 | + test_data["equilibrium"]["N_radial"]) |
| 57 | + ) |
| 58 | + flat["equilibrium.time_slice.profiles_2d.grid.dim2"][:] = test_data["equilibrium"]["z"][None, :] |
| 59 | + flat["equilibrium.time_slice.profiles_2d.psi"] = np.zeros( |
| 60 | + ( |
| 61 | + test_data["equilibrium"]["N_time"], |
| 62 | + test_data["equilibrium"]["N_grid"], |
| 63 | + test_data["equilibrium"]["N_radial"], |
| 64 | + test_data["equilibrium"]["N_radial"], |
| 65 | + ) |
| 66 | + ) |
| 67 | + flat["equilibrium.time_slice.profiles_2d.psi"][:] = test_data["equilibrium"]["psi_2d"][None, ...] |
| 68 | + |
| 69 | + # Thomson scattering test data (ragged) |
| 70 | + flat["thomson_scattering.ids_properties.homogeneous_time"] = 0 |
| 71 | + flat["thomson_scattering.channel.t_e.time"] = ak.concatenate([np.tile(test_data["thomson_scattering"]["time"][0], |
| 72 | + (test_data["thomson_scattering"]["N_ch"][0],1)), |
| 73 | + np.tile(test_data["thomson_scattering"]["time"][1], |
| 74 | + (test_data["thomson_scattering"]["N_ch"][1],1))]) |
| 75 | + flat["thomson_scattering.channel.t_e.data"] = ak.concatenate([np.tile(test_data["thomson_scattering"]["t_e"][0], |
| 76 | + (test_data["thomson_scattering"]["N_ch"][0],1)), |
| 77 | + np.tile(test_data["thomson_scattering"]["t_e"][1], |
| 78 | + (test_data["thomson_scattering"]["N_ch"][1],1))]) |
| 79 | + flat["thomson_scattering.channel.n_e.time"] = ak.concatenate([np.tile(test_data["thomson_scattering"]["time"][0], |
| 80 | + (test_data["thomson_scattering"]["N_ch"][0],1)), |
| 81 | + np.tile(test_data["thomson_scattering"]["time"][1], |
| 82 | + (test_data["thomson_scattering"]["N_ch"][1],1))]) |
| 83 | + flat["thomson_scattering.channel.n_e.data"] = ak.concatenate([np.tile(test_data["thomson_scattering"]["n_e"][0], |
| 84 | + (test_data["thomson_scattering"]["N_ch"][0],1)), |
| 85 | + np.tile(test_data["thomson_scattering"]["n_e"][1], |
| 86 | + (test_data["thomson_scattering"]["N_ch"][1],1))]) |
| 87 | + flat["thomson_scattering.channel.position.r"] = test_data["thomson_scattering"]["r"] |
| 88 | + flat["thomson_scattering.channel.position.z"] = test_data["thomson_scattering"]["z"] |
| 89 | + return flat |
| 90 | + |
| 91 | +@pytest.fixture |
| 92 | +def test_ids_dict(test_data): |
| 93 | + factory = IDSFactory("3.41.0") |
| 94 | + equilibrium = factory.equilibrium() |
| 95 | + equilibrium.time = test_data["equilibrium"]["time"] |
| 96 | + equilibrium.time_slice.resize(test_data["equilibrium"]["N_time"]) |
| 97 | + equilibrium.ids_properties.homogeneous_time = 1 |
| 98 | + for i in range(test_data["equilibrium"]["N_time"]): |
| 99 | + equilibrium.time_slice[i].time = test_data["equilibrium"]["time"][i] |
| 100 | + equilibrium.time_slice[i].profiles_1d.psi = test_data["equilibrium"]["psi_1d"] |
| 101 | + equilibrium.time_slice[i].profiles_2d.resize(1) |
| 102 | + equilibrium.time_slice[i].profiles_2d[0].grid.dim1 = test_data["equilibrium"]["r"] |
| 103 | + equilibrium.time_slice[i].profiles_2d[0].grid.dim2 = test_data["equilibrium"]["z"] |
| 104 | + equilibrium.time_slice[i].profiles_2d[0].psi = test_data["equilibrium"]["psi_2d"] |
| 105 | + |
| 106 | + thomson_scattering = factory.thomson_scattering() |
| 107 | + thomson_scattering.ids_properties.homogeneous_time = 0 |
| 108 | + N = test_data["thomson_scattering"]["N_ch"][0] + test_data["thomson_scattering"]["N_ch"][1] |
| 109 | + thomson_scattering.channel.resize(N) |
| 110 | + index = 0 |
| 111 | + for i in range(N): |
| 112 | + if i == test_data["thomson_scattering"]["N_ch"][0]: |
| 113 | + index = 1 |
| 114 | + thomson_scattering.channel[i].t_e.time = test_data["thomson_scattering"]["time"][index] |
| 115 | + thomson_scattering.channel[i].t_e.data = np.tile(test_data["thomson_scattering"]["t_e"][i], |
| 116 | + test_data["thomson_scattering"]["N_time"][index]) |
| 117 | + thomson_scattering.channel[i].n_e.time = test_data["thomson_scattering"]["time"][index] |
| 118 | + thomson_scattering.channel[i].n_e.data = np.tile(test_data["thomson_scattering"]["t_e"][i], |
| 119 | + test_data["thomson_scattering"]["N_time"][index]) |
| 120 | + thomson_scattering.channel[i].position.r = test_data["thomson_scattering"]["r"][i] |
| 121 | + thomson_scattering.channel[i].position.z = test_data["thomson_scattering"]["z"][i] |
| 122 | + |
| 123 | + return {"equilibrium":equilibrium, "thomson_scattering": thomson_scattering} |
| 124 | + |
| 125 | + |
| 126 | +def test_wrangle(test_ids_dict, flat): |
| 127 | + wrangled = wrangle(flat) |
| 128 | + for key in test_ids_dict: |
| 129 | + diff = idsdiffgen(wrangled[key],test_ids_dict[key]) |
| 130 | + assert len(list(diff)) == 0, diff |
| 131 | + |
| 132 | +def test_unwrangle(test_ids_dict, flat): |
| 133 | + result = unwrangle(list(flat.keys()), test_ids_dict) |
| 134 | + for key in flat.keys(): |
| 135 | + np.testing.assert_allclose(result[key], flat[key]) |
0 commit comments