Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ PolicyEngine constructs its representative household datasets through a multi-st

The Enhanced CPS (`make data-legacy`) produces a national-only calibrated dataset. For the current geography-specific pipeline, see [docs/calibration.md](docs/calibration.md).

The repo currently contains two calibration tracks:
- Legacy Enhanced CPS (`make data-legacy`), which uses the older `EnhancedCPS` / `build_loss_matrix()` path for national-only calibration.
- Unified calibration (`docs/calibration.md`), which uses `storage/calibration/policy_data.db` and the sparse matrix + L0 pipeline for current national and geography-specific builds.

For detailed calibration usage, see [docs/calibration.md](docs/calibration.md) and [modal_app/README.md](modal_app/README.md).

### Running the Full Pipeline
Expand Down
2 changes: 2 additions & 0 deletions docs/calibration.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

The unified calibration pipeline reweights cloned CPS records to match administrative targets using L0-regularized optimization. This guide covers the main workflows: lightweight build-then-fit, full pipeline with PUF, and fitting from a saved package.

This is the current production calibration path. The older national-only Enhanced CPS path (`make data-legacy`) remains in the repo for legacy reproduction and uses a separate `EnhancedCPS` / `build_loss_matrix()` flow.

## Quick Start

```bash
Expand Down
4 changes: 3 additions & 1 deletion docs/methodology.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ graph TD
classDef output fill:#5091CC,stroke:#2C6496,color:#FFFFFF
```

The current production calibration path is the geography-specific target-database pipeline shown above. The legacy national-only Enhanced CPS reweighting branch remains in the repo for reproduction purposes. As a consequence, any calibration-target change that must affect both paths requires updates in two places: the unified database pipeline and the older `EnhancedCPS` / `build_loss_matrix()` flow.

## Stage 1: Variable Imputation

The imputation process begins by aging both the CPS and PUF datasets to the target year, then creating a copy of the aged CPS dataset. This allows us to preserve the original CPS structure while adding imputed tax variables.
Expand Down Expand Up @@ -298,4 +300,4 @@ Key files:
- `policyengine_us_data/calibration/unified_matrix_builder.py` — Sparse calibration matrix builder
- `policyengine_us_data/calibration/clone_and_assign.py` — Geography cloning and block assignment
- `policyengine_us_data/calibration/publish_local_area.py` — H5 file generation
- `policyengine_us_data/db/` — Target database ETL scripts
- `policyengine_us_data/db/` — Target database ETL scripts
34 changes: 28 additions & 6 deletions policyengine_us_data/db/etl_national_targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,22 @@ def extract_national_targets(dataset: str = DEFAULT_DATASET):
"notes": "ACA Premium Tax Credit recipients",
"year": HARDCODED_YEAR,
},
{
"constraint_variable": "spm_unit_energy_subsidy_reported",
"target_variable": "household_count",
"household_count": 5_939_605,
"source": "https://liheappm.acf.gov/sites/default/files/private/congress/profiles/2023/FY2023AllStates%28National%29Profile-508Compliant.pdf",
"notes": "LIHEAP total households served by state programs",
"year": 2023,
},
{
"constraint_variable": "spm_unit_energy_subsidy_reported",
"target_variable": "household_count",
"household_count": 5_876_646,
"source": "https://liheappm.acf.gov/sites/default/files/private/congress/profiles/2024/FY2024_AllStates%28National%29_Profile.pdf",
"notes": "LIHEAP total households served by state programs",
"year": 2024,
},
]

