Commit 08eb95d

refactor: Improve skimming and metadata code organization, naming, and S3 support (#8)
1 parent 337a379 commit 08eb95d

413 files changed (+5519, -97705 lines)

cms/.gitattributes

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+*.ipynb filter=nbstripout

cms/README.md

Lines changed: 27 additions & 11 deletions
@@ -16,24 +16,40 @@ cms/
     analysis.py           # Main analysis script
     pixi.toml, pixi.lock  # Environment configuration (pixi)
     analysis/             # Analysis logic and base classes
-    corrections/          # Correction files (JSON, text, etc.)
-    example/              # Example datasets and outputs
-    user/                 # User analysis configuration, cuts, observables
-    utils/                # Utility modules (output, plotting, stats, etc.)
+    example_opendata/     # Open-data example configs, cuts, datasets
+    example_cms/          # CMS internal-style example configs
+    utils/                # Utility modules (output manager, skimming, schema, etc.)
 ```
-- Configuration files for the analysis are found in `cms/user/` (e.g., `configuration.py`).
+- Start from the example configurations in `example_opendata/configs/` or `example_cms/configs/`; they provide complete analysis dictionaries (datasets, skimming, channels) that you can copy and adapt for your own campaign.
 - Main scripts and entry points are in `cms/`.
 
 ## Metadata and preprocessing
-Metadata extraction and preprocessing are handled before the main analysis. Metadata includes information about datasets, event counts, and cross sections, and is used to configure the analysis and normalization. Preprocessing steps may include filtering, object selection, and preparing input files for skimming and analysis.
+Metadata extraction and preprocessing are handled before the main analysis. Metadata includes information about datasets, event counts, and cross-sections, and is used to configure the analysis and normalization. Preprocessing steps may include filtering, object selection, and preparing input files for skimming and analysis.
 
 ## Skimming
-To skim NanoAOD datasets, use the provided scripts and configuration files in the `analysis/` and `user/` directories. Adjust the configuration as needed for your analysis channels and observables.
+Preprocessing is controlled by the `preprocess` block in the configuration. The `skimming` subsection now uses a single `output` stanza to steer how skimmed NanoAOD chunks are persisted:
 
-Currently, the code writes out skimmed files as intermediate outputs. The plan is to integrate the workflow so that all steps, including skimming, are performed on-the-fly without writing intermediate files, streamlining the analysis process.
+```python
+preprocess = {
+    "skimming": {
+        "function": default_skim_selection,
+        "use": [("PuppiMET", None), ("HLT", None)],
+        "output": {
+            "format": "parquet",  # other options: root_ttree, rntuple, safetensors (stubs)
+            "local": True,
+            "base_uri": "s3://bucket",  # optional override for remote storage
+            "to_kwargs": {"compression": "zstd"},  # forwarded to ak.to_parquet
+            "from_kwargs": {"storage_options": {...}},  # forwarded to NanoEventsFactory.from_parquet
+        },
+    },
+}
+```
+
+The file suffix is fixed to `{dataset}/file_{index}/part_{chunk}.{ext}`, so switching between local and remote storage only requires changing the `local` flag and the optional `base_uri`.
 
-If you need pre-skimmed data, it is available on CERNBox upon request. Please contact Mohamed Aly (mohamed.aly@cern.ch) for access.
-If you want to reproduce the skimmed files yourself, set the option `general.run_skimming=True` in the configuration file `cms/user/configuration.py`. This takes roughly 1-1.5 hours for the whole set of data. If you want only a subset, you can specify the maximum number of files to process per dataset using the `datasets.max_files` option in the same configuration file under the dataset configuration section.
+- Set `general.run_skimming=True` to regenerate skims. Use `datasets.max_files` to limit input size when experimenting.
+- Downstream steps load from the same path, so no separate cache copy is needed; cached Awkward objects are still produced automatically for faster reruns.
+- Dataset-level options such as lumi masks live next to each dataset definition (for example `lumi_mask: {"function": cuts.lumi_mask, "use": [...], "static_kwargs": {"lumifile": "...json"}}`).
 
 ## Running code
 To run the main analysis chain, execute the relevant Python scripts or notebooks. Outputs such as histograms and fit results will be saved in the `outputs/` directory. For example:
@@ -48,4 +64,4 @@ The following is guaranteed to produce a result if skimming is already performed
 
 ```sh
 python3 analysis.py general.run_skimming=False general.read_from_cache=True general.run_mva_training=False general.run_plots_only=False general.run_metadata_generation=False
-```
+```
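The fixed suffix quoted in the README above makes skim path construction mechanical. The following is an illustrative sketch only, not code from this commit: `build_skim_uri`, the extension table, and the local default root are all assumptions.

```python
# Illustrative sketch: assembling a skim output URI from the `output` stanza.
# The fixed suffix {dataset}/file_{index}/part_{chunk}.{ext} comes from the
# README; everything else here (names, defaults, mappings) is assumed.
EXTENSIONS = {
    "parquet": "parquet",
    "root_ttree": "root",       # assumed mapping
    "rntuple": "root",          # assumed mapping
    "safetensors": "safetensors",
}

def build_skim_uri(output_cfg: dict, dataset: str, index: int, chunk: int) -> str:
    """Join the fixed suffix to either a local root or the remote base_uri."""
    ext = EXTENSIONS[output_cfg.get("format", "parquet")]
    suffix = f"{dataset}/file_{index}/part_{chunk}.{ext}"
    base = "skims" if output_cfg.get("local", True) else output_cfg["base_uri"]
    return f"{base}/{suffix}"

# Flipping to S3 changes only `local` and `base_uri`:
cfg = {"format": "parquet", "local": False, "base_uri": "s3://bucket"}
assert build_skim_uri(cfg, "ttbar", 3, 0) == "s3://bucket/ttbar/file_3/part_0.parquet"
```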

cms/analysis.py

Lines changed: 10 additions & 7 deletions
@@ -10,12 +10,12 @@
 import warnings
 
 from analysis.nondiff import NonDiffAnalysis
-from user.configuration import config as ZprimeConfig
+from example_opendata.configs.configuration import config as ZprimeConfig
 from utils.datasets import ConfigurableDatasetManager
 from utils.logging import setup_logging, log_banner
 from utils.schema import Config, load_config_with_restricted_cli
 from utils.metadata_extractor import NanoAODMetadataGenerator
-from utils.skimming import process_workitems_with_skimming
+from utils.skimming import process_and_load_events
 from utils.output_manager import OutputDirectoryManager
 
 # -----------------------------
@@ -56,17 +56,20 @@ def main():
     # Generate metadata and fileset from NanoAODs
     generator = NanoAODMetadataGenerator(dataset_manager=dataset_manager, output_manager=output_manager)
     generator.run(generate_metadata=config.general.run_metadata_generation)
-    fileset = generator.fileset
+    datasets = generator.datasets
     workitems = generator.workitems
     if not workitems:
         logger.error("No workitems available. Please ensure metadata generation completed successfully.")
         sys.exit(1)
+    if not datasets:
+        logger.error("No datasets available. Please ensure metadata generation completed successfully.")
+        sys.exit(1)
 
     logger.info(log_banner("SKIMMING AND PROCESSING"))
-    logger.info(f"Processing {len(workitems)} workitems")
+    logger.info(f"Processing {len(workitems)} workitems across {len(datasets)} datasets")
 
-    # Process workitems with dask-awkward
-    processed_datasets = process_workitems_with_skimming(workitems, config, output_manager, fileset, generator.nanoaods_summary)
+    # Process workitems and populate Dataset objects with events
+    datasets = process_and_load_events(workitems, config, output_manager, datasets, generator.nanoaods_summary)
 
 
     analysis_mode = config.general.analysis
@@ -77,7 +80,7 @@ def main():
         return
     elif analysis_mode == "nondiff":
         logger.info(log_banner("Running Non-Differentiable Analysis"))
-        nondiff_analysis = NonDiffAnalysis(config, processed_datasets, output_manager)
+        nondiff_analysis = NonDiffAnalysis(config, datasets, output_manager)
         nondiff_analysis.run_analysis_chain()
 
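The driver change above replaces the old dict of processed tuples with a list of `Dataset` objects whose events are populated in place. A hypothetical sketch of that contract follows; the real class lives in `utils/datasets.py` and none of these field names are confirmed by the diff.

```python
# Hypothetical shape of the Dataset contract implied by this change; the
# field names and the loading step are assumptions, not repository code.
from dataclasses import dataclass, field
from typing import Any, Dict, List

@dataclass
class Dataset:
    name: str
    metadata: Dict[str, Any] = field(default_factory=dict)  # e.g. cross-section, event counts
    events: List[Any] = field(default_factory=list)         # filled by process_and_load_events

def process_and_load_events_sketch(workitems: List[Dict[str, Any]],
                                   datasets: List[Dataset]) -> List[Dataset]:
    """Attach each workitem's (skimmed or re-loaded) events to its owning Dataset."""
    by_name = {d.name: d for d in datasets}
    for item in workitems:
        # item.get("events") stands in for the real skim-or-load step in utils/skimming.py
        by_name[item["dataset"]].events.append(item.get("events"))
    return datasets
```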

cms/analysis/base.py

Lines changed: 29 additions & 17 deletions
@@ -63,7 +63,7 @@ class Analysis:
     def __init__(
         self,
         config: Dict[str, Any],
-        processed_datasets: Dict[str, List[Tuple[Any, Dict[str, Any]]]],
+        datasets: List,
         output_manager: OutputDirectoryManager,
     ) -> None:
         """
@@ -78,16 +78,16 @@ def __init__(
             - 'corrections': Correction configurations
             - 'channels': Analysis channel definitions
             - 'general': General settings including output directory
-        processed_datasets : Dict[str, List[Tuple[Any, Dict[str, Any]]]]
-            Pre-processed datasets from skimming (required)
+        datasets : List[Dataset]
+            List of Dataset objects with populated events from skimming (required)
         output_manager : OutputDirectoryManager
             Centralized output directory manager (required)
         """
         self.config = config
         self.channels = config.channels
         self.systematics = config.systematics
         self.corrections = config.corrections
-        self.processed_datasets = processed_datasets
+        self.datasets = datasets
         self.output_manager = output_manager
         self.corrlib_evaluators = self._load_correctionlib()
 
@@ -161,13 +161,16 @@ def get_good_objects(
         """
         good_objects = {}
         for mask_config in masks:
-            mask_args = get_function_arguments(
+            mask_args, mask_static_kwargs = get_function_arguments(
                 mask_config.use,
                 object_copies,
                 function_name=mask_config.function.__name__,
+                static_kwargs=mask_config.get("static_kwargs"),
             )
 
-            selection_mask = mask_config.function(*mask_args)
+            selection_mask = mask_config.function(
+                *mask_args, **mask_static_kwargs
+            )
             if not isinstance(selection_mask, ak.Array):
                 raise TypeError(
                     f"Mask must be an awkward array. Got {type(selection_mask)}"
@@ -301,6 +304,7 @@ apply_syst_function(
         function_args: List[ak.Array],
         affected_arrays: Union[ak.Array, List[ak.Array]],
         operation: str,
+        static_kwargs: Optional[Dict[str, Any]] = None,
     ) -> Union[ak.Array, List[ak.Array]]:
         """
         Apply function-based systematic variation.
@@ -312,7 +316,9 @@ apply_syst_function(
         syst_function : Callable[..., ak.Array]
             Variation function
         function_args : List[ak.Array]
-            Function arguments
+            Positional arguments for the variation function
+        static_kwargs : Optional[Dict[str, Any]]
+            Static keyword arguments for the variation function
         affected_arrays : Union[ak.Array, List[ak.Array]]
             Array(s) to modify
         operation : str
@@ -324,7 +330,8 @@ apply_syst_function(
             Modified array(s)
         """
         logger.debug("Applying function-based systematic: %s", syst_name)
-        variation = syst_function(*function_args)
+        kwargs = static_kwargs or {}
+        variation = syst_function(*function_args, **kwargs)
 
         if isinstance(affected_arrays, list):
             return [
@@ -459,10 +466,11 @@ apply_object_corrections(
                 continue
 
             # Prepare arguments and targets
-            args = get_function_arguments(
+            corr_args, corr_static_kwargs = get_function_arguments(
                 correction.use,
                 object_copies,
                 function_name=f"correction::{correction.name}",
+                static_kwargs=correction.get("static_kwargs"),
             )
             targets = self._get_target_arrays(
                 correction.target,
@@ -487,7 +495,7 @@
                     correction_name=correction.name,
                     correction_key=key,
                     direction=corr_direction,
-                    correction_args=args,
+                    correction_args=corr_args,
                     target=targets,
                     operation=operation,
                     transform=transform,
@@ -498,9 +506,10 @@
                 corrected_values = self.apply_syst_function(
                     syst_name=correction.name,
                     syst_function=syst_func,
-                    function_args=args,
+                    function_args=corr_args,
                     affected_arrays=targets,
                     operation=operation,
+                    static_kwargs=corr_static_kwargs,
                 )
             else:
                 corrected_values = targets
@@ -542,10 +551,11 @@ apply_event_weight_correction(
             return weights
 
         # Prepare arguments
-        args = get_function_arguments(
+        weight_args, weight_static_kwargs = get_function_arguments(
            systematic.use,
            object_copies,
            function_name=f"systematic::{systematic.name}",
+           static_kwargs=systematic.get("static_kwargs"),
        )
        operation = systematic.op
        key = systematic.key
@@ -559,7 +569,7 @@
                correction_name=systematic.name,
                correction_key=key,
                direction=corr_direction,
-               correction_args=args,
+               correction_args=weight_args,
                target=weights,
                operation=operation,
                transform=transform,
@@ -570,9 +580,10 @@
            return self.apply_syst_function(
                syst_name=systematic.name,
                syst_function=syst_func,
-               function_args=args,
+               function_args=weight_args,
                affected_arrays=weights,
                operation=operation,
+               static_kwargs=weight_static_kwargs,
            )
        return weights
 
@@ -596,10 +607,11 @@ compute_ghost_observables(
        for ghost in self.config.ghost_observables:
 
            logger.debug("Computing ghost observables: %s", ghost.names)
-           args = get_function_arguments(
-               ghost.use, object_copies, function_name=ghost.function.__name__
+           ghost_args, ghost_static_kwargs = get_function_arguments(
+               ghost.use, object_copies, function_name=ghost.function.__name__,
+               static_kwargs=ghost.get("static_kwargs")
            )
-           outputs = ghost.function(*args)
+           outputs = ghost.function(*ghost_args, **ghost_static_kwargs)
 
            # Normalize outputs to list
            if not isinstance(outputs, (list, tuple)):
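The thread running through `base.py` is that `get_function_arguments` now returns a `(positional_args, static_kwargs)` pair and every call site forwards both. A minimal sketch of that contract follows, assuming `use` holds `(collection, field)` pairs where `None` means 'pass the whole collection'; the helper's real internals are not part of this diff.

```python
# Minimal sketch of the calling convention this diff threads through base.py.
# The real get_function_arguments lives elsewhere in the repo; this version
# only illustrates the (positional_args, static_kwargs) return shape.
from typing import Any, Dict, List, Optional, Sequence, Tuple

def get_function_arguments(
    use: Sequence[Tuple[str, Optional[str]]],
    object_copies: Dict[str, Any],
    function_name: str = "",
    static_kwargs: Optional[Dict[str, Any]] = None,
) -> Tuple[List[Any], Dict[str, Any]]:
    """Resolve `use` entries against object_copies; pass static kwargs through."""
    args: List[Any] = []
    for collection, fld in use:
        obj = object_copies[collection]
        args.append(obj if fld is None else obj[fld])  # None => whole collection (assumed)
    return args, (static_kwargs or {})

# Every call site then follows one pattern:
def apply(cfg_use, object_copies, fn, static_kwargs=None):
    args, kwargs = get_function_arguments(cfg_use, object_copies, static_kwargs=static_kwargs)
    return fn(*args, **kwargs)

# e.g. a lumi-mask-style config entry can carry fixed parameters
# (like {"lumifile": "...json"}) without changing the positional interface:
masked = apply([("events", None)], {"events": [1, 2, 3]},
               lambda evts, scale=1: [x * scale for x in evts],
               static_kwargs={"scale": 2})
assert masked == [2, 4, 6]
```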
