Skip to content

Commit 5d4442f

Browse files
feature: more duckdb csv readers (#6)
* feat: add new duckdb csv readers * style: fix polars typing issue and white space in docstrings
1 parent 7d2f71f commit 5d4442f

File tree

5 files changed

+202
-14
lines changed

5 files changed

+202
-14
lines changed

src/dve/core_engine/backends/implementations/duckdb/__init__.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,20 @@
33
from dve.core_engine.backends.readers import register_reader
44

55
from .contract import DuckDBDataContract
6-
from .readers import DuckDBCSVReader, DuckDBXMLStreamReader
6+
from .readers import (
7+
DuckDBCSVReader,
8+
DuckDBCSVRepeatingHeaderReader,
9+
DuckDBXMLStreamReader,
10+
PolarsToDuckDBCSVReader
11+
)
712
from .reference_data import DuckDBRefDataLoader
813
from .rules import DuckDBStepImplementations
914

1015
register_reader(DuckDBCSVReader)
16+
register_reader(DuckDBCSVRepeatingHeaderReader)
1117
register_reader(DuckDBJSONReader)
1218
register_reader(DuckDBXMLStreamReader)
19+
register_reader(PolarsToDuckDBCSVReader)
1320

1421
__all__ = [
1522
"DuckDBDataContract",
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
"""Readers for use with duckdb backend"""
22

3-
from .csv import DuckDBCSVReader
3+
from .csv import DuckDBCSVReader, DuckDBCSVRepeatingHeaderReader, PolarsToDuckDBCSVReader
44
from .json import DuckDBJSONReader
55
from .xml import DuckDBXMLStreamReader
66

77
__all__ = [
88
"DuckDBCSVReader",
9+
"DuckDBCSVRepeatingHeaderReader",
910
"DuckDBJSONReader",
1011
"DuckDBXMLStreamReader",
12+
"PolarsToDuckDBCSVReader",
1113
]

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
11
"""A csv reader to create duckdb relations"""
22

33
# pylint: disable=arguments-differ
4-
from typing import Any, Dict, Iterator, Type
4+
from typing import Any, Dict, Iterator, Optional, Type
55

6+
import duckdb as ddb
7+
import polars as pl
68
from duckdb import DuckDBPyConnection, DuckDBPyRelation, default_connection, read_csv
79
from pydantic import BaseModel
810

911
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
12+
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
1013
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
1114
duckdb_write_parquet,
1215
get_duckdb_type_from_annotation,
1316
)
1417
from dve.core_engine.backends.implementations.duckdb.types import SQLType
18+
from dve.core_engine.backends.utilities import get_polars_type_from_annotation
19+
from dve.core_engine.message import FeedbackMessage
1520
from dve.core_engine.type_hints import URI, EntityName
21+
from dve.parser.file_handling import get_content_length
1622

1723

1824
@duckdb_write_parquet
@@ -25,10 +31,12 @@ def __init__(
2531
self,
2632
header: bool = True,
2733
delim: str = ",",
28-
connection: DuckDBPyConnection = None,
34+
quotechar: str = '"',
35+
connection: Optional[DuckDBPyConnection] = None,
2936
):
3037
self.header = header
3138
self.delim = delim
39+
self.quotechar = quotechar
3240
self._connection = connection if connection else default_connection
3341

3442
super().__init__()
@@ -44,9 +52,13 @@ def read_to_relation( # pylint: disable=unused-argument
4452
self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
4553
) -> DuckDBPyRelation:
4654
"""Returns a relation object from the source csv"""
55+
if get_content_length(resource) == 0:
56+
raise EmptyFileError(f"File at {resource} is empty.")
57+
4758
reader_options: Dict[str, Any] = {
4859
"header": self.header,
4960
"delimiter": self.delim,
61+
"quotechar": self.quotechar,
5062
}
5163

5264
ddb_schema: Dict[str, SQLType] = {
@@ -56,3 +68,96 @@ def read_to_relation( # pylint: disable=unused-argument
5668

5769
reader_options["columns"] = ddb_schema
5870
return read_csv(resource, **reader_options)
71+
72+
73+
class PolarsToDuckDBCSVReader(DuckDBCSVReader):
    """
    Read a csv via the polars lazy reader and expose it as a DuckDBPyRelation.

    This reader exists because the duckdb csv reader cannot read a partial set
    of columns from a csv (i.e. select a, b NOT y); the polars lazy scan can.
    """

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
    ) -> DuckDBPyRelation:
        """Returns a relation object from the source csv"""
        if get_content_length(resource) == 0:
            raise EmptyFileError(f"File at {resource} is empty.")

        # Map each schema field to its polars dtype so the scan is typed up front.
        polars_types = {
            field.name: get_polars_type_from_annotation(field.annotation)  # type: ignore
            for field in schema.__fields__.values()
        }

        reader_options: Dict[str, Any] = {
            "has_header": self.header,
            "separator": self.delim,
            "quote_char": self.quotechar,
            "dtypes": polars_types,
        }

        # NOTE: polars 0.18+ adds a raise_if_empty argument to scan_csv, which
        # would make the explicit empty-file check above redundant on upgrade.
        # The select() restricts the scan to the schema's columns only.
        df = pl.scan_csv(resource, **reader_options).select(list(polars_types))  # type: ignore # pylint: disable=W0612

        # duckdb resolves `df` through its replacement scan of local variables.
        # NOTE(review): this queries via the module-level default connection, not
        # self._connection — confirm that is intended when a connection is injected.
        return ddb.sql("SELECT * FROM df")
106+
107+
108+
class DuckDBCSVRepeatingHeaderReader(PolarsToDuckDBCSVReader):
    """A Reader for files with a `.csv` extension and where there are repeating "header" values
    within the file. Header in this case is not the column names at the top of a csv, rather a
    collection of unique records that would usually be structured in another entity. However, due
    to the fact that `csv` is a semi-structured data format, you cannot define complex entities,
    hence the values are then repeated on all rows.

    Example of a repeating header data may look like this...

    | headerCol1 | headerCol2 | headerCol3 | nonHeaderCol1 | nonHeaderCol2 |
    | ---------- | ---------- | ---------- | ------------- | ------------- |
    | shop 1     | clothes    | 2025-01-01 | jeans         | 20.39         |
    | shop 1     | clothes    | 2025-01-01 | shirt         | 14.99         |

    This reader will just pull out the distinct values from the header columns. Where there is
    not exactly one distinct combination of values, the reader raises a `MessageBearingError`
    carrying a feedback message with error code `NonUniqueHeader`.

    So using the example above, the expected entity would look like this...

    | headerCol1 | headerCol2 | headerCol3 |
    | ---------- | ---------- | ---------- |
    | shop 1     | clothes    | 2025-01-01 |
    """

    @read_function(DuckDBPyRelation)
    def read_to_relation(  # pylint: disable=unused-argument
        self, resource: URI, entity_name: EntityName, schema: Type[BaseModel]
    ) -> DuckDBPyRelation:
        """Return a single-row relation of the distinct header values.

        Raises:
            MessageBearingError: when the file does not hold exactly one
                distinct combination of header values (zero or several).
        """
        entity = super().read_to_relation(resource=resource, entity_name=entity_name, schema=schema)
        entity = entity.distinct()
        no_records = entity.shape[0]

        if no_records != 1:
            rows = entity.pl().to_dicts()
            # Pivot the distinct rows column-wise and report each column that
            # carries more than one distinct value. Guard against `rows` being
            # empty (e.g. zero data rows survive the scan) — indexing rows[0]
            # would raise IndexError otherwise.
            differing_values = (
                [
                    f"{key}: {', '.join(sorted(str(val) for val in values))}"
                    for key, *values in zip(rows[0], *map(dict.values, rows))  # type: ignore
                    if len(set(values)) > 1
                ]
                if rows
                else []
            )
            raise MessageBearingError(
                "More than one set of Headers found in CSV file",
                messages=[
                    FeedbackMessage(
                        record={entity_name: differing_values},
                        entity="Pre-validation",
                        failure_type="submission",
                        error_message=(
                            f"Found {no_records} distinct combination of header values."
                        ),
                        error_location=entity_name,
                        category="Bad file",
                        error_code="NonUniqueHeader",
                    )
                ],
            )

        return entity

tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_data_contract.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def test_duckdb_data_contract_csv(temp_csv_file):
7373
entities: Dict[str, DuckDBPyRelation] = {
7474
"test_ds": DuckDBCSVReader(
7575
header=True, delim=",", connection=connection
76-
).read_to_entity_type(DuckDBPyRelation, uri, "test_ds", stringify_model(mdl))
76+
).read_to_entity_type(DuckDBPyRelation, str(uri), "test_ds", stringify_model(mdl))
7777
}
7878

7979
data_contract: DuckDBDataContract = DuckDBDataContract(connection)

tests/test_core_engine/test_backends/test_readers/test_ddb_csv.py

Lines changed: 83 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,20 @@
66
from duckdb import DuckDBPyRelation, default_connection
77
from pydantic import BaseModel
88

9+
from dve.core_engine.backends.exceptions import EmptyFileError, MessageBearingError
910
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
1011
get_duckdb_type_from_annotation,
1112
)
12-
from dve.core_engine.backends.implementations.duckdb.readers.csv import DuckDBCSVReader, SQLType
13+
from dve.core_engine.backends.implementations.duckdb.readers.csv import (
14+
DuckDBCSVReader,
15+
DuckDBCSVRepeatingHeaderReader,
16+
PolarsToDuckDBCSVReader,
17+
)
1318
from dve.core_engine.backends.utilities import stringify_model
1419
from tests.test_core_engine.test_backends.fixtures import duckdb_connection
1520