# Add SSN card type NONE targets for multiple years
Expand Down Expand Up @@ -730,6 +746,8 @@ def load_national_targets(
for cond_target in conditional_targets:
constraint_var = cond_target["constraint_variable"]
target_year = cond_target["year"]
target_variable = cond_target.get("target_variable", "person_count")
target_value = cond_target.get(target_variable)

# Determine constraint details
if constraint_var == "medicaid":
Expand All @@ -740,6 +758,10 @@ def load_national_targets(
stratum_notes = "National ACA Premium Tax Credit Recipients"
constraint_operation = ">"
constraint_value = "0"
elif constraint_var == "spm_unit_energy_subsidy_reported":
stratum_notes = "National LIHEAP Recipient Households"
constraint_operation = ">"
constraint_value = "0"
elif constraint_var == "ssn_card_type":
stratum_notes = "National Undocumented Population"
constraint_operation = "=="
Expand All @@ -765,23 +787,23 @@ def load_national_targets(
session.query(Target)
.filter(
Target.stratum_id == existing_stratum.stratum_id,
Target.variable == "person_count",
Target.variable == target_variable,
Target.period == target_year,
)
.first()
)

if existing_target:
existing_target.value = cond_target["person_count"]
existing_target.value = target_value
existing_target.source = "PolicyEngine"
print(f"Updated enrollment target for {constraint_var}")
else:
# Add new target to existing stratum
new_target = Target(
stratum_id=existing_stratum.stratum_id,
variable="person_count",
variable=target_variable,
period=target_year,
value=cond_target["person_count"],
value=target_value,
active=True,
source="PolicyEngine",
notes=f"{cond_target['notes']} | Source: {cond_target['source']}",
Expand All @@ -807,9 +829,9 @@ def load_national_targets(
# Add target
new_stratum.targets_rel = [
Target(
variable="person_count",
variable=target_variable,
period=target_year,
value=cond_target["person_count"],
value=target_value,
active=True,
source="PolicyEngine",
notes=f"{cond_target['notes']} | Source: {cond_target['source']}",
Expand Down
134 changes: 133 additions & 1 deletion policyengine_us_data/utils/loss.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
import pandas as pd
import numpy as np
import logging
import sqlite3

from policyengine_us_data.storage import CALIBRATION_FOLDER
from policyengine_us_data.storage import CALIBRATION_FOLDER, STORAGE_FOLDER
from policyengine_us_data.storage.calibration_targets.pull_soi_targets import (
STATE_ABBR_TO_FIPS,
)
Expand Down Expand Up @@ -118,6 +119,133 @@ def fmt(x):
return f"{x / 1e9:.1f}bn"


def _parse_constraint_value(value):
if value == "True":
return True
if value == "False":
return False
try:
return int(value)
except (TypeError, ValueError):
try:
return float(value)
except (TypeError, ValueError):
return value


def _apply_constraint(values, operation: str, raw_value: str):
if operation == "in":
allowed_values = [part.strip() for part in raw_value.split("|")]
return np.isin(values, allowed_values)

value = _parse_constraint_value(raw_value)
if operation in ("equals", "==", "="):
return values == value
if operation in ("greater_than", ">"):
return values > value
if operation in ("greater_than_or_equal", ">="):
return values >= value
if operation in ("less_than", "<"):
return values < value
if operation in ("less_than_or_equal", "<="):
return values <= value
if operation in ("not_equals", "!=", "<>"):
return values != value

raise ValueError(f"Unsupported stratum constraint operation: {operation}")


def _geo_label_from_ucgid(ucgid_str: str) -> str:
if ucgid_str in (None, "", "0100000US"):
return "nation"
return f"geo/{ucgid_str}"


def _add_liheap_targets_from_db(loss_matrix, targets_list, sim, time_period):
    """Append LIHEAP calibration targets from the local targets database.

    Queries ``storage/calibration/policy_data.db`` for active, baseline
    (``reform_id = 0``) targets whose stratum notes mention LIHEAP, builds a
    household-level metric column for each target, and appends the target
    value to ``targets_list``. Best-effort: if the database file is missing
    or no matching rows exist, the inputs are returned unchanged.

    Args:
        loss_matrix: DataFrame of metric columns, one row per household.
        targets_list: list of target values parallel to loss_matrix columns.
        sim: simulation object used to compute household-mapped variables.
        time_period: target period matched against ``targets.period``.

    Returns:
        ``(targets_list, loss_matrix)`` with one new column/value per target.
    """
    db_path = STORAGE_FOLDER / "calibration" / "policy_data.db"
    if not db_path.exists():
        # Deliberately silent: legacy builds without the unified DB still work.
        return targets_list, loss_matrix

    # One row per (target, constraint) pair; grouping by target_id below
    # reassembles each target's full constraint set.
    query = """
    SELECT
        t.target_id,
        t.variable,
        t.value AS target_value,
        s.notes,
        sc.constraint_variable,
        sc.operation,
        sc.value AS constraint_value
    FROM targets t
    JOIN strata s
        ON s.stratum_id = t.stratum_id
    JOIN stratum_constraints sc
        ON sc.stratum_id = s.stratum_id
    WHERE
        t.active = 1
        AND t.reform_id = 0
        AND t.period = ?
        AND s.notes LIKE '%LIHEAP%'
    ORDER BY t.target_id
    """

    with sqlite3.connect(db_path) as conn:
        target_rows = pd.read_sql_query(query, conn, params=[time_period])

    if target_rows.empty:
        return targets_list, loss_matrix

    # Cache per-variable household arrays so each variable is computed once
    # even when several targets share a constraint variable.
    household_values_cache = {
        "household_weight": sim.calculate("household_weight").values
    }

    def get_household_values(variable: str):
        # Lazily compute and memoize a variable mapped to household level.
        if variable not in household_values_cache:
            household_values_cache[variable] = sim.calculate(
                variable,
                map_to="household",
            ).values
        return household_values_cache[variable]

    n_households = len(household_values_cache["household_weight"])

    # sort=False preserves the ORDER BY t.target_id ordering from the query,
    # keeping loss-matrix columns aligned with targets_list append order.
    for _, target_df in target_rows.groupby("target_id", sort=False):
        mask = np.ones(n_households, dtype=bool)
        for row in target_df.itertuples(index=False):
            if (
                row.constraint_variable == "ucgid_str"
                and row.constraint_value == "0100000US"
            ):
                # National UCGID constrains nothing: every household matches.
                continue
            values = get_household_values(row.constraint_variable)
            mask &= _apply_constraint(
                values,
                row.operation,
                row.constraint_value,
            )

        variable = target_df["variable"].iat[0]
        if variable == "household_count":
            # Count targets: each in-stratum household contributes 1.0.
            metric = mask.astype(float)
        else:
            # Amount targets: sum the variable over in-stratum households.
            metric = np.where(mask, get_household_values(variable), 0.0)

        ucgid_constraints = target_df.loc[
            target_df.constraint_variable == "ucgid_str", "constraint_value"
        ]
        geo_label = _geo_label_from_ucgid(
            ucgid_constraints.iat[0] if not ucgid_constraints.empty else None
        )
        # NOTE(review): two targets sharing geography, variable, and period
        # would collide on this label and overwrite a column — presumably the
        # DB holds at most one such target per period; verify upstream ETL.
        label = f"{geo_label}/db/liheap/{variable}"
        loss_matrix[label] = metric
        targets_list.append(target_df["target_value"].iat[0])

    logging.info(
        f"Loaded {target_rows['target_id'].nunique()} LIHEAP targets from the local targets DB"
    )

    return targets_list, loss_matrix


def build_loss_matrix(dataset: type, time_period):
loss_matrix = pd.DataFrame()
df = pe_to_soi(dataset, time_period)
Expand Down Expand Up @@ -667,6 +795,10 @@ def build_loss_matrix(dataset: type, time_period):
targets_array.extend(snap_state_targets)
loss_matrix = _add_snap_metric_columns(loss_matrix, sim)

targets_array, loss_matrix = _add_liheap_targets_from_db(
loss_matrix, targets_array, sim, time_period
)

del sim, df
gc.collect()

Expand Down
64 changes: 64 additions & 0 deletions tests/unit/test_etl_national_targets.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,67 @@ def test_load_national_targets_deactivates_stale_baseline_rows(tmp_path, monkeyp
in (target.notes or "")
for target in reform_rows
)


def test_load_national_targets_supports_liheap_household_counts(tmp_path, monkeypatch):
    """LIHEAP conditional targets load as household_count rows with a > 0 constraint."""
    # Build an empty targets DB in the location the ETL expects under tmp_path.
    db_dir = tmp_path / "calibration"
    db_dir.mkdir()
    db_engine = create_database(f"sqlite:///{db_dir / 'policy_data.db'}")

    with Session(db_engine) as session:
        assert _make_stratum(session, notes="United States") is not None

    # Point the ETL module at the temporary storage folder.
    monkeypatch.setattr(
        "policyengine_us_data.db.etl_national_targets.STORAGE_FOLDER",
        tmp_path,
    )

    liheap_spec = {
        "constraint_variable": "spm_unit_energy_subsidy_reported",
        "target_variable": "household_count",
        "household_count": 5_876_646,
        "source": "https://example.com/liheap-2024.pdf",
        "notes": "LIHEAP total households served by state programs",
        "year": 2024,
    }

    empty_frame = pd.DataFrame()
    load_national_targets(
        direct_targets_df=empty_frame,
        tax_filer_df=empty_frame,
        tax_expenditure_df=empty_frame,
        conditional_targets=[liheap_spec],
    )

    with Session(db_engine) as session:
        stratum = (
            session.query(Stratum)
            .filter(Stratum.notes == "National LIHEAP Recipient Households")
            .first()
        )
        assert stratum is not None

        # The stratum must constrain on reported energy subsidy > 0.
        constraint_tuples = set()
        for constraint in stratum.constraints_rel:
            constraint_tuples.add(
                (
                    constraint.constraint_variable,
                    constraint.operation,
                    constraint.value,
                )
            )
        assert ("spm_unit_energy_subsidy_reported", ">", "0") in constraint_tuples

        # The target is stored as a household_count (not person_count) value.
        stored_target = (
            session.query(Target)
            .filter(
                Target.stratum_id == stratum.stratum_id,
                Target.variable == "household_count",
                Target.period == 2024,
            )
            .first()
        )
        assert stored_target is not None
        assert stored_target.value == 5_876_646
Loading