Real-time transient classification pipeline using Superphot+. Consumes astronomical alerts from Kafka (LSST and ZTF surveys), fits light curves with SVI sampling, classifies supernovae using LightGBM, and posts results to a Fritz/SkyPortal instance.
```
Kafka Topics            Consumers                   Pipeline                    Output
────────────            ─────────────────────       ──────────────────────      ─────────────────
LSST_alerts_results  →  alerts_consumer_lsst.py  →  superphot_boom_lsst.py   →  Fritz annotations
                                                                             →  CSV results
                                                                             →  diagnostic plots
ZTF_alerts_results   →  alerts_consumer_ztf.py   →  superphot_boom_ztf.py    →  Fritz annotations
                                                                             →  CSV results
                                                                             →  diagnostic plots
Local CSV files      →  run_superphot_from_csv()                             →  JSON results
(data/photometry/)   →  run_batch_from_csv()                                 →  CSV results
                                                                             →  diagnostic plots
```
```
filter-support/
├── src/filter_support/
│   ├── superphot_boom_lsst.py      # LSST classification pipeline
│   ├── superphot_boom_ztf.py       # ZTF classification pipeline
│   ├── alerts_consumer_lsst.py     # LSST Kafka consumer
│   ├── alerts_consumer_ztf.py      # ZTF Kafka consumer
│   └── superphot_results/          # Output diagnostic plots
├── data/photometry/                # Local CSV photometry files (500 ZTF sources)
├── data/models/
│   ├── global_priors_hier_svi/     # SVI sampler priors
│   ├── model_superphot_full.pt     # Full-phase LightGBM classifier
│   ├── model_superphot_early.pt    # Early-phase LightGBM classifier
│   ├── model_superphot_redshift.pt
│   └── model_superphot_early_redshift.pt
├── pyproject.toml
└── README.md
```
Each `run_superphot()` call performs the following steps:
- Fetch photometry from MongoDB (`LSST_alerts_aux` or `ZTF_alerts_aux`)
- Optionally combine cross-survey photometry via aliases
- Filter to r and g bands, validate minimum data (>2 points per filter)
- Create SNAPI photometry, merge close-time observations
- Phase and truncate light curves (-50 to +100 days)
- Apply Milky Way extinction correction
- Normalize and pad to power-of-2 length
- Fit using numpyro SVI sampler (3000 iterations)
- Filter fits by quality score (cutoff 1.2)
- Classify using early-phase or full-phase LightGBM model
- Generate diagnostic plot (if probability > 0.5)
Returns (event_dict, image_path) with per-class probabilities for: SLSN-I, SN Ia, SN Ibc, SN II, SN IIn.
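The "normalize and pad to power-of-2 length" step above can be illustrated with a minimal sketch. This is a generic illustration of the technique, not the superphot-plus implementation; the function name and the peak-amplitude scaling convention are assumptions:

```python
import numpy as np

def normalize_and_pad(flux: np.ndarray) -> np.ndarray:
    """Scale a light curve to unit peak and zero-pad to the next power-of-2 length."""
    scaled = flux / np.max(np.abs(flux))          # normalize peak amplitude to 1
    target = 1 << (len(scaled) - 1).bit_length()  # smallest power of 2 >= len(scaled)
    return np.pad(scaled, (0, target - len(scaled)))

# Example: 6 observations are zero-padded out to length 8
padded = normalize_and_pad(np.array([1.0, 3.0, 5.0, 4.0, 2.0, 1.0]))
```

Padding to a power-of-2 length is a common convenience for fixed-shape batched fitting under JAX, which recompiles on new array shapes.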
Both pipeline modules support direct CSV input, bypassing Kafka and MongoDB entirely. This is useful for batch-processing archival photometry stored locally.
The data/photometry/ directory contains CSV files for 500 random ZTF sources. Each CSV has standard ZTF alert columns (jd, fid, magpsf, sigmapsf, ra, dec, etc.).
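For reference, here is a sketch of loading one of these CSVs and applying the same g/r band cut the pipeline uses. The rows are mock data; ZTF encodes bands in the `fid` column as 1 = g, 2 = r, 3 = i:

```python
import io
import pandas as pd

# Minimal mock photometry in the standard ZTF alert column format
csv_text = """jd,fid,magpsf,sigmapsf,ra,dec
2460000.5,1,19.20,0.08,150.10,2.30
2460001.5,2,18.90,0.06,150.10,2.30
2460002.5,3,18.70,0.05,150.10,2.30
"""

df = pd.read_csv(io.StringIO(csv_text))
# Keep only g (fid=1) and r (fid=2) detections, mirroring the pipeline's band filter
gr = df[df["fid"].isin([1, 2])]
```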
```python
from filter_support.superphot_boom_ztf import run_superphot_from_csv

result = run_superphot_from_csv("data/photometry/ZTF17aabuqoz.csv")
if result:
    event_dict, image_path = result
```

Batch mode processes every CSV in a directory:

```python
from filter_support.superphot_boom_ztf import run_batch_from_csv

run_batch_from_csv("data/photometry", "superphot_results")
```

The LSST module exposes the same functions:

```python
from filter_support.superphot_boom_lsst import run_superphot_from_csv, run_batch_from_csv
```

| File | Description |
|---|---|
| `superphot_results/<source_id>.json` | Per-source classification results |
| `superphot_results/superphot_results_ztf_batch.csv` | Combined results (ZTF module) |
| `superphot_results/superphot_results_lsst_batch.csv` | Combined results (LSST module) |
| `superphot_results/<source_id>_superphot.png` | Diagnostic plot (if probability > 0.5) |
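The per-source JSON files can be post-processed like so. This is a sketch that assumes each file contains the per-class probability dict described earlier (SLSN-I, SN Ia, SN Ibc, SN II, SN IIn); the actual schema may carry additional fields:

```python
import json
import tempfile
from pathlib import Path

def best_class(result_file: Path) -> tuple[str, float]:
    """Pick the highest-probability class from a per-source JSON result.

    Assumes the file maps class names to probabilities; the real
    output schema may differ.
    """
    probs = json.loads(result_file.read_text())
    label = max(probs, key=probs.get)
    return label, probs[label]

# Demonstrate on a mock result file
mock = {"SLSN-I": 0.01, "SN Ia": 0.93, "SN Ibc": 0.02, "SN II": 0.03, "SN IIn": 0.01}
path = Path(tempfile.mkdtemp()) / "ZTF17aabuqoz.json"
path.write_text(json.dumps(mock))
label, prob = best_class(path)
```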
The two Kafka consumers run independently and can operate simultaneously:
| | LSST | ZTF |
|---|---|---|
| Topic | `LSST_alerts_results` | `ZTF_alerts_results` |
| CSV output | `superphot_results_lsst.csv` | `superphot_results_ztf.csv` |
| State file | `consumer_state_lsst.json` | `consumer_state_ztf.json` |
| Log file | `superphot_lsst.log` | `superphot_ztf.log` |
Each consumer:
- Runs superphot on every alert that passes the filter
- Posts/updates annotations on Fritz with per-class probabilities
- Posts/replaces comments with best class and probability
- Persists state to JSON for crash recovery
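The JSON crash-recovery pattern can be sketched as follows. The state fields and helper names are assumptions for illustration, not the actual consumer code; only the state filename matches the table above:

```python
import json
from pathlib import Path

STATE_FILE = Path("consumer_state_ztf.json")  # state file named in the table above

def load_state() -> dict:
    """Resume from the last persisted state, or start fresh."""
    if STATE_FILE.exists():
        return json.loads(STATE_FILE.read_text())
    return {"last_offset": 0, "processed": 0}

def save_state(state: dict) -> None:
    """Write to a temp file, then rename, so a crash mid-write
    never leaves a corrupt state file behind."""
    tmp = STATE_FILE.with_suffix(".tmp")
    tmp.write_text(json.dumps(state))
    tmp.replace(STATE_FILE)  # atomic rename on POSIX

# Typical loop body: process an alert, then checkpoint
state = load_state()
state["processed"] += 1
save_state(state)
```

The write-then-rename step is what makes the checkpoint safe to read after an unclean shutdown.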
Results are posted to the Orcus instance (orcusgate.org/api):
- Annotations: per-class probability dict (POST on first run, PUT on updates)
- Comments: best class and probability, e.g. `Superphot+ Classification: SN Ia (Probability: 0.996)`
The annotate_fritz() function handles both creation and update: if a POST fails because an annotation already exists, it automatically fetches the existing annotation ID and performs a PUT.
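The POST-then-PUT fallback can be sketched against SkyPortal's annotations endpoints. This is a condensed illustration, not the actual `annotate_fritz()`: the `ORIGIN` value is an assumption, error handling is minimal, and the transport is injectable (pass the `requests` module in production) so the fallback path can be exercised without a live server:

```python
API = "https://orcusgate.org/api"   # the Orcus instance named above
ORIGIN = "superphot"                # annotation origin; an assumed value

def annotate(obj_id, probs, token, http):
    """POST a per-class probability annotation; on a 400 conflict, look up
    the existing annotation's id and PUT an update instead.

    `http` is any object with requests-style post/get/put methods.
    """
    headers = {"Authorization": f"token {token}"}
    payload = {"origin": ORIGIN, "data": probs}
    r = http.post(f"{API}/sources/{obj_id}/annotations", json=payload, headers=headers)
    if r.status_code == 400:  # an annotation with this origin already exists
        anns = http.get(f"{API}/sources/{obj_id}/annotations", headers=headers).json()["data"]
        ann_id = next(a["id"] for a in anns if a["origin"] == ORIGIN)
        r = http.put(f"{API}/sources/{obj_id}/annotations/{ann_id}", json=payload, headers=headers)
    return r.status_code

# Dry-run with a fake transport to show the POST -> PUT fallback path
class _Resp:
    def __init__(self, code, data=None):
        self.status_code, self._data = code, data
    def json(self):
        return {"data": self._data}

class _FakeHTTP:
    def post(self, url, json=None, headers=None):
        return _Resp(400)  # simulate "annotation already exists"
    def get(self, url, headers=None):
        return _Resp(200, [{"id": 7, "origin": ORIGIN}])
    def put(self, url, json=None, headers=None):
        return _Resp(200)

status = annotate("ZTF17aabuqoz", {"SN Ia": 0.996}, "<token>", _FakeHTTP())
```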
Note: The Kafka consumers require access to the BOOM database (MongoDB) and the Kafka alert streams. The CSV-based functions (`run_superphot_from_csv`, `run_batch_from_csv`) only require the model files and local photometry CSVs; no MongoDB or Kafka is needed.
- Python >= 3.11
- MongoDB (local or remote)
- Kafka broker
- Conda environment recommended (`superphot`)

Install dependencies:

```bash
poetry install
```

Create `~/.env` with:

```bash
BOOM_DATABASE__USERNAME=<mongodb_user>
BOOM_DATABASE__PASSWORD=<mongodb_password>
ORCUS_TOKEN=<fritz_api_token>
SLACK_BOT_TOKEN=<slack_token>   # optional
```
Start from `src/filter_support/`:

```bash
# Run LSST consumer
python alerts_consumer_lsst.py

# Run ZTF consumer (separate terminal)
python alerts_consumer_ztf.py

# Batch-process local CSV photometry (no Kafka/MongoDB required)
python -c "from filter_support.superphot_boom_ztf import run_batch_from_csv; run_batch_from_csv()"
```

- superphot-plus - SVI light curve fitting and LightGBM classification
- snapi - Photometry processing (phasing, truncation, normalization)
- jax / numpyro - Probabilistic SVI sampler backend
- pymongo - MongoDB access
- confluent-kafka / fastavro - Kafka consumer with Avro deserialization
- astropy / dustmaps - Coordinates, extinction correction