21+
# pylint: disable=C0116
22+
1623

1724
class SimpleModel(BaseModel):
1825
varchar_field: str
@@ -21,6 +28,11 @@ class SimpleModel(BaseModel):
2128
timestamp_field: datetime
2229

2330

31+
class SimpleHeaderModel(BaseModel):
    """Minimal two-column schema used by the repeating-header reader tests."""

    header_1: str
    header_2: str
34+
35+
2436
@pytest.fixture
2537
def temp_dir():
2638
with TemporaryDirectory(prefix="ddb_test_csv_reader") as temp_dir:
@@ -43,18 +55,19 @@ def temp_csv_file(temp_dir: Path):
4355
yield temp_dir.joinpath("dummy.csv"), header, typed_data, SimpleModel
4456

4557

46-
class SimpleModel(BaseModel):
47-
varchar_field: str
48-
bigint_field: int
49-
date_field: date
50-
timestamp_field: datetime
58+
@pytest.fixture
def temp_empty_csv_file(temp_dir: Path):
    """Yield the path of a zero-byte csv together with the model to read it with."""
    empty_path = temp_dir.joinpath("empty.csv")
    # Open for write and close immediately: creates/truncates to an empty file.
    with open(empty_path, mode="w"):
        pass

    yield empty_path, SimpleModel
