Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
e14f22f
start validation declaration in pipeline
sagemaso Nov 20, 2025
31d69d9
start validation declaration in pipeline
sagemaso Nov 20, 2025
8d1f07e
Merge remote-tracking branch 'origin/feature/validation_integration' …
sagemaso Nov 20, 2025
fcb2995
debug spacing
sagemaso Nov 26, 2025
6460cac
debug spacing
sagemaso Nov 26, 2025
9193ff3
add validation report as dataset
sagemaso Nov 26, 2025
b0aeb0b
add egon-validation as dependency
sagemaso Nov 26, 2025
d69f3e4
change how to save validation results, use db from pipeline
sagemaso Nov 26, 2025
57a45c2
change out_dir, use .dev
sagemaso Nov 26, 2025
3c8b1c1
debug table count
sagemaso Dec 3, 2025
942d89d
use egon-validation v1.1.0
sagemaso Dec 9, 2025
b727c0b
add validation rules to vg250
sagemaso Dec 11, 2025
059170f
Add sanity check validation to HouseholdElectricityDemand dataset
sagemaso Dec 15, 2025
a4ba079
Merge remote-tracking branch 'origin/feature/validation_integration' …
sagemaso Dec 17, 2025
655af1b
add validation boundaries.egon_map_zensus_buildings_residential and _…
sagemaso Dec 31, 2025
01e0123
Add automatic boundary/scenario-dependent validation parameter resolu…
sagemaso Jan 8, 2026
42808cd
correct spelling demand
sagemaso Jan 6, 2026
c9f2ce4
add formal validation to main demand datasets
sagemaso Jan 6, 2026
e95d92f
add different boundaries
sagemaso Jan 8, 2026
83aface
add 2 heat datasets
sagemaso Jan 9, 2026
ecb86dc
start sanity check integration
sagemaso Dec 9, 2025
9fc0e06
start storage sanity interation
sagemaso Dec 10, 2025
76f5923
migrate gas sanity rules
Dec 29, 2025
b489657
debug RuleResult: write debug information to message
sagemaso Dec 30, 2025
78e06ff
add sanity rules: gas_loads_generators.py
sagemaso Dec 30, 2025
fc03a3b
add sanity rules: heat_demand and electricity_capacity
sagemaso Dec 30, 2025
d99703d
debug sanity rules
sagemaso Dec 30, 2025
d31ef46
debug sanity rules
sagemaso Dec 30, 2025
04f5fda
add electrical loads sanity_check
Jan 5, 2026
6bd58c3
refactor on_validation_failure
sagemaso Dec 31, 2025
1de9edb
add different boundaries
sagemaso Jan 8, 2026
6faecaa
add grid datasets
sagemaso Jan 12, 2026
0116467
finalize grid datasets
sagemaso Jan 13, 2026
09ffbdf
add openstreetmap datasets
sagemaso Jan 13, 2026
5deafe1
add scenario dataset
sagemaso Jan 13, 2026
2e04112
add society datasets
sagemaso Jan 13, 2026
23a770d
add supply datasets
sagemaso Jan 14, 2026
6d03200
add SRID validation
sagemaso Jan 15, 2026
26095b2
add ArrayCardinalityValidation
sagemaso Jan 15, 2026
f762aa9
add comment to grid.egon_etrago_line_timeseries RowCountValidation
sagemaso Jan 15, 2026
f5d8c78
correct typo
sagemaso Jan 16, 2026
3998741
add example as validation placeholder
sagemaso Jan 16, 2026
8cd5368
delete scenario parameter
sagemaso Jan 16, 2026
e6f2dc4
delete .dev
sagemaso Jan 19, 2026
06c5235
refactor rule_ids
sagemaso Jan 19, 2026
65c1d70
remove .dev in final_validations.py
sagemaso Jan 20, 2026
0860b8a
add table first validation
sagemaso Jan 21, 2026
28d794f
move functionality from validation_utils to different files
sagemaso Jan 21, 2026
d38871f
use eGon-validation v1.2.1
sagemaso Jan 22, 2026
3e72509
fix import error bug
sagemaso Jan 22, 2026
6ae1b6b
fix bug circular import
sagemaso Jan 22, 2026
b6806f1
fix bug circular import
sagemaso Jan 22, 2026
dac7788
fix bug missing rule_id in ArrayCardinalityValidation initialization
sagemaso Jan 22, 2026
6707790
bug fix: remove spacing from dataset for task_id
sagemaso Jan 22, 2026
f663df9
bug fix: correct typo
sagemaso Jan 22, 2026
e56973c
correct linting errors
sagemaso Jan 22, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
593 changes: 593 additions & 0 deletions SANITY_CHECKS_MIGRATION.md

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ dependencies = [
"cdsapi",
"click<8.1",
"disaggregator @ git+https://github.com/openego/disaggregator.git@features/update-cache-directory#egg=disaggregator",
"egon-validation @ git+https://github.com/sagemaso/eGon-validation.git@v1.2.1",
"entsoe-py>=0.6.2",
"fiona==1.9.6",
"Flask-Session<0.6.0",
Expand Down
29 changes: 29 additions & 0 deletions src/egon/data/airflow/dags/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@
from egon.data.datasets.zensus_vg250 import ZensusVg250
from egon.data.metadata import Json_Metadata

from egon.data.datasets.validation_report import ValidationReport
from egon.data.datasets.final_validations import FinalValidations

# Set number of threads used by numpy and pandas
set_numexpr_threads()

Expand Down Expand Up @@ -730,6 +733,32 @@
]
)

