From 03b65ea030d6da12155ee0f273229985a9e7453e Mon Sep 17 00:00:00 2001
From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com>
Date: Thu, 18 Dec 2025 12:35:21 +0000
Subject: [PATCH 1/8] refactor: rename the column headers for elements of the
 error report

---
 src/dve/reporting/excel_report.py             |  7 +++++++
 .../test_error_reporting/test_excel_report.py | 10 +++++-----
 tests/test_pipeline/test_spark_pipeline.py    | 20 +++++++++----------
 3 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/src/dve/reporting/excel_report.py b/src/dve/reporting/excel_report.py
index 727b244..200feed 100644
--- a/src/dve/reporting/excel_report.py
+++ b/src/dve/reporting/excel_report.py
@@ -443,6 +443,13 @@ def _text_length(value):
 
     @staticmethod
     def _format_headings(headings: list[str]) -> list[str]:
+        # TODO - ideally this would be config driven to allow customisation.
+        _renames = {
+            "Table": "Grouping",
+            "Data Item": "Data Item Submission Name",
+            "Error": "Errors and Warnings",
+        }
         headings = [heading.title() if heading[0].islower() else heading for heading in headings]
         headings = [heading.replace("_", " ") for heading in headings]
+        headings = [_renames.get(heading, heading) for heading in headings]
         return headings
diff --git a/tests/test_error_reporting/test_excel_report.py b/tests/test_error_reporting/test_excel_report.py
index 834a5e4..e29e45e 100644
--- a/tests/test_error_reporting/test_excel_report.py
+++ b/tests/test_error_reporting/test_excel_report.py
@@ -116,8 +116,8 @@ def test_excel_report(report_dfs):
     column_headings = [cell.value for cell in aggs["1"]]
     assert column_headings == [
         "Type",
-        "Table",
-        "Data Item",
+        "Grouping",
+        "Data Item Submission Name",
         "Category",
         "Error Code",
         "Count",
@@ -126,11 +126,11 @@ def test_excel_report(report_dfs):
     details = workbook["Error Data"]
     column_headings = [cell.value for cell in details["1"]]
     assert column_headings == [
-        "Table",
+        "Grouping",
         "Type",
         "Error Code",
-        "Data Item",
-        "Error",
+        "Data Item Submission Name",
+        "Errors and Warnings",
         "Value",
         "ID",
         "Category",
diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py
index 4e33a28..a0b8d19 100644
--- a/tests/test_pipeline/test_spark_pipeline.py
+++ b/tests/test_pipeline/test_spark_pipeline.py
@@ -469,8 +469,8 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
         OrderedDict(
             **{
                 "Type": "Submission Failure",
-                "Table": "planets",
-                "Data Item": "orbitalPeriod",
+                "Grouping": "planets",
+                "Data Item Submission Name": "orbitalPeriod",
                 "Category": "Bad value",
                 "Error Code": "LONG_ORBIT",
                 "Count": 1,
@@ -479,8 +479,8 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
         OrderedDict(
             **{
                 "Type": "Submission Failure",
-                "Table": "planets",
-                "Data Item": "gravity",
+                "Grouping": "planets",
+                "Data Item Submission Name": "gravity",
                 "Category": "Bad value",
                 "Error Code": "STRONG_GRAVITY",
                 "Count": 1,
@@ -497,11 +497,11 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
     assert error_data_records == [
         OrderedDict(
             **{
-                "Table": "planets",
+                "Grouping": "planets",
                 "Type": "Submission Failure",
                 "Error Code": "LONG_ORBIT",
-                "Data Item": "orbitalPeriod",
-                "Error": "Planet has long orbital period",
+                "Data Item Submission Name": "orbitalPeriod",
+                "Errors and Warnings": "Planet has long orbital period",
                 "Value": 365.20001220703125,
                 "ID": None,
                 "Category": "Bad value",
@@ -509,11 +509,11 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
         ),
         OrderedDict(
             **{
-                "Table": "planets",
+                "Grouping": "planets",
                 "Type": "Submission Failure",
                 "Error Code": "STRONG_GRAVITY",
-                "Data Item": "gravity",
-                "Error": "Planet has too strong gravity",
+                "Data Item Submission Name": "gravity",
+                "Errors and Warnings": "Planet has too strong gravity",
                 "Value": 9.800000190734863,
                 "ID": None,
                 "Category": "Bad value",

From cbd4cfaa66c35c72f3c75d52179ba27ea75ac637 Mon Sep 17 00:00:00 2001
From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com>
Date: Thu, 18 Dec 2025 18:53:27 +0000
Subject: [PATCH 2/8] refactor: rename "Grouping" to "Group"

---
 src/dve/reporting/excel_report.py               | 2 +-
 tests/test_error_reporting/test_excel_report.py | 4 ++--
 tests/test_pipeline/test_spark_pipeline.py      | 8 ++++----
 3 files changed, 7 insertions(+), 7 deletions(-)

diff --git a/src/dve/reporting/excel_report.py b/src/dve/reporting/excel_report.py
index 200feed..4dc3fee 100644
--- a/src/dve/reporting/excel_report.py
+++ b/src/dve/reporting/excel_report.py
@@ -445,7 +445,7 @@ def _text_length(value):
     def _format_headings(headings: list[str]) -> list[str]:
         # TODO - ideally this would be config driven to allow customisation.
         _renames = {
-            "Table": "Grouping",
+            "Table": "Group",
             "Data Item": "Data Item Submission Name",
             "Error": "Errors and Warnings",
         }
diff --git a/tests/test_error_reporting/test_excel_report.py b/tests/test_error_reporting/test_excel_report.py
index e29e45e..17c0dfd 100644
--- a/tests/test_error_reporting/test_excel_report.py
+++ b/tests/test_error_reporting/test_excel_report.py
@@ -116,7 +116,7 @@ def test_excel_report(report_dfs):
     column_headings = [cell.value for cell in aggs["1"]]
    assert column_headings == [
         "Type",
-        "Grouping",
+        "Group",
         "Data Item Submission Name",
         "Category",
         "Error Code",
@@ -126,7 +126,7 @@ def test_excel_report(report_dfs):
     details = workbook["Error Data"]
     column_headings = [cell.value for cell in details["1"]]
     assert column_headings == [
-        "Grouping",
+        "Group",
         "Type",
         "Error Code",
         "Data Item Submission Name",
diff --git a/tests/test_pipeline/test_spark_pipeline.py b/tests/test_pipeline/test_spark_pipeline.py
index a0b8d19..7f4738f 100644
--- a/tests/test_pipeline/test_spark_pipeline.py
+++ b/tests/test_pipeline/test_spark_pipeline.py
@@ -469,7 +469,7 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
         OrderedDict(
             **{
                 "Type": "Submission Failure",
-                "Grouping": "planets",
+                "Group": "planets",
                 "Data Item Submission Name": "orbitalPeriod",
                 "Category": "Bad value",
                 "Error Code": "LONG_ORBIT",
@@ -479,7 +479,7 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
         OrderedDict(
             **{
                 "Type": "Submission Failure",
-                "Grouping": "planets",
+                "Group": "planets",
                 "Data Item Submission Name": "gravity",
                 "Category": "Bad value",
                 "Error Code": "STRONG_GRAVITY",
@@ -497,7 +497,7 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
     assert error_data_records == [
         OrderedDict(
             **{
-                "Grouping": "planets",
+                "Group": "planets",
                 "Type": "Submission Failure",
                 "Error Code": "LONG_ORBIT",
                 "Data Item Submission Name": "orbitalPeriod",
@@ -509,7 +509,7 @@ def test_error_report_where_report_is_expected(  # pylint: disable=redefined-out
         ),
         OrderedDict(
             **{
-                "Grouping": "planets",
+                "Group": "planets",
                 "Type": "Submission Failure",
                 "Error Code": "STRONG_GRAVITY",
                 "Data Item Submission Name": "gravity",
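[Editor's note — illustrative sketch, not part of the patch series. Patches 1-2 route every heading through a small rename map after title-casing and underscore replacement. The sketch below reproduces that pipeline as a standalone function; passing the map in as a parameter is a hypothetical reading of the "config driven" TODO, not code from the repository.]

def format_headings(headings: list[str], renames: dict[str, str]) -> list[str]:
    # Title-case anything lower-cased, swap underscores for spaces, then rename.
    headings = [h.title() if h[0].islower() else h for h in headings]
    headings = [h.replace("_", " ") for h in headings]
    return [renames.get(h, h) for h in headings]

renames = {
    "Table": "Group",
    "Data Item": "Data Item Submission Name",
    "Error": "Errors and Warnings",
}
print(format_headings(["Table", "error_code", "Data Item"], renames))
# -> ['Group', 'Error Code', 'Data Item Submission Name']

[End editor's note.]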
From 304b704e9354985209be82edab1cfdbb869dfcc7 Mon Sep 17 00:00:00 2001
From: George Robertson <50412379+georgeRobertson@users.noreply.github.com>
Date: Wed, 7 Jan 2026 15:01:42 +0000
Subject: [PATCH 3/8] fix: fix issue where reporting_entity resulted in key
 fields being removed from error reports (#23)

---
 .../core_engine/backends/implementations/duckdb/rules.py | 1 +
 .../core_engine/backends/implementations/spark/rules.py  | 1 +
 src/dve/core_engine/message.py                           | 2 ++
 src/dve/reporting/utils.py                               | 8 +++++++-
 4 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/src/dve/core_engine/backends/implementations/duckdb/rules.py b/src/dve/core_engine/backends/implementations/duckdb/rules.py
index dbc308e..b14700d 100644
--- a/src/dve/core_engine/backends/implementations/duckdb/rules.py
+++ b/src/dve/core_engine/backends/implementations/duckdb/rules.py
@@ -517,6 +517,7 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages:
             messages.append(
                 FeedbackMessage(
                     entity=config.reporting.reporting_entity_override or config.entity_name,
+                    original_entity=config.entity_name,
                     record=record,  # type: ignore
                     error_location=config.reporting.legacy_location,
                     error_message=template_object(config.reporting.message, record),  # type: ignore
diff --git a/src/dve/core_engine/backends/implementations/spark/rules.py b/src/dve/core_engine/backends/implementations/spark/rules.py
index 93baae9..15afa09 100644
--- a/src/dve/core_engine/backends/implementations/spark/rules.py
+++ b/src/dve/core_engine/backends/implementations/spark/rules.py
@@ -412,6 +412,7 @@ def notify(self, entities: SparkEntities, *, config: Notification) -> Messages:
                 # more complex extraction done in reporting module
                 FeedbackMessage(
                     entity=config.reporting.reporting_entity_override or config.entity_name,
+                    original_entity=config.entity_name,
                     record=record.asDict(recursive=True),
                     error_location=config.reporting.legacy_location,
                     error_message=template_object(
diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py
index 7dd4f02..029a7cd 100644
--- a/src/dve/core_engine/message.py
+++ b/src/dve/core_engine/message.py
@@ -105,6 +105,8 @@ class FeedbackMessage:  # pylint: disable=too-many-instance-attributes
     still be completed (i.e. filters and joins can still be applied).
     """
 
+    original_entity: Optional[EntityName] = None
+    """The original entity before any modifications to the name (if applicable)."""
     is_informational: bool = False
     """Whether the message is simply for information or has affected the outputs."""
     error_type: Optional[str] = None
diff --git a/src/dve/reporting/utils.py b/src/dve/reporting/utils.py
index 3dac919..8832b6a 100644
--- a/src/dve/reporting/utils.py
+++ b/src/dve/reporting/utils.py
@@ -26,7 +26,13 @@ def dump_feedback_errors(
 
     processed = []
     for message in messages:
-        primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", [])
+        if message.original_entity is not None:
+            primary_keys = key_fields.get(message.original_entity, [])
+        elif message.entity is not None:
+            primary_keys = key_fields.get(message.entity, [])
+        else:
+            primary_keys = []
+
         error = message.to_dict(
             key_field=primary_keys,
             value_separator=" -- ",
From cb728cae337876bab0661719bc262b08f9a79d13 Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Thu, 8 Jan 2026 13:24:19 +0000
Subject: [PATCH 4/8] feat: added entity name override option in data contract
 error details to align with business rules

---
 .../components/field_error_detail.schema.json | 11 ++-
 src/dve/core_engine/message.py                |  4 +-
 tests/features/movies.feature                 | 88 +++++++++----------
 .../test_backends/fixtures.py                 |  6 +-
 .../test_duckdb/test_data_contract.py         |  3 +-
 .../test_spark/test_data_contract.py          |  1 +
 .../movies/movies_contract_error_details.json |  3 +-
 7 files changed, 61 insertions(+), 55 deletions(-)

diff --git a/docs/json_schemas/contract/components/field_error_detail.schema.json b/docs/json_schemas/contract/components/field_error_detail.schema.json
index a0cb547..5f00ca9 100644
--- a/docs/json_schemas/contract/components/field_error_detail.schema.json
+++ b/docs/json_schemas/contract/components/field_error_detail.schema.json
@@ -11,12 +11,11 @@
     },
     "error_message": {
       "description": "The message to be used for the field and error type specified. This can include templating (specified using jinja2 conventions). During templating, the full record will be available with an additional __error_value to easily obtain nested offending values.",
-      "type": "string",
-      "enum": [
-        "record_rejection",
-        "file_rejection",
-        "warning"
-      ]
+      "type": "string"
+    },
+    "reporting_entity": {
+      "description": "The entity name to be used for grouping in the error report. If left blank, it will default to the contract entity name.",
+      "type": "string"
     }
   },
   "required": [
diff --git a/src/dve/core_engine/message.py b/src/dve/core_engine/message.py
index 029a7cd..dd580c6 100644
--- a/src/dve/core_engine/message.py
+++ b/src/dve/core_engine/message.py
@@ -30,6 +30,7 @@ class DataContractErrorDetail(BaseModel):
 
     error_code: str
     error_message: Optional[str] = None
+    reporting_entity: Optional[str] = None
 
     def template_message(
         self,
@@ -232,7 +233,8 @@
         messages.append(
             cls(
-                entity=error_detail.reporting_entity or entity,
+                entity=error_detail.reporting_entity or entity,
+                original_entity=entity,
                 record=record,
                 failure_type=failure_type,
                 is_informational=is_informational,
diff --git a/tests/features/movies.feature b/tests/features/movies.feature
index b148547..d1dbca4 100644
--- a/tests/features/movies.feature
+++ b/tests/features/movies.feature
@@ -1,47 +1,47 @@
 Feature: Pipeline tests using the movies dataset
-    Tests for the processing framework which use the movies dataset.
-
-    This tests submissions in JSON format, with configuration in JSON config files.
-    Complex types are tested (arrays, nested structs)
-
-    Some validation of entity attributes is performed: SQL expressions and Python filter
-    functions are used, and templatable business rules feature in the transformations.
-
-    Scenario: Validate and filter movies (spark)
-        Given I submit the movies file movies.json for processing
-        And A spark pipeline is configured
-        And I create the following reference data tables in the database movies_refdata
-            | table_name | parquet_path                                          |
-            | sequels    | tests/testdata/movies/refdata/movies_sequels.parquet |
-        And I add initial audit entries for the submission
-        Then the latest audit record for the submission is marked with processing status file_transformation
-        When I run the file transformation phase
-        Then the movies entity is stored as a parquet after the file_transformation phase
-        And the latest audit record for the submission is marked with processing status data_contract
-        When I run the data contract phase
-        Then there are 3 record rejections from the data_contract phase
-        And there are errors with the following details and associated error_count from the data_contract phase
-            | ErrorCode | ErrorMessage                              | error_count |
-            | BLANKYEAR | year not provided                         | 1           |
-            | DODGYYEAR | year value (NOT_A_NUMBER) is invalid      | 1           |
-            | DODGYDATE | date_joined value is not valid: daft_date | 1           |
-        And the movies entity is stored as a parquet after the data_contract phase
-        And the latest audit record for the submission is marked with processing status business_rules
-        When I run the business rules phase
-        Then The rules restrict "movies" to 4 qualifying records
-        And there are errors with the following details and associated error_count from the business_rules phase
-            | ErrorCode       | ErrorMessage                                           | error_count |
-            | LIMITED_RATINGS | Movie has too few ratings ([6.1])                      | 1           |
-            | RUBBISH_SEQUEL  | The movie The Greatest Movie Ever has a rubbish sequel | 1           |
-        And the latest audit record for the submission is marked with processing status error_report
-        When I run the error report phase
-        Then An error report is produced
-        And The statistics entry for the submission shows the following information
-            | parameter                | value |
-            | record_count             | 5     |
-            | number_record_rejections | 4     |
-            | number_warnings          | 1     |
-        And the error aggregates are persisted
+    Tests for the processing framework which use the movies dataset.
+
+    This tests submissions in JSON format, with configuration in JSON config files.
+    Complex types are tested (arrays, nested structs)
+
+    Some validation of entity attributes is performed: SQL expressions and Python filter
+    functions are used, and templatable business rules feature in the transformations.
+
+    Scenario: Validate and filter movies (spark)
+        Given I submit the movies file movies.json for processing
+        And A spark pipeline is configured
+        And I create the following reference data tables in the database movies_refdata
+            | table_name | parquet_path                                          |
+            | sequels    | tests/testdata/movies/refdata/movies_sequels.parquet |
+        And I add initial audit entries for the submission
+        Then the latest audit record for the submission is marked with processing status file_transformation
+        When I run the file transformation phase
+        Then the movies entity is stored as a parquet after the file_transformation phase
+        And the latest audit record for the submission is marked with processing status data_contract
+        When I run the data contract phase
+        Then there are 3 record rejections from the data_contract phase
+        And there are errors with the following details and associated error_count from the data_contract phase
+            | Entity             | ErrorCode | ErrorMessage                              | error_count |
+            | movies             | BLANKYEAR | year not provided                         | 1           |
+            | movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid      | 1           |
+            | movies             | DODGYDATE | date_joined value is not valid: daft_date | 1           |
+        And the movies entity is stored as a parquet after the data_contract phase
+        And the latest audit record for the submission is marked with processing status business_rules
+        When I run the business rules phase
+        Then The rules restrict "movies" to 4 qualifying records
+        And there are errors with the following details and associated error_count from the business_rules phase
+            | ErrorCode       | ErrorMessage                                           | error_count |
+            | LIMITED_RATINGS | Movie has too few ratings ([6.1])                      | 1           |
+            | RUBBISH_SEQUEL  | The movie The Greatest Movie Ever has a rubbish sequel | 1           |
+        And the latest audit record for the submission is marked with processing status error_report
+        When I run the error report phase
+        Then An error report is produced
+        And The statistics entry for the submission shows the following information
+            | parameter                | value |
+            | record_count             | 5     |
+            | number_record_rejections | 4     |
+            | number_warnings          | 1     |
+        And the error aggregates are persisted
 
     Scenario: Validate and filter movies (duckdb)
         Given I submit the movies file movies.json for processing
         And A duckdb pipeline is configured
         And I create the following reference data tables in the database movies_refdata
             | table_name | parquet_path                                          |
             | sequels    | tests/testdata/movies/refdata/movies_sequels.parquet |
         And I add initial audit entries for the submission
         Then the latest audit record for the submission is marked with processing status file_transformation
         When I run the file transformation phase
         Then the movies entity is stored as a parquet after the file_transformation phase
         And the latest audit record for the submission is marked with processing status data_contract
         When I run the data contract phase
         Then there are 3 record rejections from the data_contract phase
         And there are errors with the following details and associated error_count from the data_contract phase
-            | ErrorCode | ErrorMessage                              | error_count |
-            | BLANKYEAR | year not provided                         | 1           |
-            | DODGYYEAR | year value (NOT_A_NUMBER) is invalid      | 1           |
-            | DODGYDATE | date_joined value is not valid: daft_date | 1           |
+            | Entity             | ErrorCode | ErrorMessage                              | error_count |
+            | movies             | BLANKYEAR | year not provided                         | 1           |
+            | movies_rename_test | DODGYYEAR | year value (NOT_A_NUMBER) is invalid      | 1           |
+            | movies             | DODGYDATE | date_joined value is not valid: daft_date | 1           |
         And the movies entity is stored as a parquet after the data_contract phase
         And the latest audit record for the submission is marked with processing status business_rules
         When I run the business rules phase
diff --git a/tests/test_core_engine/test_backends/fixtures.py b/tests/test_core_engine/test_backends/fixtures.py
index 14369b9..31c23d7 100644
--- a/tests/test_core_engine/test_backends/fixtures.py
+++ b/tests/test_core_engine/test_backends/fixtures.py
@@ -567,9 +567,11 @@ def nested_parquet_custom_dc_err_details(temp_dir):
     err_details = {
         "id": {
             "Blank": {"error_code": "TESTIDBLANK",
-                      "error_message": "id cannot be null"},
+                      "error_message": "id cannot be null",
+                      "reporting_entity": "test_rename"},
             "Bad value": {"error_code": "TESTIDBAD",
-                          "error_message": "id is invalid: id - {{id}}"}
+                          "error_message": "id is invalid: id - {{id}}",
+                          "reporting_entity": "test_rename"}
         },
         "datetimefield": {
             "Bad value": {"error_code": "TESTDTFIELDBAD",
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py
index e4c08ad..61920c2 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py
@@ -360,4 +360,5 @@ def test_duckdb_data_contract_custom_error_details(nested_all_string_parquet_w_e
     assert messages[0].error_code == "SUBFIELDTESTIDBAD"
     assert messages[0].error_message == "subfield id is invalid: subfield.id - WRONG"
     assert messages[1].error_code == "TESTIDBAD"
-    assert messages[1].error_message == "id is invalid: id - WRONG"
\ No newline at end of file
+    assert messages[1].error_message == "id is invalid: id - WRONG"
+    assert messages[1].entity == "test_rename"
\ No newline at end of file
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py
index fac6cdf..789ca1a 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_data_contract.py
@@ -235,5 +235,6 @@ def test_spark_data_contract_custom_error_details(nested_all_string_parquet_w_er
     assert messages[0].error_message == "subfield id is invalid: subfield.id - WRONG"
     assert messages[1].error_code == "TESTIDBAD"
     assert messages[1].error_message == "id is invalid: id - WRONG"
+    assert messages[1].entity == "test_rename"
 
\ No newline at end of file
diff --git a/tests/testdata/movies/movies_contract_error_details.json b/tests/testdata/movies/movies_contract_error_details.json
index 8c94c92..f8cd934 100644
--- a/tests/testdata/movies/movies_contract_error_details.json
+++ b/tests/testdata/movies/movies_contract_error_details.json
@@ -12,7 +12,8 @@
     },
     "Bad value": {
       "error_code": "DODGYYEAR",
-      "error_message": "year value ({{year}}) is invalid"
+      "error_message": "year value ({{year}}) is invalid",
+      "reporting_entity": "movies_rename_test"
     }
   },
   "cast.date_joined": {
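[Editor's note — illustrative sketch, not part of the patch series. The new reporting_entity field lets a data-contract error report under a different grouping name, while original_entity (from PATCH 3/8) keeps the key-field lookup pointing at the contract entity. The model below copies the three fields from the message.py hunk above; the surrounding scaffolding is assumed for illustration.]

from typing import Optional

from pydantic import BaseModel

class DataContractErrorDetail(BaseModel):
    error_code: str
    error_message: Optional[str] = None
    reporting_entity: Optional[str] = None

detail = DataContractErrorDetail(
    error_code="DODGYYEAR",
    error_message="year value ({{year}}) is invalid",
    reporting_entity="movies_rename_test",
)
contract_entity = "movies"
# Mirrors `entity=error_detail.reporting_entity or entity` in message.py:
entity = detail.reporting_entity or contract_entity
original_entity = contract_entity
print(entity, original_entity)  # movies_rename_test movies

[End editor's note.]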
format" + ], + "additionalProperties": { + "$ref": "field_error_detail.schema.json" + } + } + } +} \ No newline at end of file diff --git a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py index a261f7b..843ee40 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py +++ b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py @@ -273,3 +273,16 @@ def get_all_registered_udfs(connection: DuckDBPyConnection) -> set[str]: """ connection.sql("CREATE TEMP TABLE IF NOT EXISTS dve_udfs (function_name VARCHAR)") return {rw[0] for rw in connection.sql("SELECT * FROM dve_udfs").fetchall()} + + +def duckdb_rel_to_dictionaries( + entity: DuckDBPyRelation, batch_size=1000 +) -> Iterator[dict[str, Any]]: + """Iterator converting DuckDBPyRelation to lists of dictionaries. + Avoids issues where dates are getting converted to datetimes using polars as intermediate.""" + # TODO - look into float conversion - floats that can't be stored exactly in binary + # TODO - are given to nearest approximation. Tried Decimal, causes issues in arrays + # TODO - with templating (as in complex fields, repr used when str called in jinja templating). + cols: tuple[str] = tuple(entity.columns) # type: ignore + while rows := entity.fetchmany(batch_size): + yield from (dict(zip(cols, rw)) for rw in rows) diff --git a/src/dve/core_engine/backends/implementations/duckdb/rules.py b/src/dve/core_engine/backends/implementations/duckdb/rules.py index b14700d..e556c6b 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/rules.py +++ b/src/dve/core_engine/backends/implementations/duckdb/rules.py @@ -23,6 +23,7 @@ from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( DDBStruct, duckdb_read_parquet, + duckdb_rel_to_dictionaries, duckdb_write_parquet, get_all_registered_udfs, get_duckdb_type_from_annotation, @@ -511,7 +512,7 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages: if config.excluded_columns: matched = matched.select(StarExpression(exclude=config.excluded_columns)) - for record in matched.df().to_dict(orient="records"): + for record in duckdb_rel_to_dictionaries(matched): # NOTE: only templates using values directly accessible in record - nothing nested # more complex extraction done in reporting module messages.append( diff --git a/tests/features/movies.feature b/tests/features/movies.feature index d1dbca4..d737574 100644 --- a/tests/features/movies.feature +++ b/tests/features/movies.feature @@ -31,7 +31,7 @@ Feature: Pipeline tests using the movies dataset Then The rules restrict "movies" to 4 qualifying records And there are errors with the following details and associated error_count from the business_rules phase | ErrorCode | ErrorMessage | error_count | - | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 | + | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 | | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | And the latest audit record for the submission is marked with processing status error_report When I run the error report phase @@ -67,7 +67,7 @@ Feature: Pipeline tests using the movies dataset Then The rules restrict "movies" to 4 qualifying records And there are errors with the following details and associated error_count from the business_rules phase | ErrorCode | ErrorMessage | error_count | - | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 | + | 
LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 | | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | And the latest audit record for the submission is marked with processing status error_report When I run the error report phase diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py index 76d5d07..5c39e36 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py @@ -1,13 +1,18 @@ """Test Duck DB helpers""" + +import datetime import tempfile from pathlib import Path +from typing import Any import pytest import pyspark.sql.types as pst from duckdb import DuckDBPyRelation, DuckDBPyConnection from pyspark.sql import Row, SparkSession -from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import _ddb_read_parquet +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( + _ddb_read_parquet, + duckdb_rel_to_dictionaries) class TempConnection: @@ -15,6 +20,7 @@ class TempConnection: Full object would be a DataContract object but this simplified down to meet min requirements of the test. """ + def __init__(self, connection: DuckDBPyConnection) -> None: self._connection = connection @@ -25,7 +31,7 @@ def __init__(self, connection: DuckDBPyConnection) -> None: ("movie_ratings"), ("movie_ratings/"), ("file://movie_ratings/"), - ] + ], ) def test__ddb_read_parquet_with_hive_format( spark: SparkSession, temp_ddb_conn: DuckDBPyConnection, outpath: str @@ -38,11 +44,13 @@ def test__ddb_read_parquet_with_hive_format( Row(movie_name="Hot Fuzz", avg_user_rating=7.7, avg_critic_rating=6.5), Row(movie_name="Nemo", avg_user_rating=8.8, avg_critic_rating=7.6), ], - pst.StructType([ - pst.StructField("movie_name", pst.StringType()), - pst.StructField("avg_user_rating", pst.FloatType()), - pst.StructField("avg_critic_rating", pst.FloatType()), - ]) + pst.StructType( + [ + pst.StructField("movie_name", pst.StringType()), + pst.StructField("avg_user_rating", pst.FloatType()), + pst.StructField("avg_critic_rating", pst.FloatType()), + ] + ), ) out_path = str(Path(temp_dir_path, outpath)) test_data_df.coalesce(1).write.parquet(out_path) @@ -51,3 +59,39 @@ def test__ddb_read_parquet_with_hive_format( assert isinstance(ddby_relation, DuckDBPyRelation) assert ddby_relation.count("*").fetchone()[0] == 2 # type: ignore + + +@pytest.mark.parametrize( + "data", + ( + + [ + { + "str_field": "hi", + "int_field": 5, + "array_float_field": [6.5, 7.25], + "date_field": datetime.date(2021, 5, 3), + "timestamp_field": datetime.datetime(2022, 6, 7, 1, 2, 3), + }, + { + "str_field": "bye", + "int_field": 3, + "array_float_field": None, + "date_field": datetime.date(2021, 8, 11), + "timestamp_field": datetime.datetime(2022, 4, 3, 1, 2, 3), + }, + ], + + ), +) +def test_duckdb_rel_to_dictionaries(temp_ddb_conn: DuckDBPyConnection, + data: list[dict[str, Any]]): + _, con = temp_ddb_conn + test_rel = con.query("select dta.* from (select unnest($data) as dta)", + params={"data": data}) + res: list = [] + for chunk in duckdb_rel_to_dictionaries(test_rel, 1): + res.append(chunk) + + assert res == data + diff --git a/tests/testdata/movies/movies.json b/tests/testdata/movies/movies.json index 87f1149..afa606d 100644 --- a/tests/testdata/movies/movies.json +++ 
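[Editor's note — illustrative sketch, not part of the patch series. This shows the batched-fetch idea behind duckdb_rel_to_dictionaries in isolation: fetchmany() returns plain Python tuples, so a DATE column stays a datetime.date, whereas converting the relation through a dataframe (via arrow, as the replaced .df() path did) is what widened dates into datetimes. Requires only the duckdb package; table and column names are invented.]

import datetime
import duckdb

con = duckdb.connect()
rel = con.sql("SELECT DATE '2021-05-03' AS date_field, 42 AS int_field")

cols = tuple(rel.columns)
while rows := rel.fetchmany(1000):      # batch to avoid materialising everything
    for row in rows:
        record = dict(zip(cols, row))
        # DATE survives as a date, not a datetime, on this code path.
        assert type(record["date_field"]) is datetime.date
        print(record)

[End editor's note.]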
From c0553c39a90d99353a2eea8409b144e6042060b4 Mon Sep 17 00:00:00 2001
From: George Robertson <50412379+georgeRobertson@users.noreply.github.com>
Date: Fri, 16 Jan 2026 12:23:50 +0000
Subject: [PATCH 6/8] build: upgrade urllib3 to resolve CVE-2026-21441 vuln
 (#27)

---
 pyproject.toml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pyproject.toml b/pyproject.toml
index 228b0d8..0335a5f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -39,7 +39,7 @@ requests = "2.32.4" # Mitigates security vuln in < 2.31.0
 schedula = "1.2.19"
 sqlalchemy = "2.0.19"
 typing_extensions = "4.6.2"
-urllib3 = "2.6.0" # Mitigates security vuln in < 2.5.0
+urllib3 = "2.6.3" # Mitigates security vuln in < 2.6.0
 xmltodict = "0.13.0"
 
 [tool.poetry.group.dev]
From 211232510ef6e1ece73162712ab6a259562115c4 Mon Sep 17 00:00:00 2001
From: stevenhsd <56357022+stevenhsd@users.noreply.github.com>
Date: Fri, 16 Jan 2026 13:10:50 +0000
Subject: [PATCH 7/8] refactor: added reporting_period_start and end attribute
 to submission_info model (#28)

* refactor: added reporting_period_start and end attribute to submission_info model

---
 src/dve/core_engine/models.py           | 16 +++++++++-------
 tests/features/steps/steps_pipeline.py  |  2 ++
 .../test_duckdb/test_audit_ddb.py       |  3 ++-
 .../test_spark/test_audit_spark.py      |  6 ++++--
 tests/test_core_engine/test_models.py   |  3 ++-
 5 files changed, 19 insertions(+), 11 deletions(-)

diff --git a/src/dve/core_engine/models.py b/src/dve/core_engine/models.py
index 2e6578f..75a14ed 100644
--- a/src/dve/core_engine/models.py
+++ b/src/dve/core_engine/models.py
@@ -51,16 +51,18 @@ class SubmissionInfo(AuditRecord):
     """The name of the submitted file."""
     file_extension: str
     """The extension of the file received."""
-    submission_method: str = None  # type: ignore
+    submission_method: Optional[str] = None  # type: ignore
     """The method that the file was submitted"""
-    submitting_org: str = None  # type: ignore
+    submitting_org: Optional[str] = None  # type: ignore
     """The organisation who submitted the file."""
-    reporting_period: str = None  # type: ignore
-    """The reporting period the submission relates to."""
-    file_size: int = None  # type: ignore
+    reporting_period_start: Optional[str] = None  # type: ignore
+    """The start of the reporting period the submission relates to."""
+    reporting_period_end: Optional[str] = None  # type: ignore
+    """The end of the reporting period the submission relates to."""
+    file_size: Optional[int] = None  # type: ignore
     """The size (in bytes) of the file received."""
-    datetime_received: dt.datetime = None  # type: ignore
-    """The datetime the SEFT transfer finished."""
+    datetime_received: Optional[dt.datetime] = None  # type: ignore
+    """The datetime the file was received."""
 
     @validator("file_name")
     def _ensure_metadata_extension_removed(cls, filename):  # pylint: disable=no-self-argument
diff --git a/tests/features/steps/steps_pipeline.py b/tests/features/steps/steps_pipeline.py
index fb85cfb..f3d1f0f 100644
--- a/tests/features/steps/steps_pipeline.py
+++ b/tests/features/steps/steps_pipeline.py
@@ -225,6 +225,8 @@ def submit_file_for_processing(context: Context, dataset: str, file_name: str):
         "dataset_id": dataset,
         "file_name": file_name,
         "file_extension": Path(file_name).suffix,
+        "reporting_period_start": "2025-11-01 00:00:00",
+        "reporting_period_end": "2025-11-30 23:59:59"
     }
     ctxt.set_submission_info(context, SubmissionInfo(**sub_info))  # type: ignore
     # add processing location
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py
index b0598eb..6260965 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_audit_ddb.py
@@ -40,7 +40,8 @@ def dve_metadata_file() -> Iterator[Path]:
             "file_extension": "xml",
             "file_size": 123456789,
             "submitting_org": "TEST",
-            "reporting_period": "FY2023-24_TEST",
+            "reporting_period_start": "FY2023-24_START_TEST",
+            "reporting_period_end": "FY2023-24_END_TEST",
             "dataset_id": "TEST_DATASET",
             "datetime_received": "2023-10-03T10:53:36.1231998Z"
         }"""
diff --git a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py
index a5146f9..4b328df 100644
--- a/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py
+++ b/tests/test_core_engine/test_backends/test_implementations/test_spark/test_audit_spark.py
@@ -43,7 +43,8 @@ def dve_metadata_file() -> Iterator[Path]:
             "file_extension": "xml",
             "file_size": 123456789,
             "submitting_org": "TEST",
-            "reporting_period": "FY2023-24_TEST",
+            "reporting_period_start": "FY2023-24_START_TEST",
+            "reporting_period_end": "FY2023-24_END_TEST",
             "dataset_id": "TEST_DATASET",
             "datetime_received": "2023-10-03T10:53:36.1231998Z"
         }"""
@@ -147,7 +148,8 @@ def test_submission_info_from_metadata_file(dve_metadata_file):
         submitting_org="TEST",
         file_name="TESTFILE_TEST_20230323T084600",
         file_extension="xml",
-        reporting_period="FY2023-24_TEST",
+        reporting_period_start="FY2023-24_START_TEST",
+        reporting_period_end="FY2023-24_END_TEST",
         file_size=123456789,
         datetime_received=datetime(2023, 10, 3, 10, 53, 36, 123199, tzinfo=timezone.utc),
     )
diff --git a/tests/test_core_engine/test_models.py b/tests/test_core_engine/test_models.py
index ef85173..87c8f9d 100644
--- a/tests/test_core_engine/test_models.py
+++ b/tests/test_core_engine/test_models.py
@@ -73,7 +73,8 @@ def test_submission_info(  # pylint: disable=missing-function-docstring
         "time_updated",
         "submission_method",
         "submitting_org",
-        "reporting_period",
+        "reporting_period_start",
+        "reporting_period_end",
         "file_size",
         "datetime_received",
     ]
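[Editor's note — illustrative sketch, not part of the patch series. PATCH 7/8 splits the single reporting_period attribute into optional start/end strings and types the implicit-None defaults as Optional. The cut-down model below keeps only the fields relevant to that change; AuditRecord and the validator are omitted, and the values are taken from the test fixtures above.]

import datetime as dt
from typing import Optional

from pydantic import BaseModel

class SubmissionInfo(BaseModel):  # simplified; the real model extends AuditRecord
    file_name: str
    file_extension: str
    reporting_period_start: Optional[str] = None
    reporting_period_end: Optional[str] = None
    datetime_received: Optional[dt.datetime] = None

info = SubmissionInfo(
    file_name="movies",
    file_extension=".json",
    reporting_period_start="2025-11-01 00:00:00",
    reporting_period_end="2025-11-30 23:59:59",
)
print(info.reporting_period_start, info.datetime_received)  # start string, None

[End editor's note.]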
From 079891bb247db20aa8594bd47a58b65fefe335cc Mon Sep 17 00:00:00 2001
From: georgeRobertson <50412379+georgeRobertson@users.noreply.github.com>
Date: Fri, 16 Jan 2026 14:13:12 +0000
Subject: [PATCH 8/8] =?UTF-8?q?bump:=20version=200.4.0=20=E2=86=92=200.5.0?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 CHANGELOG.md   | 17 +++++++++++++++++
 pyproject.toml |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 1098297..9ebd629 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,20 @@
+## v0.5.0 (2026-01-16)
+
+### Feat
+
+- added entity name override option in data contract error details to align with business rules
+
+### Fix
+
+- Amend relation to python dictionaries approach as using polars (… (#25)
+- fix issue where reporting_entity resulted in key fields being removed from error reports (#23)
+
+### Refactor
+
+- added reporting_period_start and end attribute to submission_info model (#28)
+- rename "Grouping" to "Group"
+- rename the column headers for elements of the error report
+
 ## v0.4.0 (2025-12-17)
 
 ### Feat
diff --git a/pyproject.toml b/pyproject.toml
index 0335a5f..52263a9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "nhs_dve"
-version = "0.4.0"
+version = "0.5.0"
 description = "`nhs data validation engine` is a framework used to validate data"
 authors = ["NHS England "]
 readme = "README.md"