5164

5265

5366
def test_ddb_csv_reader_all_str(temp_csv_file):
5467
uri, header, data, mdl = temp_csv_file
5568
reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
5669
rel: DuckDBPyRelation = reader.read_to_entity_type(
57-
DuckDBPyRelation, uri, "test", stringify_model(mdl)
70+
DuckDBPyRelation, str(uri), "test", stringify_model(mdl)
5871
)
5972
assert rel.columns == header.split(",")
6073
assert dict(zip(rel.columns, rel.dtypes)) == {fld: "VARCHAR" for fld in header.split(",")}
@@ -64,7 +77,7 @@ def test_ddb_csv_reader_all_str(temp_csv_file):
6477
def test_ddb_csv_reader_cast(temp_csv_file):
6578
uri, header, data, mdl = temp_csv_file
6679
reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
67-
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, uri, "test", mdl)
80+
rel: DuckDBPyRelation = reader.read_to_entity_type(DuckDBPyRelation, str(uri), "test", mdl)
6881
assert rel.columns == header.split(",")
6982
assert dict(zip(rel.columns, rel.dtypes)) == {
7083
fld.name: str(get_duckdb_type_from_annotation(fld.annotation))
@@ -77,9 +90,70 @@ def test_ddb_csv_write_parquet(temp_csv_file):
7790
uri, header, data, mdl = temp_csv_file
7891
reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)
7992
rel: DuckDBPyRelation = reader.read_to_entity_type(
80-
DuckDBPyRelation, uri, "test", stringify_model(mdl)
93+
DuckDBPyRelation, str(uri), "test", stringify_model(mdl)
8194
)
8295
target_loc: Path = uri.parent.joinpath("test_parquet.parquet").as_posix()
8396
reader.write_parquet(rel, target_loc)
8497
parquet_rel = reader._connection.read_parquet(target_loc)
8598
assert parquet_rel.df().to_dict(orient="records") == rel.df().to_dict(orient="records")
99+
100+
101+
def test_ddb_csv_read_empty_file(temp_empty_csv_file):
    """An empty csv must raise EmptyFileError instead of producing a relation."""
    uri, mdl = temp_empty_csv_file
    csv_reader = DuckDBCSVReader(header=True, delim=",", connection=default_connection)

    with pytest.raises(EmptyFileError):
        csv_reader.read_to_relation(str(uri), "test", mdl)
