Skip to content

Commit fc10691

Browse files
fix: fix issues and implementation logic for xsd validation
1 parent c0b62bc commit fc10691

File tree

21 files changed

+164
-468
lines changed

21 files changed

+164
-468
lines changed

src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,12 @@ class DuckDBCSVReader(BaseFileReader):
3030
# TODO - stringify or not
3131
def __init__(
3232
self,
33+
*,
3334
header: bool = True,
3435
delim: str = ",",
3536
quotechar: str = '"',
3637
connection: Optional[DuckDBPyConnection] = None,
38+
**_,
3739
):
3840
self.header = header
3941
self.delim = delim

src/dve/core_engine/backends/implementations/duckdb/readers/json.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,12 @@
2020
class DuckDBJSONReader(BaseFileReader):
2121
"""A reader for JSON files"""
2222

23-
def __init__(self, json_format: Optional[str] = "array"):
23+
def __init__(
24+
self,
25+
*,
26+
json_format: Optional[str] = "array",
27+
**_,
28+
):
2429
self._json_format = json_format
2530

2631
super().__init__()

src/dve/core_engine/backends/implementations/duckdb/readers/xml.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,21 +8,18 @@
88
from pydantic import BaseModel
99

1010
from dve.core_engine.backends.base.reader import read_function
11+
from dve.core_engine.backends.exceptions import MessageBearingError
1112
from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
1213
from dve.core_engine.backends.readers.xml import XMLStreamReader
1314
from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
1415
from dve.core_engine.type_hints import URI
15-
from dve.parser.file_handling.service import get_parent
16-
from dve.pipeline.utils import dump_errors
1716

1817

1918
@duckdb_write_parquet
2019
class DuckDBXMLStreamReader(XMLStreamReader):
2120
"""A reader for XML files"""
2221

23-
def __init__(self,
24-
ddb_connection: Optional[DuckDBPyConnection] = None,
25-
**kwargs):
22+
def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
2623
self.ddb_connection = ddb_connection if ddb_connection else default_connection
2724
super().__init__(**kwargs)
2825

