Commit 337a379

feat: distributed ATLAS setup (#13)
* add xcache support
* add distributed processing setup
* add caching notebook
* support data and stacked plot
1 parent 8005f19 commit 337a379
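In short, the distributed processing setup added in `atlas/analysis.ipynb` (full diff below) connects to an existing Dask scheduler, installs `atlas_schema` on the workers, and drives coffea's `Runner` through a `DaskExecutor`. A minimal sketch, condensed from the notebook; the scheduler address, chunk size, and the `Analysis` processor are the notebook's choices, not requirements:

```python
from atlas_schema.schema import NtupleSchema
from coffea import processor
from dask.distributed import Client, PipInstall

# connect to the already-running Dask scheduler used in the notebook
client = Client("tls://localhost:8786")

# make atlas_schema available on all workers
client.register_plugin(PipInstall(packages=["atlas_schema"], pip_options=["--upgrade"]))

# distributed coffea runner: preprocess the fileset once, then run the processor
run = processor.Runner(
    executor=processor.DaskExecutor(client=client),
    schema=NtupleSchema,
    savemetrics=True,
    chunksize=100_000,
    skipbadfiles=True,
)
# preprocess_output = run.preprocess(fileset)
# out, report = run(preprocess_output, processor_instance=Analysis())
```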

11 files changed (+675, -599 lines)


.gitignore

Lines changed: 0 additions & 3 deletions
@@ -212,6 +212,3 @@ __marimo__/
 # NEVER CHANGE
 # CMS Internal files are not supposed to be published
 DONT_EXPOSE_CMS_INTERNAL/
-
-# ATLAS setup
-atlas/ntuple_production/production_status.json

atlas/.gitignore

Lines changed: 4 additions & 0 deletions
@@ -208,3 +208,7 @@ __marimo__/
 .asetup.save
 *.out
 *.err
+# bigpanda status
+ntuple_production/production_status.json
+# preprocess json
+preprocess_output.json

atlas/analysis.ipynb

Lines changed: 356 additions & 0 deletions
@@ -0,0 +1,356 @@
{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3f9f15ac-903b-4f33-b775-e7c14a3647a8",
   "metadata": {},
   "outputs": [],
   "source": [
    "# ! pip install --upgrade atlas_schema\n",
    "# ! pip install --upgrade git+https://github.com/scikit-hep/mplhep.git\n",
    "\n",
    "# import importlib\n",
    "# importlib.reload(utils)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "94b8b953-dc21-4d5c-899d-b1f0b03c70b2",
   "metadata": {},
   "outputs": [],
   "source": [
    "import gzip\n",
    "import json\n",
    "import re\n",
    "import time\n",
    "\n",
    "import awkward as ak\n",
    "import dask\n",
    "import vector\n",
    "import hist\n",
    "import matplotlib.pyplot as plt\n",
    "import mplhep\n",
    "import numpy as np\n",
    "import uproot\n",
    "\n",
    "from atlas_schema.schema import NtupleSchema\n",
    "from coffea import processor\n",
    "from coffea.nanoevents import NanoEventsFactory\n",
    "from dask.distributed import Client, PipInstall\n",
    "\n",
    "\n",
    "import utils\n",
    "\n",
    "vector.register_awkward()\n",
    "mplhep.style.use(mplhep.style.ATLAS1)\n",
    "\n",
    "client = Client(\"tls://localhost:8786\")\n",
    "\n",
    "plugin = PipInstall(packages=[\"atlas_schema\"], pip_options=[\"--upgrade\"])\n",
    "client.register_plugin(plugin)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "91bbd464-1423-4353-81cc-f43806f04a7e",
   "metadata": {},
   "source": [
    "### fileset preparation"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3dcf6216-0eca-4ea0-921b-eae3eda04af1",
   "metadata": {},
   "outputs": [],
   "source": [
    "# load metadata from file\n",
    "fname = \"ntuple_production/file_metadata.json.gz\"\n",
    "with gzip.open(fname) as f:\n",
    "    dataset_info = json.loads(f.read().decode())\n",
    "\n",
    "# construct fileset\n",
    "fileset = {}\n",
    "for containers_for_category in dataset_info.values():\n",
    "    for container, metadata in containers_for_category.items():\n",
    "        if metadata[\"files_output\"] is None:\n",
    "            # print(f\"skipping missing {container}\")\n",
    "            continue\n",
    "\n",
    "        dsid, _, campaign = utils.dsid_rtag_campaign(container)\n",
    "\n",
    "        # debugging shortcuts\n",
    "        # if campaign not in [\"mc20a\", \"data15\", \"data16\"]: continue\n",
    "        # if \"601352\" not in dsid: continue\n",
    "\n",
    "        weight_xs = utils.sample_xs(campaign, dsid)\n",
    "        lumi = utils.integrated_luminosity(campaign)\n",
    "        fileset[container] = {\"files\": dict((path, \"reco\") for path in metadata[\"files_output\"]), \"metadata\": {\"dsid\": dsid, \"campaign\": campaign, \"weight_xs\": weight_xs, \"lumi\": lumi}}\n",
    "\n",
    "# minimal fileset for debugging\n",
    "# fileset = {\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\": fileset[\"mc20_13TeV.601352.PhPy8EG_tW_dyn_DR_incl_antitop.deriv.DAOD_PHYSLITE.e8547_s4231_r13144_p6697\"]}\n",
    "# fileset"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "f4a081b9-c4ec-41c8-830c-a727e56ff472",
   "metadata": {},
   "source": [
    "### simple non-distributed reading"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6b6c685f-7e9c-4c5b-8f80-19f1543de32f",
   "metadata": {},
   "outputs": [],
   "source": [
    "events = NanoEventsFactory.from_root(\n",
    "    {list(fileset[list(fileset.keys())[0]][\"files\"])[0]: \"reco\"},\n",
    "    mode=\"virtual\",\n",
    "    schemaclass=NtupleSchema,\n",
    "    entry_stop=1000\n",
    ").events()\n",
    "\n",
    "h = hist.new.Regular(30, 0, 300, label=\"leading electron $p_T$\").StrCat([], name=\"variation\", growth=True).Weight()\n",
    "\n",
    "for variation in events.systematic_names:\n",
    "    if variation != \"NOSYS\" and \"EG_SCALE_ALL\" not in variation:\n",
    "        continue\n",
    "\n",
    "    cut = events[variation][\"pass\"][\"ejets\"] == 1\n",
    "    h.fill(events[variation][cut==1].el.pt[:, 0] / 1_000, variation=variation)\n",
    "\n",
    "fig, ax = plt.subplots()\n",
    "for variation in h.axes[1]:\n",
    "    h[:, variation].plot(histtype=\"step\", label=variation, ax=ax)\n",
    "_ = ax.legend()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "a31f4dd8-07aa-4dd0-b50f-013349abe59a",
   "metadata": {},
   "source": [
    "### pre-processing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1abaeac0-ca4c-4a36-8426-10438c4e034e",
   "metadata": {},
   "outputs": [],
   "source": [
    "run = processor.Runner(\n",
    "    executor = processor.DaskExecutor(client=client),\n",
    "    # executor = processor.IterativeExecutor(),\n",
    "    schema=NtupleSchema,\n",
    "    savemetrics=True,\n",
    "    chunksize=100_000,\n",
    "    skipbadfiles=True,\n",
    "    # maxchunks=1\n",
    ")\n",
    "\n",
    "preprocess_output = run.preprocess(fileset)\n",
    "\n",
    "# write to disk\n",
    "with open(\"preprocess_output.json\", \"w\") as f:\n",
    "    json.dump(utils.preprocess_to_json(preprocess_output), f)\n",
    "\n",
    "# load from disk\n",
    "with open(\"preprocess_output.json\") as f:\n",
    "    preprocess_output = utils.json_to_preprocess(json.load(f))\n",
    "\n",
    "len(preprocess_output), preprocess_output[:3]"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "4667b1bf-0ff3-4ccf-93e9-4f4a8e0aa3c7",
   "metadata": {},
   "source": [
    "### processing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "dc78d57d-4fe3-4e11-ab4c-0f0afe60ca32",
   "metadata": {},
   "outputs": [],
   "source": [
    "class Analysis(processor.ProcessorABC):\n",
    "    def __init__(self):\n",
    "        self.h = hist.new.Regular(30, 0, 300, label=\"leading electron $p_T$\").\\\n",
    "            StrCat([], name=\"dsid_and_campaign\", growth=True).\\\n",
    "            StrCat([], name=\"variation\", growth=True).\\\n",
    "            Weight()\n",
    "\n",
    "    def process(self, events):\n",
    "        f = uproot.open(events.metadata[\"filename\"])\n",
    "\n",
    "        # this should match existing pre-determined metadata\n",
    "        # sim_type, mc_campaign, dsid, etag = f[\"metadata\"].axes[0]\n",
    "        # assert mc_campaign == events.metadata[\"campaign\"]\n",
    "        # assert dsid == events.metadata[\"dsid\"]\n",
    "\n",
    "        # ensure systematics in schema and in histogram match\n",
    "        # systematics_from_hist = list(f[\"listOfSystematics\"].axes[0])\n",
    "        # assert sorted(systematics_from_hist) == sorted(events.systematic_names)\n",
    "\n",
    "        # categorize events by DSID and campaign with a single histogram axis\n",
    "        dsid_and_campaign = f\"{events.metadata[\"dsid\"]}_{events.metadata[\"campaign\"]}\"\n",
    "\n",
    "        if events.metadata[\"dsid\"] != \"data\":\n",
    "            sumw = float(f[f.keys(filter_name=\"CutBookkeeper*NOSYS\")[0]].values()[1]) # initial sum of weights\n",
    "        else:\n",
    "            sumw = None # no normalization for data\n",
    "\n",
    "        for variation in events.systematic_names:\n",
    "            if variation != \"NOSYS\" and \"EG_SCALE_ALL\" not in variation:\n",
    "                continue\n",
    "\n",
    "            cut = events[variation][\"pass\"][\"ejets\"] == 1\n",
    "            weight = (events[variation][cut==1].weight.mc if events.metadata[\"dsid\"] != \"data\" else 1.0) * events.metadata[\"weight_xs\"] * events.metadata[\"lumi\"]\n",
    "            self.h.fill(events[variation][cut==1].el.pt[:, 0] / 1_000, dsid_and_campaign=dsid_and_campaign, variation=variation, weight=weight)\n",
    "\n",
    "        return {\n",
    "            \"hist\": self.h,\n",
    "            \"meta\": {\n",
    "                \"sumw\": {(events.metadata[\"dsid\"], events.metadata[\"campaign\"]): {(events.metadata[\"fileuuid\"], sumw)}}} # sumw in a set to avoid summing multiple times per file\n",
    "        }\n",
    "\n",
    "    def postprocess(self, accumulator):\n",
    "        # normalize histograms\n",
    "        # https://topcptoolkit.docs.cern.ch/latest/starting/running_local/#sum-of-weights\n",
    "        for dsid_and_campaign in accumulator[\"hist\"].axes[1]:\n",
    "            dsid, campaign = dsid_and_campaign.split(\"_\")\n",
    "            if dsid == \"data\":\n",
    "                continue # no normalization for data by total number of weighted events\n",
    "            norm = 1 / sum([sumw for uuid, sumw in accumulator[\"meta\"][\"sumw\"][(dsid, campaign)]])\n",
    "            count_normalized = accumulator[\"hist\"][:, dsid_and_campaign, :].values()*norm\n",
    "            variance_normalized = accumulator[\"hist\"][:, dsid_and_campaign, :].variances()*norm**2\n",
    "            accumulator[\"hist\"][:, dsid_and_campaign, :] = np.stack([count_normalized, variance_normalized], axis=-1)\n",
    "\n",
    "\n",
    "t0 = time.perf_counter()\n",
    "out, report = run(preprocess_output, processor_instance=Analysis())\n",
    "t1 = time.perf_counter()\n",
    "report"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "8663e9ff-f8bb-43a0-8978-f2d430d2bbbd",
   "metadata": {},
   "source": [
    "track XCache egress: [link](https://grafana.mwt2.org/d/EKefjM-Sz/af-network-200gbps-challenge?var-cnode=c111_af_uchicago_edu&var-cnode=c112_af_uchicago_edu&var-cnode=c113_af_uchicago_edu&var-cnode=c114_af_uchicago_edu&var-cnode=c115_af_uchicago_edu&viewPanel=195&kiosk=true&orgId=1&from=now-1h&to=now&timezone=browser&refresh=5s)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "9575019b-d1a5-4a8e-9d5a-32d0d8bd0919",
   "metadata": {},
   "outputs": [],
   "source": [
    "print(f\"data read: {report[\"bytesread\"] / 1000**3:.2f} GB in {report[\"chunks\"]} chunks\")\n",
    "\n",
    "print(f\"core-average event rate using \\'processtime\\': {report[\"entries\"] / 1000 / report[\"processtime\"]:.2f} kHz\")\n",
    "print(f\"core-average data rate using \\'processtime\\': {report[\"bytesread\"] / 1000**3 * 8 / report[\"processtime\"]:.2f} Gbps\")\n",
    "\n",
    "print(f\"average event rate using walltime: {report[\"entries\"] / 1000 / (t1 - t0):.2f} kHz\")\n",
    "print(f\"average data rate using walltime: {report[\"bytesread\"] / 1000**3 * 8 / (t1 - t0):.2f} Gbps\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "982cce52-7f5c-4126-a5cc-6a4dfee70732",
   "metadata": {},
   "outputs": [],
   "source": [
    "mc_stack = []\n",
    "labels = []\n",
    "for key in dataset_info:\n",
    "    dsids = []\n",
    "    for container in dataset_info[key]:\n",
    "        dsids.append(container.split(\".\")[1])\n",
    "\n",
    "    dsids = sorted(set(dsids))\n",
    "    dsids_in_hist = [dc for dc in out[\"hist\"].axes[1] if dc.split(\"_\")[0] in dsids]\n",
    "    print(f\"{key}:\\n - expect {dsids}\\n - have {dsids_in_hist}\")\n",
    "\n",
    "    if key in [\"data\", \"ttbar_H7\", \"ttbar_hdamp\", \"ttbar_pthard\", \"Wt_DS\", \"Wt_H7\", \"Wt_pthard\"] or len(dsids_in_hist) == 0:\n",
    "        continue # data drawn separately, skip MC modeling variations and skip empty categories\n",
    "\n",
    "    mc_stack.append(out[\"hist\"][:, :, \"NOSYS\"].integrate(\"dsid_and_campaign\", dsids_in_hist))\n",
    "    labels.append(key)\n",
    "\n",
    "fig, ax1, ax2 = mplhep.data_model(\n",
    "    data_hist=out[\"hist\"].integrate(\"dsid_and_campaign\", [dc for dc in out[\"hist\"].axes[1] if \"data\" in dc])[:, \"NOSYS\"],\n",
    "    stacked_components=mc_stack,\n",
    "    stacked_labels=labels,\n",
    "    # https://scikit-hep.org/mplhep/gallery/model_with_stacked_and_unstacked_histograms_components/\n",
    "    # unstacked_components=[],\n",
    "    # unstacked_labels=[],\n",
    "    xlabel=out[\"hist\"].axes[0].label,\n",
    "    ylabel=\"Entries\",\n",
    ")\n",
    "\n",
    "mplhep.atlas.label(\"Internal\", ax=ax1, data=True, lumi=f\"{utils.integrated_luminosity(\"\", total=True) / 1000:.0f}\", com=\"13/ \\\\ 13.6 \\\\ TeV\")\n",
    "mplhep.mpl_magic(ax=ax1)\n",
    "ax2.set_ylim([0.5, 1.5])\n",
    "\n",
    "# compare to e.g. https://atlas.web.cern.ch/Atlas/GROUPS/PHYSICS/PAPERS/HDBS-2020-11/fig_02a.png\n",
    "fig.savefig(\"el_pt.png\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "b2e3efe0-f724-4206-b233-202a51729014",
   "metadata": {},
   "outputs": [],
   "source": [
    "# save to disk\n",
    "import uhi.io.json\n",
    "\n",
    "with gzip.open(\"hist.json.gz\", \"w\") as f:\n",
    "    f.write(json.dumps(out[\"hist\"], default=uhi.io.json.default).encode(\"utf-8\"))\n",
    "\n",
    "with gzip.open(\"hist.json.gz\") as f:\n",
    "    h = hist.Hist(json.loads(f.read(), object_hook=uhi.io.json.object_hook))\n",
    "\n",
    "h[:, \"data_data15\", \"NOSYS\"]"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.12.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}

atlas/ntuple_production/README.md

Lines changed: 1 addition & 1 deletion
@@ -41,7 +41,7 @@
 voms-proxy-init -voms atlas
 ```
 
-- After ntuple production: `write_ntuple_metadata.py` saves relevant metadata of input and output containers plus xrootd ntuple file paths to disk.
+- After ntuple production: `collect_file_metadata.py` saves relevant metadata of input and output containers plus xrootd ntuple file paths to disk.
 
 ## Notes for first and potential second production

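For context on the renamed script: `collect_file_metadata.py` writes `ntuple_production/file_metadata.json.gz`, which the notebook's fileset-preparation cell reads back. A minimal sketch of that consumption, assuming the nesting used in the notebook (analysis category → container → metadata with a `files_output` list of xrootd paths); any further metadata keys are not shown here:

```python
import gzip
import json

# metadata written by collect_file_metadata.py (path as used in the notebook)
with gzip.open("ntuple_production/file_metadata.json.gz") as f:
    dataset_info = json.loads(f.read().decode())

# category -> container -> metadata; "files_output" lists xrootd ntuple paths (None if missing)
for containers_for_category in dataset_info.values():
    for container, metadata in containers_for_category.items():
        if metadata["files_output"] is None:
            continue
        print(container, len(metadata["files_output"]))
```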