with TaskGroup(group_id="final_validations") as final_validations_group:
# Cross-cutting validations that check data consistency across datasets
# These run after all data generation but before the validation report
final_validations = FinalValidations(
dependencies=[
insert_data_ch4_storages, # CH4Storages - for CH4 store validation
insert_H2_storage, # HydrogenStoreEtrago - for H2 saltcavern validation
storage_etrago, # StorageEtrago - general storage validation
hts_etrago_table,
fill_etrago_generators,
household_electricity_demand_annual,
cts_demand_buildings,
emobility_mit,
low_flex_scenario,
]
)

with TaskGroup(group_id="validation_report") as validation_report_group:
# Generate validation report from all validation tasks
# Runs after all validations (including final_validations) are complete
validation_report = ValidationReport(
dependencies=[
final_validations, # Wait for final validations
]
)

with TaskGroup(group_id="sanity_checks") as sanity_checks_group:
# ########## Keep this dataset at the end
# Sanity Checks
Expand Down
33 changes: 33 additions & 0 deletions src/egon/data/datasets/DSM_cts_ind.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
sources,
)

from egon_validation import (
ArrayCardinalityValidation
)

# CONSTANTS
# TODO: move to datasets.yml
CON = db.engine()
Expand Down Expand Up @@ -142,6 +146,35 @@ def __init__(self, dependencies):
version=self.version,
dependencies=dependencies,
tasks=(dsm_cts_ind_processing,),
validation={
"data-quality": [
ArrayCardinalityValidation(
table="demand.egon_demandregio_sites_ind_electricity_dsm_timeseries",
rule_id="ARRAY_VALIDATION.egon_demandregio_sites_ind_electricity_dsm_timeseries",
array_column="p_set",
expected_length=8760,
),
ArrayCardinalityValidation(
table="demand.egon_etrago_electricity_cts_dsm_timeseries",
rule_id="ARRAY_VALIDATION.egon_etrago_electricity_cts_dsm_timeseries",
array_column="p_set",
expected_length=8760,
),
ArrayCardinalityValidation(
table="demand.egon_osm_ind_load_curves_individual_dsm_timeseries",
rule_id="ARRAY_VALIDATION.egon_osm_ind_load_curves_individual_dsm_timeseries",
array_column="p_set",
expected_length=8760,
),
ArrayCardinalityValidation(
table="demand.egon_sites_ind_load_curves_individual_dsm_timeseries",
rule_id="ARRAY_VALIDATION.egon_sites_ind_load_curves_individual_dsm_timeseries",
array_column="p_set",
expected_length=8760,
),
]
},
on_validation_failure="continue"
)


Expand Down
63 changes: 62 additions & 1 deletion src/egon/data/datasets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from __future__ import annotations

