diff --git a/README.md b/README.md index 5270fce79..ca038e6f5 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,10 @@ PolicyEngine constructs its representative household datasets through a multi-st The Enhanced CPS (`make data-legacy`) produces a national-only calibrated dataset. For the current geography-specific pipeline, see [docs/calibration.md](docs/calibration.md). +The repo currently contains two calibration tracks: +- Legacy Enhanced CPS (`make data-legacy`), which uses the older `EnhancedCPS` / `build_loss_matrix()` path for national-only calibration. +- Unified calibration (`docs/calibration.md`), which uses `storage/calibration/policy_data.db` and the sparse matrix + L0 pipeline for current national and geography-specific builds. + For detailed calibration usage, see [docs/calibration.md](docs/calibration.md) and [modal_app/README.md](modal_app/README.md). ### Running the Full Pipeline diff --git a/docs/calibration.md b/docs/calibration.md index fa9f9ac2e..9787f0335 100644 --- a/docs/calibration.md +++ b/docs/calibration.md @@ -2,6 +2,8 @@ The unified calibration pipeline reweights cloned CPS records to match administrative targets using L0-regularized optimization. This guide covers the main workflows: lightweight build-then-fit, full pipeline with PUF, and fitting from a saved package. +This is the current production calibration path. The older national-only Enhanced CPS path (`make data-legacy`) remains in the repo for legacy reproduction and uses a separate `EnhancedCPS` / `build_loss_matrix()` flow. + ## Quick Start ```bash diff --git a/docs/methodology.md b/docs/methodology.md index 9cca425b7..601f41b67 100644 --- a/docs/methodology.md +++ b/docs/methodology.md @@ -95,6 +95,8 @@ graph TD classDef output fill:#5091CC,stroke:#2C6496,color:#FFFFFF ``` +The current production calibration path is the geography-specific target-database pipeline shown above. The legacy national-only Enhanced CPS reweighting branch remains in the repo for reproduction, so calibration-target changes that must affect both paths need updates in both the unified database pipeline and the older `EnhancedCPS` / `build_loss_matrix()` flow. + ## Stage 1: Variable Imputation The imputation process begins by aging both the CPS and PUF datasets to the target year, then creating a copy of the aged CPS dataset. This allows us to preserve the original CPS structure while adding imputed tax variables. @@ -298,4 +300,4 @@ Key files: - `policyengine_us_data/calibration/unified_matrix_builder.py` — Sparse calibration matrix builder - `policyengine_us_data/calibration/clone_and_assign.py` — Geography cloning and block assignment - `policyengine_us_data/calibration/publish_local_area.py` — H5 file generation -- `policyengine_us_data/db/` — Target database ETL scripts \ No newline at end of file +- `policyengine_us_data/db/` — Target database ETL scripts diff --git a/policyengine_us_data/db/etl_national_targets.py b/policyengine_us_data/db/etl_national_targets.py index 6e6b3b3ab..773411dc5 100644 --- a/policyengine_us_data/db/etl_national_targets.py +++ b/policyengine_us_data/db/etl_national_targets.py @@ -321,6 +321,22 @@ def extract_national_targets(dataset: str = DEFAULT_DATASET): "notes": "ACA Premium Tax Credit recipients", "year": HARDCODED_YEAR, }, + { + "constraint_variable": "spm_unit_energy_subsidy_reported", + "target_variable": "household_count", + "household_count": 5_939_605, + "source": "https://liheappm.acf.gov/sites/default/files/private/congress/profiles/2023/FY2023AllStates%28National%29Profile-508Compliant.pdf", + "notes": "LIHEAP total households served by state programs", + "year": 2023, + }, + { + "constraint_variable": "spm_unit_energy_subsidy_reported", + "target_variable": "household_count", + "household_count": 5_876_646, + "source": "https://liheappm.acf.gov/sites/default/files/private/congress/profiles/2024/FY2024_AllStates%28National%29_Profile.pdf", + "notes": "LIHEAP total households served by state programs", + "year": 2024, + }, ] # Add SSN card type NONE targets for multiple years @@ -730,6 +746,8 @@ def load_national_targets( for cond_target in conditional_targets: constraint_var = cond_target["constraint_variable"] target_year = cond_target["year"] + target_variable = cond_target.get("target_variable", "person_count") + target_value = cond_target.get(target_variable) # Determine constraint details if constraint_var == "medicaid": @@ -740,6 +758,10 @@ def load_national_targets( stratum_notes = "National ACA Premium Tax Credit Recipients" constraint_operation = ">" constraint_value = "0" + elif constraint_var == "spm_unit_energy_subsidy_reported": + stratum_notes = "National LIHEAP Recipient Households" + constraint_operation = ">" + constraint_value = "0" elif constraint_var == "ssn_card_type": stratum_notes = "National Undocumented Population" constraint_operation = "==" @@ -765,23 +787,23 @@ def load_national_targets( session.query(Target) .filter( Target.stratum_id == existing_stratum.stratum_id, - Target.variable == "person_count", + Target.variable == target_variable, Target.period == target_year, ) .first() ) if existing_target: - existing_target.value = cond_target["person_count"] + existing_target.value = target_value existing_target.source = "PolicyEngine" print(f"Updated enrollment target for {constraint_var}") else: # Add new target to existing stratum new_target = Target( stratum_id=existing_stratum.stratum_id, - variable="person_count", + variable=target_variable, period=target_year, - value=cond_target["person_count"], + value=target_value, active=True, source="PolicyEngine", notes=f"{cond_target['notes']} | Source: {cond_target['source']}", @@ -807,9 +829,9 @@ def load_national_targets( # Add target new_stratum.targets_rel = [ Target( - variable="person_count", + variable=target_variable, period=target_year, - value=cond_target["person_count"], + value=target_value, active=True, source="PolicyEngine", notes=f"{cond_target['notes']} | Source: {cond_target['source']}", diff --git a/policyengine_us_data/utils/loss.py b/policyengine_us_data/utils/loss.py index 8588c3263..e141cdf43 100644 --- a/policyengine_us_data/utils/loss.py +++ b/policyengine_us_data/utils/loss.py @@ -3,8 +3,9 @@ import pandas as pd import numpy as np import logging +import sqlite3 -from policyengine_us_data.storage import CALIBRATION_FOLDER +from policyengine_us_data.storage import CALIBRATION_FOLDER, STORAGE_FOLDER from policyengine_us_data.storage.calibration_targets.pull_soi_targets import ( STATE_ABBR_TO_FIPS, ) @@ -118,6 +119,133 @@ def fmt(x): return f"{x / 1e9:.1f}bn" +def _parse_constraint_value(value): + if value == "True": + return True + if value == "False": + return False + try: + return int(value) + except (TypeError, ValueError): + try: + return float(value) + except (TypeError, ValueError): + return value + + +def _apply_constraint(values, operation: str, raw_value: str): + if operation == "in": + allowed_values = [part.strip() for part in raw_value.split("|")] + return np.isin(values, allowed_values) + + value = _parse_constraint_value(raw_value) + if operation in ("equals", "==", "="): + return values == value + if operation in ("greater_than", ">"): + return values > value + if operation in ("greater_than_or_equal", ">="): + return values >= value + if operation in ("less_than", "<"): + return values < value + if operation in ("less_than_or_equal", "<="): + return values <= value + if operation in ("not_equals", "!=", "<>"): + return values != value + + raise ValueError(f"Unsupported stratum constraint operation: {operation}") + + +def _geo_label_from_ucgid(ucgid_str: str) -> str: + if ucgid_str in (None, "", "0100000US"): + return "nation" + return f"geo/{ucgid_str}" + + +def _add_liheap_targets_from_db(loss_matrix, targets_list, sim, time_period): + db_path = STORAGE_FOLDER / "calibration" / "policy_data.db" + if not db_path.exists(): + return targets_list, loss_matrix + + query = """ + SELECT + t.target_id, + t.variable, + t.value AS target_value, + s.notes, + sc.constraint_variable, + sc.operation, + sc.value AS constraint_value + FROM targets t + JOIN strata s + ON s.stratum_id = t.stratum_id + JOIN stratum_constraints sc + ON sc.stratum_id = s.stratum_id + WHERE + t.active = 1 + AND t.reform_id = 0 + AND t.period = ? + AND s.notes LIKE '%LIHEAP%' + ORDER BY t.target_id + """ + + with sqlite3.connect(db_path) as conn: + target_rows = pd.read_sql_query(query, conn, params=[time_period]) + + if target_rows.empty: + return targets_list, loss_matrix + + household_values_cache = { + "household_weight": sim.calculate("household_weight").values + } + + def get_household_values(variable: str): + if variable not in household_values_cache: + household_values_cache[variable] = sim.calculate( + variable, + map_to="household", + ).values + return household_values_cache[variable] + + n_households = len(household_values_cache["household_weight"]) + + for _, target_df in target_rows.groupby("target_id", sort=False): + mask = np.ones(n_households, dtype=bool) + for row in target_df.itertuples(index=False): + if ( + row.constraint_variable == "ucgid_str" + and row.constraint_value == "0100000US" + ): + continue + values = get_household_values(row.constraint_variable) + mask &= _apply_constraint( + values, + row.operation, + row.constraint_value, + ) + + variable = target_df["variable"].iat[0] + if variable == "household_count": + metric = mask.astype(float) + else: + metric = np.where(mask, get_household_values(variable), 0.0) + + ucgid_constraints = target_df.loc[ + target_df.constraint_variable == "ucgid_str", "constraint_value" + ] + geo_label = _geo_label_from_ucgid( + ucgid_constraints.iat[0] if not ucgid_constraints.empty else None + ) + label = f"{geo_label}/db/liheap/{variable}" + loss_matrix[label] = metric + targets_list.append(target_df["target_value"].iat[0]) + + logging.info( + f"Loaded {target_rows['target_id'].nunique()} LIHEAP targets from the local targets DB" + ) + + return targets_list, loss_matrix + + def build_loss_matrix(dataset: type, time_period): loss_matrix = pd.DataFrame() df = pe_to_soi(dataset, time_period) @@ -667,6 +795,10 @@ def build_loss_matrix(dataset: type, time_period): targets_array.extend(snap_state_targets) loss_matrix = _add_snap_metric_columns(loss_matrix, sim) + targets_array, loss_matrix = _add_liheap_targets_from_db( + loss_matrix, targets_array, sim, time_period + ) + del sim, df gc.collect() diff --git a/tests/unit/test_etl_national_targets.py b/tests/unit/test_etl_national_targets.py index 10e0ece31..6e9448376 100644 --- a/tests/unit/test_etl_national_targets.py +++ b/tests/unit/test_etl_national_targets.py @@ -137,3 +137,67 @@ def test_load_national_targets_deactivates_stale_baseline_rows(tmp_path, monkeyp in (target.notes or "") for target in reform_rows ) + + +def test_load_national_targets_supports_liheap_household_counts(tmp_path, monkeypatch): + calibration_dir = tmp_path / "calibration" + calibration_dir.mkdir() + db_uri = f"sqlite:///{calibration_dir / 'policy_data.db'}" + engine = create_database(db_uri) + + with Session(engine) as session: + national = _make_stratum(session, notes="United States") + assert national is not None + + monkeypatch.setattr( + "policyengine_us_data.db.etl_national_targets.STORAGE_FOLDER", + tmp_path, + ) + + conditional_targets = [ + { + "constraint_variable": "spm_unit_energy_subsidy_reported", + "target_variable": "household_count", + "household_count": 5_876_646, + "source": "https://example.com/liheap-2024.pdf", + "notes": "LIHEAP total households served by state programs", + "year": 2024, + } + ] + + load_national_targets( + direct_targets_df=pd.DataFrame(), + tax_filer_df=pd.DataFrame(), + tax_expenditure_df=pd.DataFrame(), + conditional_targets=conditional_targets, + ) + + with Session(engine) as session: + liheap_stratum = ( + session.query(Stratum) + .filter(Stratum.notes == "National LIHEAP Recipient Households") + .first() + ) + assert liheap_stratum is not None + + constraints = { + ( + constraint.constraint_variable, + constraint.operation, + constraint.value, + ) + for constraint in liheap_stratum.constraints_rel + } + assert ("spm_unit_energy_subsidy_reported", ">", "0") in constraints + + liheap_target = ( + session.query(Target) + .filter( + Target.stratum_id == liheap_stratum.stratum_id, + Target.variable == "household_count", + Target.period == 2024, + ) + .first() + ) + assert liheap_target is not None + assert liheap_target.value == 5_876_646