107+
108+
109+
def test_polars_to_ddb_csv_reader(temp_csv_file):
110+
uri, header, data, mdl = temp_csv_file
111+
reader = PolarsToDuckDBCSVReader(
112+
header=True, delim=",", quotechar='"', connection=default_connection
113+
)
114+
entity = reader.read_to_relation(str(uri), "test", mdl)
115+
116+
assert entity.shape[0] == 2
117+
118+
119+
def test_ddb_csv_repeating_header_reader_non_duplicate(temp_dir):
    """Header columns holding a single distinct combination collapse to one row."""
    header = "header_1,header_2,non_header_1"
    data_rows = [
        ["hvalue1", "hvalue1", "nhvalue1"],
        ["hvalue1", "hvalue1", "nhvalue2"],
        ["hvalue1", "hvalue1", "nhvalue3"],
    ]
    file_uri = temp_dir.joinpath("test_header.csv")
    # Header line followed by one comma-joined line per record.
    lines = [header] + [",".join(str(val) for val in rw) for rw in data_rows]
    with open(file_uri, mode="w") as csv_file:
        csv_file.write("\n".join(lines) + "\n")

    reader = DuckDBCSVRepeatingHeaderReader(
        header=True, delim=",", quotechar='"', connection=default_connection
    )
    entity = reader.read_to_relation(str(file_uri), "test", SimpleHeaderModel)

    assert entity.shape[0] == 1
139+
140+
141+
def test_ddb_csv_repeating_header_reader_with_more_than_one_set_of_distinct_values(temp_dir):
    """Multiple distinct header combinations must raise MessageBearingError."""
    header = "header_1,header_2,non_header_1"
    data_rows = [
        ["hvalue1", "hvalue2", "nhvalue1"],
        ["hvalue2", "hvalue2", "nhvalue2"],
        ["hvalue1", "hvalue1", "nhvalue3"],
    ]
    file_uri = temp_dir.joinpath("test_header.csv")
    # Header line followed by one comma-joined line per record.
    lines = [header] + [",".join(str(val) for val in rw) for rw in data_rows]
    with open(file_uri, mode="w") as csv_file:
        csv_file.write("\n".join(lines) + "\n")

    reader = DuckDBCSVRepeatingHeaderReader(
        header=True, delim=",", quotechar='"', connection=default_connection
    )

    with pytest.raises(MessageBearingError):
        reader.read_to_relation(str(file_uri), "test", SimpleHeaderModel)

0 commit comments

Comments
 (0)