Skip to content

Commit 5b1aff9

Browse files
committed
fix: Amend the relation-to-Python-dictionaries approach, as using polars (via arrow) was causing dates to be converted to datetimes
1 parent cb728ca commit 5b1aff9

File tree

8 files changed

+111
-29
lines changed

8 files changed

+111
-29
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{
2+
"$schema": "https://json-schema.org/draft-07/schema",
3+
"$id": "data-ingest:contract/components/field_error_type.schema.json",
4+
"title": "field_error_detail",
5+
"description": "The error type for a field when a validation error is raised during the data contract phase",
6+
"type": "object",
7+
"properties": {
8+
"error_type": {
9+
"description": "The type of error the details are for",
10+
"type": "string",
11+
"enum": [
12+
"Blank",
13+
"Bad value",
14+
"Wrong format"
15+
],
16+
"additionalProperties": {
17+
"$ref": "field_error_detail.schema.json"
18+
}
19+
}
20+
}
21+
}

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ include-groups = [
5252
[tool.poetry.group.dev.dependencies]
5353
commitizen = "4.9.1"
5454
pre-commit = "4.3.0"
55+
ipykernel = "^7.1.0"
5556

5657
[tool.poetry.group.test]
5758
optional = true

src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,3 +273,16 @@ def get_all_registered_udfs(connection: DuckDBPyConnection) -> set[str]:
273273
"""
274274
connection.sql("CREATE TEMP TABLE IF NOT EXISTS dve_udfs (function_name VARCHAR)")
275275
return {rw[0] for rw in connection.sql("SELECT * FROM dve_udfs").fetchall()}
276+
277+
278+
def duckdb_rel_to_dictionaries(
    entity: DuckDBPyRelation, batch_size: int = 1000
) -> Iterator[list[dict[str, Any]]]:
    """Yield the rows of a DuckDBPyRelation as batches of dictionaries.

    Rows are pulled from the relation with ``fetchmany`` and paired with the
    relation's column names. This avoids the issue where dates were being
    converted to datetimes when polars was used as an intermediate.

    Args:
        entity: The relation to read rows from.
        batch_size: Maximum number of rows in each yielded list. Must be at
            least 1.

    Yields:
        Lists of at most ``batch_size`` dictionaries, each mapping column name
        to that row's value, until ``fetchmany`` returns no more rows.

    Raises:
        ValueError: If ``batch_size`` is less than 1. Because this is a
            generator, the error surfaces when iteration starts.
    """
    # TODO - look into float conversion - floats that can't be stored exactly in binary
    # TODO - are given to nearest approximation. Tried Decimal, causes issues in arrays
    # TODO - with templating (as in complex fields, repr used when str called in jinja templating).
    if batch_size < 1:
        # A fetch that returns no rows ends the loop below, so a non-positive
        # batch size would otherwise risk silently dropping every record.
        raise ValueError(f"batch_size must be a positive integer, got {batch_size!r}")
    # Column names are fixed for the relation, so resolve them once up front.
    cols: tuple[str, ...] = tuple(entity.columns)
    while rows := entity.fetchmany(batch_size):
        yield [dict(zip(cols, rw)) for rw in rows]

src/dve/core_engine/backends/implementations/duckdb/rules.py

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
2424
DDBStruct,
2525
duckdb_read_parquet,
26+
duckdb_rel_to_dictionaries,
2627
duckdb_write_parquet,
2728
get_all_registered_udfs,
2829
get_duckdb_type_from_annotation,
@@ -511,23 +512,25 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages:
511512
if config.excluded_columns:
512513
matched = matched.select(StarExpression(exclude=config.excluded_columns))
513514

514-
for record in matched.df().to_dict(orient="records"):
515-
# NOTE: only templates using values directly accessible in record - nothing nested
516-
# more complex extraction done in reporting module
517-
messages.append(
518-
FeedbackMessage(
519-
entity=config.reporting.reporting_entity_override or config.entity_name,
520-
original_entity=config.entity_name,
521-
record=record, # type: ignore
522-
error_location=config.reporting.legacy_location,
523-
error_message=template_object(config.reporting.message, record), # type: ignore
524-
failure_type=config.reporting.legacy_error_type,
525-
error_type=config.reporting.legacy_error_type,
526-
error_code=config.reporting.code,
527-
reporting_field=config.reporting.legacy_reporting_field,
528-
reporting_field_name=config.reporting.reporting_field_override,
529-
is_informational=config.reporting.emit in ("warning", "info"),
530-
category=config.reporting.category,
515+
for chunk in duckdb_rel_to_dictionaries(matched):
516+
for record in chunk:
517+
# NOTE: only templates using values directly accessible in record - nothing nested
518+
# more complex extraction done in reporting module
519+
messages.append(
520+
FeedbackMessage(
521+
entity=config.reporting.reporting_entity_override or config.entity_name,
522+
original_entity=config.entity_name,
523+
record=record, # type: ignore
524+
error_location=config.reporting.legacy_location,
525+
error_message=template_object(config.reporting.message,
526+
record), # type: ignore
527+
failure_type=config.reporting.legacy_error_type,
528+
error_type=config.reporting.legacy_error_type,
529+
error_code=config.reporting.code,
530+
reporting_field=config.reporting.legacy_reporting_field,
531+
reporting_field_name=config.reporting.reporting_field_override,
532+
is_informational=config.reporting.emit in ("warning", "info"),
533+
category=config.reporting.category,
534+
)
531535
)
532-
)
533536
return messages

tests/features/movies.feature

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ Feature: Pipeline tests using the movies dataset
3131
Then The rules restrict "movies" to 4 qualifying records
3232
And there are errors with the following details and associated error_count from the business_rules phase
3333
| ErrorCode | ErrorMessage | error_count |
34-
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
34+
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
3535
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
3636
And the latest audit record for the submission is marked with processing status error_report
3737
When I run the error report phase
@@ -67,7 +67,7 @@ Feature: Pipeline tests using the movies dataset
6767
Then The rules restrict "movies" to 4 qualifying records
6868
And there are errors with the following details and associated error_count from the business_rules phase
6969
| ErrorCode | ErrorMessage | error_count |
70-
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
70+
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
7171
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
7272
And the latest audit record for the submission is marked with processing status error_report
7373
When I run the error report phase

tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,26 @@
11
"""Test Duck DB helpers"""
2+
3+
import datetime
24
import tempfile
35
from pathlib import Path
6+
from typing import Any
47

58
import pytest
69
import pyspark.sql.types as pst
710
from duckdb import DuckDBPyRelation, DuckDBPyConnection
811
from pyspark.sql import Row, SparkSession
912

10-
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import _ddb_read_parquet
13+
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
14+
_ddb_read_parquet,
15+
duckdb_rel_to_dictionaries)
1116

1217

1318
class TempConnection:
1419
"""
1520
Full object would be a DataContract object but this simplified down to meet min requirements
1621
of the test.
1722
"""
23+
1824
def __init__(self, connection: DuckDBPyConnection) -> None:
1925
self._connection = connection
2026

@@ -25,7 +31,7 @@ def __init__(self, connection: DuckDBPyConnection) -> None:
2531
("movie_ratings"),
2632
("movie_ratings/"),
2733
("file://movie_ratings/"),
28-
]
34+
],
2935
)
3036
def test__ddb_read_parquet_with_hive_format(
3137
spark: SparkSession, temp_ddb_conn: DuckDBPyConnection, outpath: str
@@ -38,11 +44,13 @@ def test__ddb_read_parquet_with_hive_format(
3844
Row(movie_name="Hot Fuzz", avg_user_rating=7.7, avg_critic_rating=6.5),
3945
Row(movie_name="Nemo", avg_user_rating=8.8, avg_critic_rating=7.6),
4046
],
41-
pst.StructType([
42-
pst.StructField("movie_name", pst.StringType()),
43-
pst.StructField("avg_user_rating", pst.FloatType()),
44-
pst.StructField("avg_critic_rating", pst.FloatType()),
45-
])
47+
pst.StructType(
48+
[
49+
pst.StructField("movie_name", pst.StringType()),
50+
pst.StructField("avg_user_rating", pst.FloatType()),
51+
pst.StructField("avg_critic_rating", pst.FloatType()),
52+
]
53+
),
4654
)
4755
out_path = str(Path(temp_dir_path, outpath))
4856
test_data_df.coalesce(1).write.parquet(out_path)
@@ -51,3 +59,39 @@ def test__ddb_read_parquet_with_hive_format(
5159

5260
assert isinstance(ddby_relation, DuckDBPyRelation)
5361
assert ddby_relation.count("*").fetchone()[0] == 2 # type: ignore
62+
63+
64+
@pytest.mark.parametrize(
    "data",
    (
        [
            {
                "str_field": "hi",
                "int_field": 5,
                "array_float_field": [6.5, 7.25],
                "date_field": datetime.date(2021, 5, 3),
                "timestamp_field": datetime.datetime(2022, 6, 7, 1, 2, 3),
            },
            {
                "str_field": "bye",
                "int_field": 3,
                "array_float_field": None,
                "date_field": datetime.date(2021, 8, 11),
                "timestamp_field": datetime.datetime(2022, 4, 3, 1, 2, 3),
            },
        ],
    ),
)
def test_duckdb_rel_to_dictionaries(
    temp_ddb_conn: DuckDBPyConnection, data: list[dict[str, Any]]
):
    """Check records loaded into a relation come back unchanged as dictionaries.

    The parametrised records mix strings, ints, a float array (including a
    null), a date and a timestamp, so the comparison also confirms that dates
    are returned as dates rather than datetimes.
    """
    # The fixture value unpacks into two items; only the second is used here.
    _unused, connection = temp_ddb_conn
    relation = connection.query(
        "select dta.* from (select unnest($data) as dta)", params={"data": data}
    )
    # Flatten the yielded batches back into a single list of records.
    records = [
        record for batch in duckdb_rel_to_dictionaries(relation) for record in batch
    ]

    assert records == data
97+

tests/testdata/movies/movies.dischema.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
},
2222
"duration_minutes": "int",
2323
"ratings": {
24-
"type": "NonNegativeFloat",
24+
"type": "float",
2525
"is_array": true
2626
},
2727
"cast": {

tests/testdata/movies/movies.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
"year": 2020,
3737
"genre": ["Fantasy", "Family"],
3838
"duration_minutes": 110,
39-
"ratings": [6.1],
39+
"ratings": [6.5],
4040
"cast": [
4141
{ "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" },
4242
{ "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" }

0 commit comments

Comments
 (0)