Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/json_schemas/contract/components/field_error_type.schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"$schema": "https://json-schema.org/draft-07/schema",
"$id": "data-ingest:contract/components/field_error_type.schema.json",
"title": "field_error_detail",
"description": "The error type for a field when a validation error is raised during the data contract phase",
"type": "object",
"properties": {
"error_type": {
"description": "The type of error the details are for",
"type": "string",
"enum": [
"Blank",
"Bad value",
"Wrong format"
],
"additionalProperties": {
"$ref": "field_error_detail.schema.json"
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -273,3 +273,16 @@ def get_all_registered_udfs(connection: DuckDBPyConnection) -> set[str]:
"""
connection.sql("CREATE TEMP TABLE IF NOT EXISTS dve_udfs (function_name VARCHAR)")
return {rw[0] for rw in connection.sql("SELECT * FROM dve_udfs").fetchall()}


def duckdb_rel_to_dictionaries(
    entity: DuckDBPyRelation, batch_size: int = 1000
) -> Iterator[dict[str, Any]]:
    """Lazily yield the rows of a DuckDBPyRelation, one dictionary per row.

    Rows are pulled from the relation with ``fetchmany`` in batches of at most
    ``batch_size`` and each row is yielded as a ``{column name: value}`` mapping.
    Avoids issues where dates are getting converted to datetimes when polars is
    used as an intermediate.

    Args:
        entity: The relation to read rows from.
        batch_size: Number of rows requested from the relation per ``fetchmany`` call.

    Yields:
        A dictionary for each row, keyed by the relation's column names.
    """
    # TODO - look into float conversion - floats that can't be stored exactly in binary
    # TODO - are given to nearest approximation. Tried Decimal, causes issues in arrays
    # TODO - with templating (as in complex fields, repr used when str called in jinja templating).
    # Column names are captured once; every row tuple is zipped against them.
    cols: tuple[str, ...] = tuple(entity.columns)
    # An empty batch is falsy, which ends the loop once the relation is exhausted.
    while rows := entity.fetchmany(batch_size):
        for rw in rows:
            yield dict(zip(cols, rw))
3 changes: 2 additions & 1 deletion src/dve/core_engine/backends/implementations/duckdb/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
DDBStruct,
duckdb_read_parquet,
duckdb_rel_to_dictionaries,
duckdb_write_parquet,
get_all_registered_udfs,
get_duckdb_type_from_annotation,
Expand Down Expand Up @@ -511,7 +512,7 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages:
if config.excluded_columns:
matched = matched.select(StarExpression(exclude=config.excluded_columns))

for record in matched.df().to_dict(orient="records"):
for record in duckdb_rel_to_dictionaries(matched):
# NOTE: only templates using values directly accessible in record - nothing nested
# more complex extraction done in reporting module
messages.append(
Expand Down
4 changes: 2 additions & 2 deletions tests/features/movies.feature
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Feature: Pipeline tests using the movies dataset
Then The rules restrict "movies" to 4 qualifying records
And there are errors with the following details and associated error_count from the business_rules phase
| ErrorCode | ErrorMessage | error_count |
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Expand Down Expand Up @@ -67,7 +67,7 @@ Feature: Pipeline tests using the movies dataset
Then The rules restrict "movies" to 4 qualifying records
And there are errors with the following details and associated error_count from the business_rules phase
| ErrorCode | ErrorMessage | error_count |
| LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 |
| LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 |
| RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 |
And the latest audit record for the submission is marked with processing status error_report
When I run the error report phase
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,26 @@
"""Test Duck DB helpers"""

import datetime
import tempfile
from pathlib import Path
from typing import Any

import pytest
import pyspark.sql.types as pst
from duckdb import DuckDBPyRelation, DuckDBPyConnection
from pyspark.sql import Row, SparkSession

from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import _ddb_read_parquet
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
_ddb_read_parquet,
duckdb_rel_to_dictionaries)


class TempConnection:
    """
    Minimal holder for a DuckDB connection.

    Full object would be a DataContract object, but this is simplified down to meet the
    minimum requirements of the test.
    """

    def __init__(self, connection: DuckDBPyConnection) -> None:
        # Store the supplied connection on the private attribute.
        self._connection = connection

Expand All @@ -25,7 +31,7 @@ def __init__(self, connection: DuckDBPyConnection) -> None:
("movie_ratings"),
("movie_ratings/"),
("file://movie_ratings/"),
]
],
)
def test__ddb_read_parquet_with_hive_format(
spark: SparkSession, temp_ddb_conn: DuckDBPyConnection, outpath: str
Expand All @@ -38,11 +44,13 @@ def test__ddb_read_parquet_with_hive_format(
Row(movie_name="Hot Fuzz", avg_user_rating=7.7, avg_critic_rating=6.5),
Row(movie_name="Nemo", avg_user_rating=8.8, avg_critic_rating=7.6),
],
pst.StructType([
pst.StructField("movie_name", pst.StringType()),
pst.StructField("avg_user_rating", pst.FloatType()),
pst.StructField("avg_critic_rating", pst.FloatType()),
])
pst.StructType(
[
pst.StructField("movie_name", pst.StringType()),
pst.StructField("avg_user_rating", pst.FloatType()),
pst.StructField("avg_critic_rating", pst.FloatType()),
]
),
)
out_path = str(Path(temp_dir_path, outpath))
test_data_df.coalesce(1).write.parquet(out_path)
Expand All @@ -51,3 +59,39 @@ def test__ddb_read_parquet_with_hive_format(

assert isinstance(ddby_relation, DuckDBPyRelation)
assert ddby_relation.count("*").fetchone()[0] == 2 # type: ignore


@pytest.mark.parametrize(
    "data",
    [
        [
            {
                "str_field": "hi",
                "int_field": 5,
                "array_float_field": [6.5, 7.25],
                "date_field": datetime.date(2021, 5, 3),
                "timestamp_field": datetime.datetime(2022, 6, 7, 1, 2, 3),
            },
            {
                "str_field": "bye",
                "int_field": 3,
                "array_float_field": None,
                "date_field": datetime.date(2021, 8, 11),
                "timestamp_field": datetime.datetime(2022, 4, 3, 1, 2, 3),
            },
        ],
    ],
)
def test_duckdb_rel_to_dictionaries(
    temp_ddb_conn: DuckDBPyConnection, data: list[dict[str, Any]]
):
    """Records loaded into a relation come back out as the same dictionaries."""
    # The fixture value is unpacked as a pair; only its second item is used here.
    _, connection = temp_ddb_conn
    relation = connection.query(
        "select dta.* from (select unnest($data) as dta)",
        params={"data": data},
    )

    # A batch size of 1 means the rows are requested from the relation one at a time.
    records = list(duckdb_rel_to_dictionaries(relation, 1))

    assert records == data

2 changes: 1 addition & 1 deletion tests/testdata/movies/movies.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"year": 2020,
"genre": ["Fantasy", "Family"],
"duration_minutes": 110,
"ratings": [6.1],
"ratings": [6.5],
"cast": [
{ "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" },
{ "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" }
Expand Down