Commit 9d5e5c3

refactor: display key fields in error reports as individual columns
Parent: cb728ca

6 files changed: 108 additions, 7 deletions

src/dve/core_engine/backends/utilities.py

Lines changed: 5 additions & 0 deletions
@@ -176,3 +176,8 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
     if polars_type:
         return polars_type
     raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
+
+
+def pl_row_count(df: pl.DataFrame) -> int:  # type: ignore
+    """Return row count from a polars DataFrame object."""
+    return df.select(pl.len()).to_dicts()[0]["len"]  # type: ignore

src/dve/pipeline/pipeline.py

Lines changed: 3 additions & 2 deletions
@@ -34,7 +34,7 @@
 from dve.parser.file_handling.service import _get_implementation
 from dve.pipeline.utils import SubmissionStatus, deadletter_file, load_config, load_reader
 from dve.reporting.error_report import ERROR_SCHEMA, calculate_aggregates
-from dve.reporting.utils import dump_feedback_errors, dump_processing_errors
+from dve.reporting.utils import dump_feedback_errors, dump_processing_errors, extract_and_pivot_keys

 PERMISSIBLE_EXCEPTIONS: tuple[type[Exception]] = (
     FileNotFoundError,  # type: ignore

@@ -718,6 +718,7 @@ def _get_error_dataframes(self, submission_id: str):
             .otherwise(pl.lit("Warning"))  # type: ignore
             .alias("error_type")
         )
+        df = extract_and_pivot_keys(df)
         df = df.select(
             pl.col("Entity").alias("Table"),  # type: ignore
             pl.col("error_type").alias("Type"),  # type: ignore

@@ -729,7 +730,7 @@ def _get_error_dataframes(self, submission_id: str):
             pl.col("Category"),  # type: ignore
         )
         df = df.select(
-            pl.col(column).cast(ERROR_SCHEMA[column])  # type: ignore
+            pl.col(column).cast(ERROR_SCHEMA.get(column, pl.Utf8()))  # type: ignore
             for column in df.columns
         )
         df = df.sort("Type", descending=False)  # type: ignore

src/dve/reporting/utils.py

Lines changed: 56 additions & 0 deletions
@@ -3,7 +3,10 @@
 import json
 from typing import Optional

+import polars as pl
+
 import dve.parser.file_handling as fh
+from dve.core_engine.backends.utilities import pl_row_count
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.type_hints import URI, Messages
 from dve.reporting.error_report import conditional_cast

@@ -80,3 +83,56 @@ def dump_processing_errors(
         f,
         default=str,
     )
+
+
+def extract_and_pivot_keys(
+    df: pl.DataFrame, key_field: str = "Key"  # type: ignore
+) -> pl.DataFrame:  # type: ignore
+    """
+    Extract key/value pairs from a key fields column (str) and pivot the keys into new columns.
+
+    Where no keys exist for a given field, the unmodified dataframe is returned. Where a column
+    holds a mixture of actual keys and invalid values (null, "None" and ""), no new column is
+    generated for the invalid entries.
+
+    Args:
+        df (pl.DataFrame): dataframe to manipulate
+        key_field (str): name of column to extract key, value pairs from
+
+    Returns:
+        pl.DataFrame: Polars DataFrame with pivoted keys
+    """
+    original_columns = df.columns
+    index_columns = [c for c in original_columns if c != key_field]
+
+    if pl_row_count(
+        df.select(key_field)
+        .filter(
+            (pl.col(key_field).str.lengths() > 0)  # type: ignore
+            & ~(pl.col(key_field).eq("None"))  # type: ignore
+        )
+    ) == 0:
+        return df
+
+    return (
+        df
+        .with_columns(pl.col(key_field).str.extract_all(r"(\w+): (\w+)"))  # type: ignore
+        .explode(key_field)
+        .with_columns(
+            pl.col(key_field).str.split_exact(":", 1)  # type: ignore
+            .struct.rename_fields(["pivot_key", "pivot_values"])
+            .alias("ids")
+        )
+        .unnest("ids")
+        .select(
+            *[pl.col(c) for c in original_columns],  # type: ignore
+            (pl.col("pivot_key") + pl.lit("_Identifier")).alias("pivot_key"),  # type: ignore
+            (pl.col("pivot_values").str.strip(" ")).alias("pivot_values"),  # type: ignore
+        )
+        .pivot(
+            values="pivot_values",
+            index=index_columns,
+            columns="pivot_key"
+        )
+        .drop(["null"])
+    )
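
For readers unfamiliar with the extract/explode/unnest idiom, a rough sketch of the intermediate "long" shape the function builds before pivoting (column names and sample data are illustrative; the end-to-end behaviour is pinned down by the new tests below):

    import polars as pl

    df = pl.DataFrame({"entity": ["test1"], "id": ["Key1: Value1 -- Key2: Value2"]})

    long = (
        df
        # one list element per "<key>: <value>" match in the string
        .with_columns(pl.col("id").str.extract_all(r"(\w+): (\w+)"))
        # one row per key/value pair
        .explode("id")
        # split "Key1: Value1" into a two-field struct, then lift it into columns
        .with_columns(
            pl.col("id").str.split_exact(":", 1)
            .struct.rename_fields(["pivot_key", "pivot_values"])
            .alias("ids")
        )
        .unnest("ids")
    )
    # long now holds one row per pair:
    #   entity | id           | pivot_key | pivot_values
    #   test1  | Key1: Value1 | Key1      | " Value1"
    #   test1  | Key2: Value2 | Key2      | " Value2"
    # extract_and_pivot_keys then suffixes pivot_key with "_Identifier", strips the
    # leading space from pivot_values, and pivots pivot_key into one column per key.
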
Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+"""Test objects in dve.reporting.utility"""
+# pylint: disable=missing-function-docstring
+
+import polars as pl
+
+from dve.core_engine.backends.utilities import pl_row_count
+from dve.reporting.utils import extract_and_pivot_keys
+
+
+def test_extract_and_pivot_keys():
+    df = pl.DataFrame({
+        "entity": ["test1", "test2", "test3", "test4"],
+        "FailureType": ["submission1", "submission2", "submission3", "submission4"],
+        "id": [
+            "Key1: Value1 -- Key2: Value2 -- Key3: Value3",
+            "Key1: Value1 -- Key2: Value2",
+            "",
+            None,
+        ]
+    })
+    result_df = extract_and_pivot_keys(df, key_field="id")
+    expected_df = pl.DataFrame({
+        "entity": ["test1", "test2", "test3", "test4"],
+        "FailureType": ["submission1", "submission2", "submission3", "submission4"],
+        "Key1_Identifier": ["Value1", "Value1", None, None],
+        "Key2_Identifier": ["Value2", "Value2", None, None],
+        "Key3_Identifier": ["Value3", None, None, None],
+    })
+
+    assert pl_row_count(result_df) == pl_row_count(df)
+    assert result_df.equals(expected_df)
+
+
+def test_extract_and_pivot_keys_with_empty_key_field():
+    df = pl.DataFrame({
+        "entity": ["test1", "test2", "test3"],
+        "FailureType": ["submission1", "submission2", "submission3"],
+        "Key": ["", "None", None]
+    })
+    result_df = extract_and_pivot_keys(df)
+
+    assert pl_row_count(result_df) == pl_row_count(df)
+    assert result_df.equals(df)

tests/test_pipeline/pipeline_helpers.py

Lines changed: 0 additions & 4 deletions
@@ -403,7 +403,3 @@ def error_data_after_business_rules() -> Iterator[Tuple[SubmissionInfo, str]]:
         json.dump(error_data, f)

     yield submitted_file_info, tdir
-
-
-def pl_row_count(df: pl.DataFrame) -> int:
-    return df.select(pl.len()).to_dicts()[0]["len"]

tests/test_pipeline/test_duckdb_pipeline.py

Lines changed: 1 addition & 1 deletion
@@ -15,6 +15,7 @@
 from duckdb import DuckDBPyConnection

 from dve.core_engine.backends.base.auditing import FilterCriteria
+from dve.core_engine.backends.utilities import pl_row_count
 from dve.core_engine.backends.implementations.duckdb.auditing import DDBAuditingManager
 from dve.core_engine.backends.implementations.duckdb.reference_data import DuckDBRefDataLoader
 from dve.core_engine.models import ProcessingStatusRecord, SubmissionInfo, SubmissionStatisticsRecord

@@ -26,7 +27,6 @@
 from ..fixtures import temp_ddb_conn  # pylint: disable=unused-import
 from .pipeline_helpers import (  # pylint: disable=unused-import
     PLANETS_RULES_PATH,
-    pl_row_count,
     planet_data_after_file_transformation,
     planet_test_files,
     planets_data_after_business_rules,