@@ -32,14 +29,12 @@ def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseMod
3229
if self.xsd_location:
3330
msg = self._run_xmllint(file_uri=resource)
3431
if msg:
35-
working_folder = get_parent(resource)
36-
dump_errors(
37-
working_folder=working_folder,
38-
step_name="file_transformation",
39-
messages=[msg]
40-
)
41-
42-
polars_schema: Dict[str, pl.DataType] = { # type: ignore
32+
raise MessageBearingError(
33+
"Submitted file failed XSD validation.",
34+
messages=[msg],
35+
)
36+
37+
polars_schema: dict[str, pl.DataType] = { # type: ignore
4338
fld.name: get_polars_type_from_annotation(fld.annotation)
4439
for fld in stringify_model(schema).__fields__.values()
4540
}

src/dve/core_engine/backends/implementations/spark/readers/csv.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ def __init__(
3131
multi_line: bool = False,
3232
encoding: str = "utf-8-sig",
3333
spark_session: Optional[SparkSession] = None,
34+
**_,
3435
) -> None:
3536

3637
self.delimiter = delimiter

src/dve/core_engine/backends/implementations/spark/readers/json.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def __init__(
2727
encoding: Optional[str] = "utf-8",
2828
multi_line: Optional[bool] = True,
2929
spark_session: Optional[SparkSession] = None,
30+
**_,
3031
) -> None:
3132

3233
self.encoding = encoding

src/dve/core_engine/backends/implementations/spark/readers/xml.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,18 @@
1212
from pyspark.sql.utils import AnalysisException
1313
from typing_extensions import Literal
1414

15-
from dve.core_engine.backends.base.reader import BaseFileReader, read_function
15+
from dve.core_engine.backends.base.reader import read_function
1616
from dve.core_engine.backends.exceptions import EmptyFileError
1717
from dve.core_engine.backends.implementations.spark.spark_helpers import (
1818
df_is_empty,
1919
get_type_from_annotation,
2020
spark_write_parquet,
2121
)
2222
from dve.core_engine.backends.readers.xml import BasicXMLFileReader, XMLStreamReader
23-
from dve.core_engine.backends.readers.xml_linting import run_xmllint
23+
from dve.core_engine.backends.utilities import dump_errors
2424
from dve.core_engine.type_hints import URI, EntityName
2525
from dve.parser.file_handling import get_content_length, get_parent
2626
from dve.parser.file_handling.service import open_stream
27-
from dve.pipeline.utils import dump_errors
2827

2928
SparkXMLMode = Literal["PERMISSIVE", "FAILFAST", "DROPMALFORMED"]
3029
"""The mode to use when parsing XML files with Spark."""
@@ -74,10 +73,11 @@ def __init__(
7473
trim_cells=True,
7574
xsd_location: Optional[URI] = None,
7675
xsd_error_code: Optional[str] = None,
77-
xsd_error_message: Optional[str] = None
76+
xsd_error_message: Optional[str] = None,
77+
rules_location: Optional[URI] = None,
7878
**_,
7979
) -> None:
80-
80+
8181
super().__init__(
8282
record_tag=record_tag,
8383
root_tag=root_tag,
@@ -86,7 +86,8 @@ def __init__(
8686
sanitise_multiline=sanitise_multiline,
8787
xsd_location=xsd_location,
8888
xsd_error_code=xsd_error_code,
89-
xsd_error_message=xsd_error_message
89+
xsd_error_message=xsd_error_message,
90+
rules_location=rules_location,
9091
)
9192

9293
self.spark_session = spark_session or SparkSession.builder.getOrCreate()
@@ -117,16 +118,14 @@ def read_to_dataframe(
117118
"""
118119
if get_content_length(resource) == 0:
119120
raise EmptyFileError(f"File at {resource} is empty.")
120-
121+
121122
if self.xsd_location:
122123
msg = self._run_xmllint(file_uri=resource)
123124
if msg:
124125
working_folder = get_parent(resource)
125126
dump_errors(
126-
working_folder=working_folder,
127-
step_name="file_transformation",
128-
messages=[msg]
129-
)
127+
working_folder=working_folder, step_name="file_transformation", messages=[msg]
128+
)
130129

131130
spark_schema: StructType = get_type_from_annotation(schema)
132131
kwargs = {
@@ -165,7 +164,7 @@ def read_to_dataframe(
165164
kwargs["rowTag"] = f"{namespace}:{self.record_tag}"
166165
df = (
167166
self.spark_session.read.format("xml")
168-
.options(**kwargs)
167+
.options(**kwargs) # type: ignore
169168
.load(resource, schema=read_schema)
170169
)
171170
if self.root_tag and df.columns:

src/dve/core_engine/backends/readers/csv.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ def __init__(
3636
trim_cells: bool = True,
3737
null_values: Collection[str] = frozenset({"NULL", "null", ""}),
3838
encoding: str = "utf-8-sig",
39+
**_,
3940
):
4041
"""Init function for the base CSV reader.
4142

src/dve/core_engine/backends/readers/xml.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def clear(self) -> None:
103103
def __iter__(self) -> Iterator["XMLElement"]: ...
104104

105105

106-
class BasicXMLFileReader(BaseFileReader):
106+
class BasicXMLFileReader(BaseFileReader): # pylint: disable=R0902
107107
"""A reader for XML files built atop LXML."""
108108

109109
def __init__(
@@ -119,6 +119,7 @@ def __init__(
119119
xsd_location: Optional[URI] = None,
120120
xsd_error_code: Optional[str] = None,
121121
xsd_error_message: Optional[str] = None,
122+
rules_location: Optional[URI] = None,
122123
**_,
123124
):
124125
"""Init function for the base XML reader.
@@ -153,8 +154,11 @@ def __init__(
153154
"""Encoding of the XML file."""
154155
self.n_records_to_read = n_records_to_read
155156
"""The maximum number of records to read from a document."""
156-
self.xsd_location = xsd_location
157-
"""The relative URI of the xsd file if wishing to perform xsd validation"""
157+
if rules_location is not None and xsd_location is not None:
158+
self.xsd_location = rules_location + xsd_location
159+
else:
160+
self.xsd_location = xsd_location # type: ignore
161+
"""The URI of the xsd file if wishing to perform xsd validation."""
158162
self.xsd_error_code = xsd_error_code
159163
"""The error code to be reported if xsd validation fails (if xsd)"""
160164
self.xsd_error_message = xsd_error_message
@@ -269,12 +273,22 @@ def _parse_xml(
269273

270274
for element in elements:
271275
yield self._parse_element(element, template_row)
272-
273-
def _run_xmllint(self, file_uri: URI) -> FeedbackMessage:
274-
return run_xmllint(file_uri=file_uri,
275-
schema_uri=self.xsd_location,
276-
error_code=self.xsd_error_code,
277-
error_message=self.xsd_error_message)
276+
277+
def _run_xmllint(self, file_uri: URI) -> FeedbackMessage | None:
278+
"""Run xmllint package to validate against a given xsd. Requires xmlint to be installed
279+
onto the system to run succesfully."""
280+
if self.xsd_location is None:
281+
raise AttributeError("Trying to run XML lint with no `xsd_location` provided.")
282+
if self.xsd_error_code is None:
283+
raise AttributeError("Trying to run XML with no `xsd_error_code` provided.")
284+
if self.xsd_error_message is None:
285+
raise AttributeError("Trying to run XML with no `xsd_error_message` provided.")
286+
return run_xmllint(
287+
file_uri=file_uri,
288+
schema_uri=self.xsd_location,
289+
error_code=self.xsd_error_code,
290+
error_message=self.xsd_error_message,
291+
)
278292

279293
def read_to_py_iterator(
280294
self,

src/dve/core_engine/backends/readers/xml_linting.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
"""Implement XML linting for files."""
1+
"""Implement XML linting for files. Please note that xml linting requires xmllint to be installed
2+
onto your system."""
23

34
import shutil
45
import tempfile
6+
from collections.abc import Sequence
57
from contextlib import ExitStack
68
from pathlib import Path
79
from subprocess import PIPE, STDOUT, Popen
8-
from typing import Sequence, Union
10+
from typing import Union
911
from uuid import uuid4
1012

1113
from dve.core_engine.message import FeedbackMessage
@@ -87,7 +89,7 @@ def run_xmllint(
8789
8890
"""
8991
if not shutil.which("xmllint"):
90-
raise OSError("Unable to find `xmllint` binary")
92+
raise OSError("Unable to find `xmllint` binary. Please install to use this functionality.")
9193

9294
if not get_resource_exists(file_uri):
9395
raise IOError(f"No resource accessible at file URI {file_uri!r}")
@@ -128,8 +130,7 @@ def run_xmllint(
128130
# Close the input stream and await the response code.
129131
# Output will be written to the message file.
130132
process.stdin.close()
131-
# TODO: Identify an appropriate timeout.
132-
return_code = process.wait()
133+
return_code = process.wait(10)
133134

134135
if return_code == 0:
135136
return None

src/dve/core_engine/backends/utilities.py

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
11
"""Necessary, otherwise uncategorised backend functionality."""
22

3+
import json
34
import sys
45
from dataclasses import is_dataclass
56
from datetime import date, datetime, time
67
from decimal import Decimal
78
from typing import GenericAlias # type: ignore
8-
from typing import Any, ClassVar, Union
9+
from typing import Any, ClassVar, Optional, Union
910

1011
import polars as pl # type: ignore
1112
from polars.datatypes.classes import DataTypeClass as PolarsType
1213
from pydantic import BaseModel, create_model
1314

15+
import dve.parser.file_handling as fh
1416
from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
15-
from dve.core_engine.type_hints import Messages
17+
from dve.core_engine.type_hints import URI, Messages
18+
from dve.reporting.error_report import conditional_cast
1619

1720
# We need to rely on a Python typing implementation detail in Python <= 3.7.
1821
if sys.version_info[:2] <= (3, 7):
@@ -176,3 +179,38 @@ def get_polars_type_from_annotation(type_annotation: Any) -> PolarsType:
176179
if polars_type:
177180
return polars_type
178181
raise ValueError(f"No equivalent DuckDB type for {type_annotation!r}")
182+
183+
184+
def dump_errors(
185+
working_folder: URI,
186+
step_name: str,
187+
messages: Messages,
188+
key_fields: Optional[dict[str, list[str]]] = None,
189+
):
190+
"""Write out to disk captured feedback error messages."""
191+
if not working_folder:
192+
raise AttributeError("processed files path not passed")
193+
194+
if not key_fields:
195+
key_fields = {}
196+
197+
errors = fh.joinuri(working_folder, "errors", f"{step_name}_errors.json")
198+
processed = []
199+
200+
for message in messages:
201+
primary_keys: list[str] = key_fields.get(message.entity if message.entity else "", [])
202+
error = message.to_dict(
203+
key_field=primary_keys,
204+
value_separator=" -- ",
205+
max_number_of_values=10,
206+
record_converter=None,
207+
)
208+
error["Key"] = conditional_cast(error["Key"], primary_keys, value_separator=" -- ")
209+
processed.append(error)
210+
211+
with fh.open_stream(errors, "a+") as f:
212+
json.dump(
213+
processed,
214+
f,
215+
default=str,
216+
)

0 commit comments

Comments
 (0)