
Commit e745050

Merge pull request #16 from NHSDigital/develop_v04

merge v0.4.0

2 parents: 62b573e + baf6589


48 files changed, +1479 -624 lines

.github/workflows/ci_testing.yml

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ jobs:
       - name: Install extra dependencies for a python install
         run: |
           sudo apt-get update
-          sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev
+          sudo apt -y install --no-install-recommends liblzma-dev libbz2-dev libreadline-dev libxml2-utils

       - name: Install asdf cli
         uses: asdf-vm/actions/setup@v4

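The libxml2-utils package added here provides the xmllint command-line tool, which the updated DuckDBXMLStreamReader (further down in this diff) relies on for XSD validation. The function below is a rough, hypothetical sketch of such a check; the repo's actual _run_xmllint helper is not shown in this commit.

# Hypothetical sketch of an xmllint-based XSD check; assumes xmllint is on
# PATH (installed via libxml2-utils). Not the repo's real _run_xmllint.
import subprocess
from typing import Optional

def run_xmllint(file_path: str, xsd_path: str) -> Optional[str]:
    """Return xmllint's error output if validation fails, else None."""
    result = subprocess.run(
        ["xmllint", "--noout", "--schema", xsd_path, file_path],
        capture_output=True,
        text=True,
        check=False,  # a non-zero exit code just means validation failed
    )
    # xmllint exits 0 on success and writes validation errors to stderr
    return result.stderr if result.returncode != 0 else None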
CHANGELOG.md

Lines changed: 15 additions & 0 deletions
@@ -1,3 +1,18 @@
+## v0.4.0 (2025-12-17)
+
+### Feat
+
+- add persistence of error aggregates to pipeline
+- add Foundry pipeline
+
+### Fix
+
+- issue where templated error messages would not correctly format when passing in parameter values
+
+### Refactor
+
+- include submission status for services passthrough
+
 ## v0.3.0 (2025-11-19)

 ### Feat

docs/README.md

Lines changed: 1 addition & 1 deletion
@@ -234,10 +234,10 @@ audit_manager = SparkAuditingManager(

 # Setting up the Pipeline (in this case the Spark implemented one)
 pipeline = SparkDVEPipeline(
+    processed_files_path="path/where/my/processed_files/should_go/",
     audit_tables=audit_manager,
     job_run_id=1,
     rules_path="path/to/my_dischema",
-    processed_files_path="path/where/my/processed_files/should_go/",
     submitted_files_path="path/to/my/cwt_files/",
     reference_data_loader=SparkParquetRefDataLoader,
     spark=spark

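Because SparkDVEPipeline is constructed entirely with keyword arguments, this README reordering is cosmetic: it moves processed_files_path up for visibility without changing behaviour. A toy stand-in (not the repo's real signature) demonstrating the point:

# Keyword arguments are order-independent, so both calls below are equivalent.
def make_pipeline(*, processed_files_path: str, rules_path: str) -> dict:
    return {"processed": processed_files_path, "rules": rules_path}

a = make_pipeline(processed_files_path="out/", rules_path="rules/")
b = make_pipeline(rules_path="rules/", processed_files_path="out/")
assert a == b  # identical result regardless of keyword order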
pyproject.toml

Lines changed: 2 additions & 2 deletions
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "nhs_dve"
-version = "0.3.0"
+version = "0.4.0"
 description = "`nhs data validation engine` is a framework used to validate data"
 authors = ["NHS England <england.contactus@nhs.net>"]
 readme = "README.md"
@@ -39,7 +39,7 @@ requests = "2.32.4" # Mitigates security vuln in < 2.31.0
 schedula = "1.2.19"
 sqlalchemy = "2.0.19"
 typing_extensions = "4.6.2"
-urllib3 = "2.5.0" # Mitigates security vuln in < 1.26.19
+urllib3 = "2.6.0" # Mitigates security vuln in < 2.5.0
 xmltodict = "0.13.0"

 [tool.poetry.group.dev]

src/dve/core_engine/backends/base/auditing.py

Lines changed: 33 additions & 2 deletions
@@ -31,6 +31,7 @@
     QueueType,
     SubmissionResult,
 )
+from dve.pipeline.utils import SubmissionStatus

 AuditReturnType = TypeVar("AuditReturnType")  # pylint: disable=invalid-name

@@ -329,7 +330,7 @@ def mark_business_rules(self, submissions: list[tuple[str, bool]], **kwargs):
             ProcessingStatusRecord(
                 submission_id=submission_id,
                 processing_status="business_rules",
-                submission_result="failed" if failed else None,
+                submission_result="validation_failed" if failed else None,
                 **kwargs,
             )
             for submission_id, failed in submissions
@@ -379,7 +380,10 @@ def mark_failed(self, submissions: list[str], **kwargs):
         """Update submission processing_status to failed."""
         recs = [
             ProcessingStatusRecord(
-                submission_id=submission_id, processing_status="failed", **kwargs
+                submission_id=submission_id,
+                processing_status="failed",
+                submission_result="processing_failed",
+                **kwargs,
             )
             for submission_id in submissions
         ]
@@ -494,6 +498,33 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt
         except StopIteration:
             return None

+    def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]:
+        """Get the latest submission status for a submission"""
+
+        try:
+            processing_rec: ProcessingStatusRecord = next(  # type: ignore
+                self._processing_status.conv_to_records(
+                    self._processing_status.get_most_recent_records(
+                        order_criteria=[OrderCriteria("time_updated", True)],
+                        pre_filter_criteria=[FilterCriteria("submission_id", submission_id)],
+                    )
+                )
+            )
+        except StopIteration:
+            return None
+        sub_status = SubmissionStatus()
+        sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(
+            submission_id
+        )
+        if processing_rec.submission_result == "processing_failed":
+            sub_status.processing_failed = True
+        if processing_rec.submission_result == "validation_failed":
+            sub_status.validation_failed = True
+        if sub_stats_rec:
+            sub_status.number_of_records = sub_stats_rec.record_count
+
+        return sub_status
+
     def __enter__(self):
         """Use audit table as context manager"""
         if self.pool and self.pool_result.done():

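The new get_submission_status method gives callers a single entry point for a submission's latest state, folding the record count from get_submission_statistics into the result. A hedged usage sketch: audit_manager stands for any concrete auditing-manager instance, and only the SubmissionStatus attributes touched in this diff are assumed.

# Hedged usage sketch for the new API; "submission-123" is an illustrative ID.
status = audit_manager.get_submission_status("submission-123")

if status is None:
    print("no processing records exist yet for this submission")
elif status.processing_failed:
    print("the pipeline errored while processing the submission")
elif status.validation_failed:
    print("the submission was processed but failed business-rule validation")
else:
    print(f"submission OK ({status.number_of_records} records)")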
src/dve/core_engine/backends/implementations/duckdb/readers/csv.py

Lines changed: 2 additions & 0 deletions
@@ -30,10 +30,12 @@ class DuckDBCSVReader(BaseFileReader):
     # TODO - stringify or not
     def __init__(
         self,
+        *,
         header: bool = True,
         delim: str = ",",
         quotechar: str = '"',
         connection: Optional[DuckDBPyConnection] = None,
+        **_,
     ):
         self.header = header
         self.delim = delim

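The `*` marker makes the reader's options keyword-only, and the `**_` catch-all silently discards keywords the reader does not recognise. Together (the same change is applied to the JSON and Spark readers below) this lets callers build any reader from one shared options mapping. A minimal sketch with toy stand-in classes, not the repo's real signatures:

# Keyword-only parameters plus a **_ catch-all: each reader takes the options
# it understands from a shared dict and ignores the rest.
class CSVReader:
    def __init__(self, *, header: bool = True, delim: str = ",", **_):
        self.header, self.delim = header, delim

class JSONReader:
    def __init__(self, *, json_format: str = "array", **_):
        self.json_format = json_format

shared_options = {"header": False, "delim": "|", "json_format": "array"}
csv_reader = CSVReader(**shared_options)    # json_format is ignored
json_reader = JSONReader(**shared_options)  # header and delim are ignored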
src/dve/core_engine/backends/implementations/duckdb/readers/json.py

Lines changed: 6 additions & 1 deletion
@@ -20,7 +20,12 @@
 class DuckDBJSONReader(BaseFileReader):
     """A reader for JSON files"""

-    def __init__(self, json_format: Optional[str] = "array"):
+    def __init__(
+        self,
+        *,
+        json_format: Optional[str] = "array",
+        **_,
+    ):
         self._json_format = json_format

         super().__init__()

src/dve/core_engine/backends/implementations/duckdb/readers/xml.py

Lines changed: 10 additions & 1 deletion
@@ -8,6 +8,7 @@
 from pydantic import BaseModel

 from dve.core_engine.backends.base.reader import read_function
+from dve.core_engine.backends.exceptions import MessageBearingError
 from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_write_parquet
 from dve.core_engine.backends.readers.xml import XMLStreamReader
 from dve.core_engine.backends.utilities import get_polars_type_from_annotation, stringify_model
@@ -18,13 +19,21 @@
 class DuckDBXMLStreamReader(XMLStreamReader):
     """A reader for XML files"""

-    def __init__(self, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
+    def __init__(self, *, ddb_connection: Optional[DuckDBPyConnection] = None, **kwargs):
         self.ddb_connection = ddb_connection if ddb_connection else default_connection
         super().__init__(**kwargs)

     @read_function(DuckDBPyRelation)
     def read_to_relation(self, resource: URI, entity_name: str, schema: type[BaseModel]):
         """Returns a relation object from the source xml"""
+        if self.xsd_location:
+            msg = self._run_xmllint(file_uri=resource)
+            if msg:
+                raise MessageBearingError(
+                    "Submitted file failed XSD validation.",
+                    messages=[msg],
+                )
+
         polars_schema: dict[str, pl.DataType] = {  # type: ignore
             fld.name: get_polars_type_from_annotation(fld.annotation)
             for fld in stringify_model(schema).__fields__.values()

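With this change read_to_relation fails fast when an XSD is configured: xmllint output is wrapped in a MessageBearingError so the validation message can be surfaced to the submitter. A hedged caller sketch; the module paths come from this diff, but the xsd_location keyword (presumed to pass through the base reader's **kwargs) and the err.messages attribute are assumptions.

# Hedged sketch of handling the new XSD failure mode; RecordModel and the
# file paths are illustrative stand-ins.
from pydantic import BaseModel

from dve.core_engine.backends.exceptions import MessageBearingError
from dve.core_engine.backends.implementations.duckdb.readers.xml import (
    DuckDBXMLStreamReader,
)

class RecordModel(BaseModel):  # stand-in entity schema
    patient_id: str

reader = DuckDBXMLStreamReader(xsd_location="schemas/cwt.xsd")  # assumed kwarg
try:
    relation = reader.read_to_relation(
        "submissions/example.xml", entity_name="records", schema=RecordModel
    )
except MessageBearingError as err:
    for message in err.messages:  # assumed attribute carrying xmllint output
        print(message)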
src/dve/core_engine/backends/implementations/spark/readers/csv.py

Lines changed: 1 addition & 0 deletions
@@ -31,6 +31,7 @@ def __init__(
         multi_line: bool = False,
         encoding: str = "utf-8-sig",
         spark_session: Optional[SparkSession] = None,
+        **_,
     ) -> None:

         self.delimiter = delimiter

src/dve/core_engine/backends/implementations/spark/readers/json.py

Lines changed: 1 addition & 0 deletions
@@ -27,6 +27,7 @@ def __init__(
         encoding: Optional[str] = "utf-8",
         multi_line: Optional[bool] = True,
         spark_session: Optional[SparkSession] = None,
+        **_,
     ) -> None:

         self.encoding = encoding
