From 5b1aff997c79d5a12dfcea26708b72971b1372b3 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Mon, 12 Jan 2026 22:37:04 +0000 Subject: [PATCH 1/2] fix: Amend the relation-to-python-dictionaries approach, as using polars (via arrow) causes dates to be converted to datetimes --- .../components/field_error_type.schema.json | 21 +++++++ pyproject.toml | 1 + .../implementations/duckdb/duckdb_helpers.py | 13 +++++ .../backends/implementations/duckdb/rules.py | 39 +++++++------ tests/features/movies.feature | 4 +- .../test_duckdb/test_duckdb_helpers.py | 58 ++++++++++++++++--- tests/testdata/movies/movies.dischema.json | 2 +- tests/testdata/movies/movies.json | 2 +- 8 files changed, 111 insertions(+), 29 deletions(-) create mode 100644 docs/json_schemas/contract/components/field_error_type.schema.json diff --git a/docs/json_schemas/contract/components/field_error_type.schema.json b/docs/json_schemas/contract/components/field_error_type.schema.json new file mode 100644 index 0000000..694948a --- /dev/null +++ b/docs/json_schemas/contract/components/field_error_type.schema.json @@ -0,0 +1,21 @@ +{ + "$schema": "https://json-schema.org/draft-07/schema", + "$id": "data-ingest:contract/components/field_error_type.schema.json", + "title": "field_error_detail", + "description": "The error type for a field when a validation error is raised during the data contract phase", + "type": "object", + "properties": { + "error_type": { + "description": "The type of error the details are for", + "type": "string", + "enum": [ + "Blank", + "Bad value", + "Wrong format" + ], + "additionalProperties": { + "$ref": "field_error_detail.schema.json" + } + } + } +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 228b0d8..42e0b22 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ include-groups = [ [tool.poetry.group.dev.dependencies] commitizen = "4.9.1" pre-commit = "4.3.0" +ipykernel = "^7.1.0" 
[tool.poetry.group.test] optional = true diff --git a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py index a261f7b..d9dc1b6 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py +++ b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py @@ -273,3 +273,16 @@ def get_all_registered_udfs(connection: DuckDBPyConnection) -> set[str]: """ connection.sql("CREATE TEMP TABLE IF NOT EXISTS dve_udfs (function_name VARCHAR)") return {rw[0] for rw in connection.sql("SELECT * FROM dve_udfs").fetchall()} + + +def duckdb_rel_to_dictionaries( + entity: DuckDBPyRelation, batch_size=1000 +) -> Iterator[list[dict[str, Any]]]: + """Iterator converting DuckDBPyRelation to lists of dictionaries. + Avoids issues where dates are getting converted to datetimes using polars as intermediate.""" + # TODO - look into float conversion - floats that can't be stored exactly in binary + # TODO - are given to nearest approximation. Tried Decimal, causes issues in arrays + # TODO - with templating (as in complex fields, repr used when str called in jinja templating). 
+ cols: tuple[str] = tuple(entity.columns) # type: ignore + while rows := entity.fetchmany(batch_size): + yield [dict(zip(cols, rw)) for rw in rows] diff --git a/src/dve/core_engine/backends/implementations/duckdb/rules.py b/src/dve/core_engine/backends/implementations/duckdb/rules.py index b14700d..430f2dd 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/rules.py +++ b/src/dve/core_engine/backends/implementations/duckdb/rules.py @@ -23,6 +23,7 @@ from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( DDBStruct, duckdb_read_parquet, + duckdb_rel_to_dictionaries, duckdb_write_parquet, get_all_registered_udfs, get_duckdb_type_from_annotation, @@ -511,23 +512,25 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages: if config.excluded_columns: matched = matched.select(StarExpression(exclude=config.excluded_columns)) - for record in matched.df().to_dict(orient="records"): - # NOTE: only templates using values directly accessible in record - nothing nested - # more complex extraction done in reporting module - messages.append( - FeedbackMessage( - entity=config.reporting.reporting_entity_override or config.entity_name, - original_entity=config.entity_name, - record=record, # type: ignore - error_location=config.reporting.legacy_location, - error_message=template_object(config.reporting.message, record), # type: ignore - failure_type=config.reporting.legacy_error_type, - error_type=config.reporting.legacy_error_type, - error_code=config.reporting.code, - reporting_field=config.reporting.legacy_reporting_field, - reporting_field_name=config.reporting.reporting_field_override, - is_informational=config.reporting.emit in ("warning", "info"), - category=config.reporting.category, + for chunk in duckdb_rel_to_dictionaries(matched): + for record in chunk: + # NOTE: only templates using values directly accessible in record - nothing nested + # more complex extraction done in reporting module + messages.append( + 
FeedbackMessage( + entity=config.reporting.reporting_entity_override or config.entity_name, + original_entity=config.entity_name, + record=record, # type: ignore + error_location=config.reporting.legacy_location, + error_message=template_object(config.reporting.message, + record), # type: ignore + failure_type=config.reporting.legacy_error_type, + error_type=config.reporting.legacy_error_type, + error_code=config.reporting.code, + reporting_field=config.reporting.legacy_reporting_field, + reporting_field_name=config.reporting.reporting_field_override, + is_informational=config.reporting.emit in ("warning", "info"), + category=config.reporting.category, + ) ) - ) return messages diff --git a/tests/features/movies.feature b/tests/features/movies.feature index d1dbca4..d737574 100644 --- a/tests/features/movies.feature +++ b/tests/features/movies.feature @@ -31,7 +31,7 @@ Feature: Pipeline tests using the movies dataset Then The rules restrict "movies" to 4 qualifying records And there are errors with the following details and associated error_count from the business_rules phase | ErrorCode | ErrorMessage | error_count | - | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 | + | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 | | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | And the latest audit record for the submission is marked with processing status error_report When I run the error report phase @@ -67,7 +67,7 @@ Feature: Pipeline tests using the movies dataset Then The rules restrict "movies" to 4 qualifying records And there are errors with the following details and associated error_count from the business_rules phase | ErrorCode | ErrorMessage | error_count | - | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 | + | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 | | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | And the latest audit record for the submission is marked 
with processing status error_report When I run the error report phase diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py index 76d5d07..5b68de4 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py @@ -1,13 +1,18 @@ """Test Duck DB helpers""" + +import datetime import tempfile from pathlib import Path +from typing import Any import pytest import pyspark.sql.types as pst from duckdb import DuckDBPyRelation, DuckDBPyConnection from pyspark.sql import Row, SparkSession -from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import _ddb_read_parquet +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( + _ddb_read_parquet, + duckdb_rel_to_dictionaries) class TempConnection: @@ -15,6 +20,7 @@ class TempConnection: Full object would be a DataContract object but this simplified down to meet min requirements of the test. 
""" + def __init__(self, connection: DuckDBPyConnection) -> None: self._connection = connection @@ -25,7 +31,7 @@ def __init__(self, connection: DuckDBPyConnection) -> None: ("movie_ratings"), ("movie_ratings/"), ("file://movie_ratings/"), - ] + ], ) def test__ddb_read_parquet_with_hive_format( spark: SparkSession, temp_ddb_conn: DuckDBPyConnection, outpath: str @@ -38,11 +44,13 @@ def test__ddb_read_parquet_with_hive_format( Row(movie_name="Hot Fuzz", avg_user_rating=7.7, avg_critic_rating=6.5), Row(movie_name="Nemo", avg_user_rating=8.8, avg_critic_rating=7.6), ], - pst.StructType([ - pst.StructField("movie_name", pst.StringType()), - pst.StructField("avg_user_rating", pst.FloatType()), - pst.StructField("avg_critic_rating", pst.FloatType()), - ]) + pst.StructType( + [ + pst.StructField("movie_name", pst.StringType()), + pst.StructField("avg_user_rating", pst.FloatType()), + pst.StructField("avg_critic_rating", pst.FloatType()), + ] + ), ) out_path = str(Path(temp_dir_path, outpath)) test_data_df.coalesce(1).write.parquet(out_path) @@ -51,3 +59,39 @@ def test__ddb_read_parquet_with_hive_format( assert isinstance(ddby_relation, DuckDBPyRelation) assert ddby_relation.count("*").fetchone()[0] == 2 # type: ignore + + +@pytest.mark.parametrize( + "data", + ( + + [ + { + "str_field": "hi", + "int_field": 5, + "array_float_field": [6.5, 7.25], + "date_field": datetime.date(2021, 5, 3), + "timestamp_field": datetime.datetime(2022, 6, 7, 1, 2, 3), + }, + { + "str_field": "bye", + "int_field": 3, + "array_float_field": None, + "date_field": datetime.date(2021, 8, 11), + "timestamp_field": datetime.datetime(2022, 4, 3, 1, 2, 3), + }, + ], + + ), +) +def test_duckdb_rel_to_dictionaries(temp_ddb_conn: DuckDBPyConnection, + data: list[dict[str, Any]]): + _, con = temp_ddb_conn + test_rel = con.query("select dta.* from (select unnest($data) as dta)", + params={"data": data}) + res: list = [] + for chunk in duckdb_rel_to_dictionaries(test_rel): + res.extend(chunk) + + assert res 
== data + diff --git a/tests/testdata/movies/movies.dischema.json b/tests/testdata/movies/movies.dischema.json index aa55882..781f4ac 100644 --- a/tests/testdata/movies/movies.dischema.json +++ b/tests/testdata/movies/movies.dischema.json @@ -21,7 +21,7 @@ }, "duration_minutes": "int", "ratings": { - "type": "NonNegativeFloat", + "type": "float", "is_array": true }, "cast": { diff --git a/tests/testdata/movies/movies.json b/tests/testdata/movies/movies.json index 87f1149..afa606d 100644 --- a/tests/testdata/movies/movies.json +++ b/tests/testdata/movies/movies.json @@ -36,7 +36,7 @@ "year": 2020, "genre": ["Fantasy", "Family"], "duration_minutes": 110, - "ratings": [6.1], + "ratings": [6.5], "cast": [ { "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" }, { "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" } From 7ea9a693d07f440d2e91efab35dcb4df2e1db4f1 Mon Sep 17 00:00:00 2001 From: stevenhsd <56357022+stevenhsd@users.noreply.github.com> Date: Tue, 13 Jan 2026 11:32:26 +0000 Subject: [PATCH 2/2] fix: small refactors following code review --- pyproject.toml | 1 - .../implementations/duckdb/duckdb_helpers.py | 6 +-- .../backends/implementations/duckdb/rules.py | 38 +++++++++---------- .../test_duckdb/test_duckdb_helpers.py | 4 +- tests/testdata/movies/movies.dischema.json | 2 +- 5 files changed, 24 insertions(+), 27 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 42e0b22..228b0d8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,7 +52,6 @@ include-groups = [ [tool.poetry.group.dev.dependencies] commitizen = "4.9.1" pre-commit = "4.3.0" -ipykernel = "^7.1.0" [tool.poetry.group.test] optional = true diff --git a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py index d9dc1b6..843ee40 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py +++ 
b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py @@ -277,12 +277,12 @@ def get_all_registered_udfs(connection: DuckDBPyConnection) -> set[str]: def duckdb_rel_to_dictionaries( entity: DuckDBPyRelation, batch_size=1000 -) -> Iterator[list[dict[str, Any]]]: +) -> Iterator[dict[str, Any]]: """Iterator converting DuckDBPyRelation to lists of dictionaries. Avoids issues where dates are getting converted to datetimes using polars as intermediate.""" # TODO - look into float conversion - floats that can't be stored exactly in binary # TODO - are given to nearest approximation. Tried Decimal, causes issues in arrays # TODO - with templating (as in complex fields, repr used when str called in jinja templating). - cols: tuple[str] = tuple(entity.columns) # type: ignore + cols: tuple[str] = tuple(entity.columns) # type: ignore while rows := entity.fetchmany(batch_size): - yield [dict(zip(cols, rw)) for rw in rows] + yield from (dict(zip(cols, rw)) for rw in rows) diff --git a/src/dve/core_engine/backends/implementations/duckdb/rules.py b/src/dve/core_engine/backends/implementations/duckdb/rules.py index 430f2dd..e556c6b 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/rules.py +++ b/src/dve/core_engine/backends/implementations/duckdb/rules.py @@ -512,25 +512,23 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages: if config.excluded_columns: matched = matched.select(StarExpression(exclude=config.excluded_columns)) - for chunk in duckdb_rel_to_dictionaries(matched): - for record in chunk: - # NOTE: only templates using values directly accessible in record - nothing nested - # more complex extraction done in reporting module - messages.append( - FeedbackMessage( - entity=config.reporting.reporting_entity_override or config.entity_name, - original_entity=config.entity_name, - record=record, # type: ignore - error_location=config.reporting.legacy_location, - 
error_message=template_object(config.reporting.message, - record), # type: ignore - failure_type=config.reporting.legacy_error_type, - error_type=config.reporting.legacy_error_type, - error_code=config.reporting.code, - reporting_field=config.reporting.legacy_reporting_field, - reporting_field_name=config.reporting.reporting_field_override, - is_informational=config.reporting.emit in ("warning", "info"), - category=config.reporting.category, - ) + for record in duckdb_rel_to_dictionaries(matched): + # NOTE: only templates using values directly accessible in record - nothing nested + # more complex extraction done in reporting module + messages.append( + FeedbackMessage( + entity=config.reporting.reporting_entity_override or config.entity_name, + original_entity=config.entity_name, + record=record, # type: ignore + error_location=config.reporting.legacy_location, + error_message=template_object(config.reporting.message, record), # type: ignore + failure_type=config.reporting.legacy_error_type, + error_type=config.reporting.legacy_error_type, + error_code=config.reporting.code, + reporting_field=config.reporting.legacy_reporting_field, + reporting_field_name=config.reporting.reporting_field_override, + is_informational=config.reporting.emit in ("warning", "info"), + category=config.reporting.category, ) + ) return messages diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py index 5b68de4..5c39e36 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py @@ -90,8 +90,8 @@ def test_duckdb_rel_to_dictionaries(temp_ddb_conn: DuckDBPyConnection, test_rel = con.query("select dta.* from (select unnest($data) as dta)", params={"data": data}) res: list = [] - for chunk in 
duckdb_rel_to_dictionaries(test_rel): - res.extend(chunk) + for chunk in duckdb_rel_to_dictionaries(test_rel, 1): + res.append(chunk) assert res == data diff --git a/tests/testdata/movies/movies.dischema.json b/tests/testdata/movies/movies.dischema.json index 781f4ac..aa55882 100644 --- a/tests/testdata/movies/movies.dischema.json +++ b/tests/testdata/movies/movies.dischema.json @@ -21,7 +21,7 @@ }, "duration_minutes": "int", "ratings": { - "type": "float", + "type": "NonNegativeFloat", "is_array": true }, "cast": {