from collections import abc
from dataclasses import dataclass
from dataclasses import dataclass, field
from functools import partial, reduce, update_wrapper
from typing import Callable, Iterable, Set, Tuple, Union
import re
Expand All @@ -12,9 +12,17 @@
from airflow.operators.python import PythonOperator
from sqlalchemy import Column, ForeignKey, Integer, String, Table, orm, tuple_
from sqlalchemy.ext.declarative import declarative_base
from typing import Dict, List
from egon.data.validation import create_validation_tasks

from egon.data import config, db, logger

try:
from egon_validation.rules.base import Rule
except ImportError:
Rule = None # Type hint only


Base = declarative_base()
SCHEMA = "metadata"

Expand Down Expand Up @@ -197,6 +205,8 @@ class Dataset:
#: The tasks of this :class:`Dataset`. A :class:`TaskGraph` will
#: automatically be converted to :class:`Tasks_`.
tasks: Tasks = ()
validation: Dict[str, List] = field(default_factory=dict)
on_validation_failure: str = "continue"

def check_version(self, after_execution=()):
scenario_names = config.settings()["egon-data"]["--scenarios"]
Expand Down Expand Up @@ -264,6 +274,27 @@ def __post_init__(self):
self.dependencies = list(self.dependencies)
if not isinstance(self.tasks, Tasks_):
self.tasks = Tasks_(self.tasks)
# Process validation configuration
if self.validation:
validation_tasks = create_validation_tasks(
validation_dict=self.validation,
dataset_name=self.name,
on_failure=self.on_validation_failure
)

# Append validation tasks to existing tasks
if validation_tasks:
if hasattr(self.tasks, 'graph'):
graph = self.tasks.graph
else:
graph = self.tasks
if isinstance(graph, (tuple, set, list)):
task_list = list(graph)
else:
task_list = [graph]
task_list.extend(validation_tasks)
self.tasks = Tasks_(tuple(task_list))

if len(self.tasks.last) > 1:
# Explicitly create single final task, because we can't know
# which of the multiple tasks finishes last.
Expand Down Expand Up @@ -302,3 +333,33 @@ def __post_init__(self):
for p in predecessors:
for first in self.tasks.first:
p.set_downstream(first)

# Link validation tasks to run after data tasks
if self.validation and validation_tasks:
# Get last non-validation tasks
non_validation_task_ids = [
task.task_id for task in self.tasks.values()
if not any(
task.task_id.endswith(f".validate.{name}")
for name in self.validation.keys()
)
]

last_data_tasks = [
task for task in self.tasks.values()
if task.task_id in non_validation_task_ids
and task in self.tasks.last
]

if not last_data_tasks:
# Fallback to last non-validation task
last_data_tasks = [
task for task in self.tasks.values()
if task.task_id in non_validation_task_ids
][-1:]

# Link each validation task downstream of last data tasks
for validation_task in validation_tasks:
for last_task in last_data_tasks:
last_task.set_downstream(validation_task)

87 changes: 87 additions & 0 deletions src/egon/data/datasets/chp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,15 @@
sources,
)

from egon_validation import (
RowCountValidation,
DataTypeValidation,
NotNullAndNotNaNValidation,
WholeTableNotNullAndNotNaNValidation,
ValueSetValidation,
SRIDUniqueNonZero
)

Base = declarative_base()


