17 changes: 17 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,20 @@
## v0.5.0 (2026-01-16)

### Feat

- added entity name override option in data contract error details to align with business rules

### Fix

- Amend relation to python dictionaries approach as using polars (… (#25)
- fix issue where reporting_entity resulted in key fields being removed from error reports (#23)

### Refactor

- added reporting_period_start and end attribute to submission_info model (#28)
- rename "Grouping" to "Group"
- rename the column headers for elements of the error report

## v0.4.0 (2025-12-17)

### Feat
@@ -11,12 +11,11 @@
},
"error_message": {
"description": "The message to be used for the field and error type specified. This can include templating (specified using jinja2 conventions). During templating, the full record will be available with an additional __error_value to easily obtain nested offending values.",
"type": "string",
"enum": [
"record_rejection",
"file_rejection",
"warning"
]
"type": "string"
},
"reporting_entity": {
"description": "The entity name to be given for grouping in error report. If left blank will default to the contract entity name",
"type": "string"
}
},
"required": [
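To illustrate the jinja2 templating described for error_message above, here is a hedged sketch of how a template might render against a record; the field names, values, and template text are hypothetical, while the __error_value key comes from the schema description.

from jinja2 import Template

# Hypothetical record as exposed during data-contract error reporting; the
# schema description states the full record plus an __error_value key is
# available to the template.
record = {"movie_id": "M001", "year": "NOT_A_NUMBER", "__error_value": "NOT_A_NUMBER"}

error_message = "year value ({{ __error_value }}) is invalid for movie {{ movie_id }}"
print(Template(error_message).render(**record))
# -> year value (NOT_A_NUMBER) is invalid for movie M001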
21 changes: 21 additions & 0 deletions docs/json_schemas/contract/components/field_error_type.schema.json
@@ -0,0 +1,21 @@
{
"$schema": "https://json-schema.org/draft-07/schema",
"$id": "data-ingest:contract/components/field_error_type.schema.json",
"title": "field_error_detail",
"description": "The error type for a field when a validation error is raised during the data contract phase",
"type": "object",
"properties": {
"error_type": {
"description": "The type of error the details are for",
"type": "string",
"enum": [
"Blank",
"Bad value",
"Wrong format"
],
"additionalProperties": {
"$ref": "field_error_detail.schema.json"
}
}
}
}
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "nhs_dve"
version = "0.4.0"
version = "0.5.0"
description = "`nhs data validation engine` is a framework used to validate data"
authors = ["NHS England <england.contactus@nhs.net>"]
readme = "README.md"
@@ -39,7 +39,7 @@ requests = "2.32.4" # Mitigates security vuln in < 2.31.0
schedula = "1.2.19"
sqlalchemy = "2.0.19"
typing_extensions = "4.6.2"
urllib3 = "2.6.0" # Mitigates security vuln in < 2.5.0
urllib3 = "2.6.3" # Mitigates security vuln in < 2.6.0
xmltodict = "0.13.0"

[tool.poetry.group.dev]
@@ -273,3 +273,16 @@ def get_all_registered_udfs(connection: DuckDBPyConnection) -> set[str]:
"""
connection.sql("CREATE TEMP TABLE IF NOT EXISTS dve_udfs (function_name VARCHAR)")
return {rw[0] for rw in connection.sql("SELECT * FROM dve_udfs").fetchall()}


def duckdb_rel_to_dictionaries(
entity: DuckDBPyRelation, batch_size=1000
) -> Iterator[dict[str, Any]]:
"""Iterator converting DuckDBPyRelation to lists of dictionaries.
Avoids issues where dates are getting converted to datetimes using polars as intermediate."""
# TODO - look into float conversion - floats that can't be stored exactly in binary
# TODO - are given to nearest approximation. Tried Decimal, causes issues in arrays
# TODO - with templating (as in complex fields, repr used when str called in jinja templating).
cols: tuple[str] = tuple(entity.columns) # type: ignore
while rows := entity.fetchmany(batch_size):
yield from (dict(zip(cols, rw)) for rw in rows)
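A minimal usage sketch of duckdb_rel_to_dictionaries, assuming it is importable from the duckdb helpers module shown in the import hunk below; the in-memory table and column names are illustrative only.

import duckdb

# Illustrative in-memory relation containing a DATE column.
connection = duckdb.connect()
connection.sql("CREATE TABLE movies (title VARCHAR, release_date DATE)")
connection.sql("INSERT INTO movies VALUES ('Example Movie', DATE '2024-01-31')")

relation = connection.sql("SELECT * FROM movies")
for record in duckdb_rel_to_dictionaries(relation, batch_size=500):
    # DATE values stay as datetime.date rather than being widened to
    # datetime.datetime, which is the polars issue the helper works around.
    print(record)  # {'title': 'Example Movie', 'release_date': datetime.date(2024, 1, 31)}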
4 changes: 3 additions & 1 deletion src/dve/core_engine/backends/implementations/duckdb/rules.py
Expand Up @@ -23,6 +23,7 @@
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
DDBStruct,
duckdb_read_parquet,
duckdb_rel_to_dictionaries,
duckdb_write_parquet,
get_all_registered_udfs,
get_duckdb_type_from_annotation,
@@ -511,12 +512,13 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages:
if config.excluded_columns:
matched = matched.select(StarExpression(exclude=config.excluded_columns))

for record in matched.df().to_dict(orient="records"):
for record in duckdb_rel_to_dictionaries(matched):
# NOTE: only templates using values directly accessible in record - nothing nested
# more complex extraction done in reporting module
messages.append(
FeedbackMessage(
entity=config.reporting.reporting_entity_override or config.entity_name,
original_entity=config.entity_name,
record=record, # type: ignore
error_location=config.reporting.legacy_location,
error_message=template_object(config.reporting.message, record), # type: ignore
@@ -412,6 +412,7 @@ def notify(self, entities: SparkEntities, *, config: Notification) -> Messages:
# more complex extraction done in reporting module
FeedbackMessage(
entity=config.reporting.reporting_entity_override or config.entity_name,
original_entity=config.entity_name,
record=record.asDict(recursive=True),
error_location=config.reporting.legacy_location,
error_message=template_object(
6 changes: 5 additions & 1 deletion src/dve/core_engine/message.py
@@ -30,6 +30,7 @@ class DataContractErrorDetail(BaseModel):

error_code: str
error_message: Optional[str] = None
reporting_entity: Optional[str] = None

def template_message(
self,
@@ -105,6 +106,8 @@ class FeedbackMessage: # pylint: disable=too-many-instance-attributes
still be completed (i.e. filters and joins can still be applied).

"""
original_entity: Optional[EntityName] = None
"""The original entity before any modifications to the name (if applicable)."""
is_informational: bool = False
"""Whether the message is simply for information or has affected the outputs."""
error_type: Optional[str] = None
@@ -230,7 +233,8 @@ def from_pydantic_error(

messages.append(
cls(
entity=entity,
entity=error_detail.reporting_entity or entity,
original_entity=entity,
record=record,
failure_type=failure_type,
is_informational=is_informational,
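A hedged sketch of the new reporting_entity override on DataContractErrorDetail; the error code and entity names mirror the test fixtures further down this diff and are illustrative only.

from dve.core_engine.message import DataContractErrorDetail

detail = DataContractErrorDetail(
    error_code="TESTIDBLANK",
    error_message="id cannot be null",
    reporting_entity="test_rename",
)

# When from_pydantic_error builds a FeedbackMessage from this detail, the
# message is grouped under "test_rename" (the override), while original_entity
# keeps the contract entity name so key fields can still be resolved.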
16 changes: 9 additions & 7 deletions src/dve/core_engine/models.py
@@ -51,16 +51,18 @@ class SubmissionInfo(AuditRecord):
"""The name of the submitted file."""
file_extension: str
"""The extension of the file received."""
submission_method: str = None # type: ignore
submission_method: Optional[str] = None # type: ignore
"""The method that the file was submitted"""
submitting_org: str = None # type: ignore
submitting_org: Optional[str] = None # type: ignore
"""The organisation who submitted the file."""
reporting_period: str = None # type: ignore
"""The reporting period the submission relates to."""
file_size: int = None # type: ignore
reporting_period_start: Optional[str] = None # type: ignore
"""The start of the reporting period the submission relates to."""
reporting_period_end: Optional[str] = None # type: ignore
"""The end of the reporting period the submission relates to."""
file_size: Optional[int] = None # type: ignore
"""The size (in bytes) of the file received."""
datetime_received: dt.datetime = None # type: ignore
"""The datetime the SEFT transfer finished."""
datetime_received: Optional[dt.datetime] = None # type: ignore
"""The datetime the file was received."""

@validator("file_name")
def _ensure_metadata_extension_removed(cls, filename): # pylint: disable=no-self-argument
7 changes: 7 additions & 0 deletions src/dve/reporting/excel_report.py
@@ -443,6 +443,13 @@ def _text_length(value):

@staticmethod
def _format_headings(headings: list[str]) -> list[str]:
# TODO - ideally this would be config driven to allow customisation.
_renames = {
"Table": "Group",
"Data Item": "Data Item Submission Name",
"Error": "Errors and Warnings",
}
headings = [heading.title() if heading[0].islower() else heading for heading in headings]
headings = [heading.replace("_", " ") for heading in headings]
headings = [_renames.get(heading, heading) for heading in headings]
return headings
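A hedged illustration of the heading renames above, assuming the enclosing report class is named ExcelReport (the class name is not visible in this hunk) and that headings arrive as snake_case or already-capitalised strings.

headings = ["table", "data_item", "error", "ErrorCode"]
print(ExcelReport._format_headings(headings))
# -> ['Group', 'Data Item Submission Name', 'Errors and Warnings', 'ErrorCode']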
8 changes: 7 additions & 1 deletion src/dve/reporting/utils.py
@@ -26,7 +26,13 @@ def dump_feedback_errors(
processed = []

for message in messages:
primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", [])
if message.original_entity is not None:
primary_keys = key_fields.get(message.original_entity, [])
elif message.entity is not None:
primary_keys = key_fields.get(message.entity, [])
else:
primary_keys = []

error = message.to_dict(
key_field=primary_keys,
value_separator=" -- ",
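To show why the lookup above prefers original_entity, here is a hedged sketch using entity names from the movies feature file below; key_fields is keyed by the original contract entity, and movie_id is an assumed key field.

key_fields = {"movies": ["movie_id"]}   # keyed by the original entity name

entity = "movies_rename_test"           # overridden reporting name
original_entity = "movies"              # name before the override

print(key_fields.get(entity, []))            # -> [] (the override alone finds no keys)
print(key_fields.get(original_entity, []))   # -> ['movie_id']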
90 changes: 45 additions & 45 deletions tests/features/movies.feature
@@ -1,47 +1,47 @@
Feature: Pipeline tests using the movies dataset
Tests for the processing framework which use the movies dataset.
Tests for the processing framework which use the movies dataset.

This tests submissions in JSON format, with configuration in JSON config files.
Complex types are tested (arrays, nested structs)
This tests submissions in JSON format, with configuration in JSON config files.
Complex types are tested (arrays, nested structs)

Some validation of entity attributes is performed: SQL expressions and Python filter
functions are used, and templatable business rules feature in the transformations.
Some validation of entity attributes is performed: SQL expressions and Python filter
functions are used, and templatable business rules feature in the transformations.

Scenario: Validate and filter movies (spark)
Given I submit the movies file movies.json for processing
And A spark pipeline is configured
And I create the following reference data tables in the database movies_refdata
| table_name | parquet_path |
| sequels | tests/testdata/movies/refdata/movies_sequels.parquet |
And I add initial audit entries for the submission
Then the latest audit record for the submission is marked with processing status file_transformation
When I run the file transformation phase
Then the movies entity is stored as a parquet after the file_transformation phase
And the latest audit record for the submission is marked with processing status data_contract
When I run the data contract phase
Then there are 3 record rejections from the data_contract phase
And there are errors with the following details and associated error_count from the data_contract phase
| ErrorCode | ErrorMessage | error_count |
| BLANKYEAR | year not provided | 1 |
| DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
| DODGYDATE | date_joined value is not valid: daft_date | 1 |
And the movies entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "movies" to 4 qualifying records
And there are errors with the following details and associated error_count from the business_rules phase
| ErrorCode | ErrorMessage | error_count |
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Then An error report is produced
And The statistics entry for the submission shows the following information
| parameter | value |
| record_count | 5 |
| number_record_rejections | 4 |
| number_warnings | 1 |
And the error aggregates are persisted
Scenario: Validate and filter movies (spark)
Given I submit the movies file movies.json for processing
And A spark pipeline is configured
And I create the following reference data tables in the database movies_refdata
| table_name | parquet_path |
| sequels | tests/testdata/movies/refdata/movies_sequels.parquet |
And I add initial audit entries for the submission
Then the latest audit record for the submission is marked with processing status file_transformation
When I run the file transformation phase
Then the movies entity is stored as a parquet after the file_transformation phase
And the latest audit record for the submission is marked with processing status data_contract
When I run the data contract phase
Then there are 3 record rejections from the data_contract phase
And there are errors with the following details and associated error_count from the data_contract phase
| Entity | ErrorCode | ErrorMessage | error_count |
| movies | BLANKYEAR | year not provided | 1 |
| movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
| movies | DODGYDATE | date_joined value is not valid: daft_date | 1 |
And the movies entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "movies" to 4 qualifying records
And there are errors with the following details and associated error_count from the business_rules phase
| ErrorCode | ErrorMessage | error_count |
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Then An error report is produced
And The statistics entry for the submission shows the following information
| parameter | value |
| record_count | 5 |
| number_record_rejections | 4 |
| number_warnings | 1 |
And the error aggregates are persisted

Scenario: Validate and filter movies (duckdb)
Given I submit the movies file movies.json for processing
@@ -57,17 +57,17 @@ Feature: Pipeline tests using the movies dataset
When I run the data contract phase
Then there are 3 record rejections from the data_contract phase
And there are errors with the following details and associated error_count from the data_contract phase
| ErrorCode | ErrorMessage | error_count |
| BLANKYEAR | year not provided | 1 |
| DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
| DODGYDATE | date_joined value is not valid: daft_date | 1 |
| Entity | ErrorCode | ErrorMessage | error_count |
| movies | BLANKYEAR | year not provided | 1 |
| movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid | 1 |
| movies | DODGYDATE | date_joined value is not valid: daft_date | 1 |
And the movies entity is stored as a parquet after the data_contract phase
And the latest audit record for the submission is marked with processing status business_rules
When I run the business rules phase
Then The rules restrict "movies" to 4 qualifying records
And there are errors with the following details and associated error_count from the business_rules phase
| ErrorCode | ErrorMessage | error_count |
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
2 changes: 2 additions & 0 deletions tests/features/steps/steps_pipeline.py
@@ -225,6 +225,8 @@ def submit_file_for_processing(context: Context, dataset: str, file_name: str):
"dataset_id": dataset,
"file_name": file_name,
"file_extension": Path(file_name).suffix,
"reporting_period_start": "2025-11-01 00:00:00",
"reporting_period_end": "2025-11-30 23:59:59"
}
ctxt.set_submission_info(context, SubmissionInfo(**sub_info)) # type: ignore
# add processing location
6 changes: 4 additions & 2 deletions tests/test_core_engine/test_backends/fixtures.py
@@ -567,9 +567,11 @@ def nested_parquet_custom_dc_err_details(temp_dir):
err_details = {
"id": {
"Blank": {"error_code": "TESTIDBLANK",
"error_message": "id cannot be null"},
"error_message": "id cannot be null",
"reporting_entity": "test_rename"},
"Bad value": {"error_code": "TESTIDBAD",
"error_message": "id is invalid: id - {{id}}"}
"error_message": "id is invalid: id - {{id}}",
"reporting_entity": "test_rename"}
},
"datetimefield": {
"Bad value": {"error_code": "TESTDTFIELDBAD",
@@ -40,7 +40,8 @@ def dve_metadata_file() -> Iterator[Path]:
"file_extension": "xml",
"file_size": 123456789,
"submitting_org": "TEST",
"reporting_period": "FY2023-24_TEST",
"reporting_period_start": "FY2023-24_START_TEST",
"reporting_period_end": "FY2023-24_END_TEST",
"dataset_id": "TEST_DATASET",
"datetime_received": "2023-10-03T10:53:36.1231998Z"
}"""
@@ -360,4 +360,5 @@ def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_e
assert messages[0].error_code == "SUBFIELDTESTIDBAD"
assert messages[0].error_message == "subfield id is invalid: subfield.id - WRONG"
assert messages[1].error_code == "TESTIDBAD"
assert messages[1].error_message == "id is invalid: id - WRONG"
assert messages[1].error_message == "id is invalid: id - WRONG"
assert messages[1].entity == "test_rename"