Expand Down Expand Up @@ -853,4 +862,82 @@ def __init__(self, dependencies):
version=self.version,
dependencies=dependencies,
tasks=tasks,
validation={
"data-quality": [
RowCountValidation(
table="supply.egon_chp_plants",
rule_id="ROW_COUNT.egon_chp_plants",
expected_count={
"Schleswig-Holstein": 1720,
"Everything": 40197
}
),
DataTypeValidation(
table="supply.egon_chp_plants",
rule_id="DATA_TYPES.egon_chp_plants",
column_types={
"id": "integer",
"sources": "jsonb",
"source_id": "jsonb",
"carrier": "character varying",
"district_heating": "boolean",
"el_capacity": "double precision",
"th_capacity": "double precision",
"electrical_bus_id": "integer",
"district_heating_area_id": "integer",
"ch4_bus_id": "integer",
"voltage_level": "integer",
"scenario": "character varying",
"geom": "geometry"
}
),
NotNullAndNotNaNValidation(
table="supply.egon_chp_plants",
rule_id="NOT_NAN.egon_chp_plants",
columns=[
"id",
"sources",
"source_id",
"carrier",
"district_heating",
"el_capacity",
"th_capacity",
"electrical_bus_id",
"district_heating_area_id",
"ch4_bus_id",
"voltage_level",
"scenario",
"geom"
]
),
WholeTableNotNullAndNotNaNValidation(
table="supply.egon_chp_plants",
rule_id="TABLE_NOT_NAN.egon_chp_plants"
),
ValueSetValidation(
table="supply.egon_chp_plants",
rule_id="VALUE_SET_VALIDATION_CARRIER.egon_chp_plants",
column="carrier",
expected_values=[
"oil",
"others",
"gas",
"gas extended",
"biomass"
]
),
ValueSetValidation(
table="supply.egon_chp_plants",
rule_id="VALUE_SET_VALIDATION_SCENARIO.egon_chp_plants",
column="scenario",
expected_values=["eGon2035", "eGon100RE"]
),
SRIDUniqueNonZero(
table="supply.egon_chp_plants",
rule_id="SRIDUniqueNonZero.egon_chp_plants",
column="geom"
)
]
},
on_validation_failure="continue"
)
69 changes: 69 additions & 0 deletions src/egon/data/datasets/demandregio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
)
import egon.data.config
import egon.data.datasets.scenario_parameters.parameters as scenario_parameters
from egon_validation import (
RowCountValidation,
DataTypeValidation,
WholeTableNotNullAndNotNaNValidation,
ValueSetValidation,
ArrayCardinalityValidation
)

try:
from disaggregator import config, data, spatial, temporal
Expand Down Expand Up @@ -87,6 +94,68 @@ def __init__(self, dependencies):
insert_cts_ind_demands,
},
),
validation={
"data_quality": [
RowCountValidation(
table=" demand.egon_demandregio_hh",
rule_id="ROW_COUNT.egon_demandregio_hh",
expected_count={
"Schleswig-Holstein": 180,
"everything": 7218
}
),
DataTypeValidation(
table="demand.egon_demandregio_hh",
rule_id="DATA_MULTIPLE_TYPES.egon_demandregio_hh",
column_types={"nuts3": "character varying",
"hh_size": "integer",
"scenario": "character varying",
"year": "integer",
"demand": "double precision"
}
),
WholeTableNotNullAndNotNaNValidation(
table="demand.egon_demandregio_hh",
rule_id="WHOLE_TABLE_NOT_NAN.egon_demandregio_hh"
),
ValueSetValidation(
table="demand.egon_demandregio_hh",
rule_id="VALUE_SET_VALIDATION_SCENARIO.egon_demandregio_hh",
column="scenario",
expected_values=["eGon2035", "eGon100RE", "eGon2021"]
),
RowCountValidation(
table=" demand.egon_demandregio_wz",
rule_id="ROW_COUNT.egon_demandregio_wz",
expected_count=87
),
DataTypeValidation(
table="demand.egon_demandregio_wz",
rule_id="DATA_MULTIPLE_TYPES.egon_demandregio_wz",
column_types={"wz": "integer",
"sector": "character varying",
"definition": "character varying"
}
),
WholeTableNotNullAndNotNaNValidation(
table="demand.egon_demandregio_wz",
rule_id="WHOLE_TABLE_NOT_NAN.egon_demandregio_wz"
),
ValueSetValidation(
table="demand.egon_demandregio_wz",
rule_id="VALUE_SET_VALIDATION_SECTOR.egon_demandregio_wz",
column="sector",
expected_values=["industry", "CTS"]
),
ArrayCardinalityValidation(
table="demand.egon_demandregio_sites_ind_electricity_dsm_timeseries",
rule_id="ARRAY_VALIDATION.egon_demandregio_sites_ind_electricity_dsm_timeseries",
array_column="load_curve",
expected_length=8760,
)
]
},
on_validation_failure="continue"
)


Expand Down
Loading