Commit 3b2e501

style: Ran black, isort, pylint and mypy. Fixes made to resolve linting and type checking issues. Added test for new get_submission_status pipeline method
1 parent 4f63ea9 commit 3b2e501

15 files changed: +402 −180 lines

docs/README.md

Lines changed: 1 addition & 1 deletion
@@ -234,10 +234,10 @@ audit_manager = SparkAuditingManager(
 
 # Setting up the Pipeline (in this case the Spark implemented one)
 pipeline = SparkDVEPipeline(
+    processed_files_path="path/where/my/processed_files/should_go/",
     audit_tables=audit_manager,
     job_run_id=1,
     rules_path="path/to/my_dischema",
-    processed_files_path="path/where/my/processed_files/should_go/",
     submitted_files_path="path/to/my/cwt_files/",
     reference_data_loader=SparkParquetRefDataLoader,
     spark=spark

src/dve/core_engine/backends/base/auditing.py

Lines changed: 15 additions & 9 deletions
@@ -383,7 +383,7 @@ def mark_failed(self, submissions: list[str], **kwargs):
                 submission_id=submission_id,
                 processing_status="failed",
                 submission_result="processing_failed",
-                **kwargs
+                **kwargs,
             )
             for submission_id in submissions
         ]
@@ -497,26 +497,32 @@ def get_submission_statistics(self, submission_id: str) -> Optional[SubmissionSt
             )
         except StopIteration:
             return None
+
     def get_submission_status(self, submission_id: str) -> Optional[SubmissionStatus]:
         """Get the latest submission status for a submission"""
-
+
         try:
-            processing_rec: ProcessingStatusRecord = next(self._processing_status.conv_to_records(
-                self._processing_status.get_most_recent_records(
-                    order_criteria=[OrderCriteria("time_updated", True)],
-                    pre_filter_criteria=[FilterCriteria("submission_id", submission_id)]
-                )))
+            processing_rec: ProcessingStatusRecord = next(  # type: ignore
+                self._processing_status.conv_to_records(
+                    self._processing_status.get_most_recent_records(
+                        order_criteria=[OrderCriteria("time_updated", True)],
+                        pre_filter_criteria=[FilterCriteria("submission_id", submission_id)],
+                    )
+                )
+            )
         except StopIteration:
             return None
         sub_status = SubmissionStatus()
-        sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(submission_id)
+        sub_stats_rec: Optional[SubmissionStatisticsRecord] = self.get_submission_statistics(
+            submission_id
+        )
         if processing_rec.submission_result == "processing_failed":
             sub_status.processing_failed = True
         if processing_rec.submission_result == "validation_failed":
             sub_status.validation_failed = True
         if sub_stats_rec:
             sub_status.number_of_records = sub_stats_rec.record_count
-
+
         return sub_status
 
     def __enter__(self):
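The new `get_submission_status` returns `None` when no processing record exists for the submission, and otherwise folds the most recent processing record plus any submission statistics into a `SubmissionStatus`. A minimal usage sketch, assuming an already-populated auditing manager; `audit_manager` and the submission id are illustrative stand-ins, not part of this diff:

```python
# Query the latest status for one submission; manager and id are stand-ins.
status = audit_manager.get_submission_status("sub-0001")
if status is None:
    print("no processing record for this submission")
elif status.processing_failed:
    print("processing failed")
elif status.validation_failed:
    print("validation failed")
elif status.number_of_records is not None:
    # populated from SubmissionStatisticsRecord.record_count when statistics exist
    print(f"processed {status.number_of_records} records")
```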

src/dve/core_engine/backends/utilities.py

Lines changed: 2 additions & 4 deletions
@@ -1,20 +1,18 @@
 """Necessary, otherwise uncategorised backend functionality."""
 
-import json
 import sys
 from dataclasses import is_dataclass
 from datetime import date, datetime, time
 from decimal import Decimal
 from typing import GenericAlias  # type: ignore
-from typing import Any, ClassVar, Optional, Union
+from typing import Any, ClassVar, Union
 
 import polars as pl  # type: ignore
 from polars.datatypes.classes import DataTypeClass as PolarsType
 from pydantic import BaseModel, create_model
 
-import dve.parser.file_handling as fh
 from dve.core_engine.backends.base.utilities import _get_non_heterogenous_type
-from dve.core_engine.type_hints import URI, Messages
+from dve.core_engine.type_hints import Messages
 
 # We need to rely on a Python typing implementation detail in Python <= 3.7.
 if sys.version_info[:2] <= (3, 7):

src/dve/core_engine/exceptions.py

Lines changed: 12 additions & 7 deletions
@@ -1,6 +1,7 @@
 """Exceptions emitted by the pipeline."""
 
 from collections.abc import Iterator
+from typing import Optional
 
 from dve.core_engine.backends.implementations.spark.types import SparkEntities
 from dve.core_engine.message import FeedbackMessage
@@ -11,7 +12,11 @@ class CriticalProcessingError(ValueError):
     """An exception emitted if critical errors are received."""
 
     def __init__(
-        self, error_message: str, *args: object, messages: Messages, entities: SparkEntities
+        self,
+        error_message: str,
+        *args: object,
+        messages: Optional[Messages],
+        entities: Optional[SparkEntities] = None
     ) -> None:
         super().__init__(error_message, *args)
         self.error_message = error_message
@@ -24,13 +29,13 @@ def __init__(
     @property
     def critical_messages(self) -> Iterator[FeedbackMessage]:
         """Critical messages which caused the processing error."""
-        yield from filter(lambda message: message.is_critical, self.messages)
-
+        yield from filter(lambda message: message.is_critical, self.messages)  # type: ignore
+
     @classmethod
-    def from_exception(cls, exc:Exception):
-        return cls(error_message = repr(exc),
-                   entities=None,
-                   messages=[])
+    def from_exception(cls, exc: Exception):
+        """Create from broader exception, for recording in processing errors"""
+        return cls(error_message=repr(exc), entities=None, messages=[])
+
 
 class EntityTypeMismatch(TypeError):
     """An exception emitted if entity type outputs from two collaborative objects are different."""

src/dve/core_engine/templating.py

Lines changed: 3 additions & 2 deletions
@@ -11,12 +11,13 @@
 from dve.core_engine.type_hints import JSONable, TemplateVariables
 
 
-class PreserveTemplateUndefined(jinja2.Undefined):
+class PreserveTemplateUndefined(jinja2.Undefined):  # pylint: disable=too-few-public-methods
     """
-    Preserve the original template in instances where the value cannot be populated. Whilst this
+    Preserve the original template in instances where the value cannot be populated. Whilst this
     may result in templates coming back in the FeedbackMessage object, it's more useful to know
     exactly what should have been populated rather than just returning blank values.
     """
+
     def __str__(self):
         return "{{" + self._undefined_name + "}}"
 
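For context, a short sketch of the behaviour this class preserves; the template text and variable names are made up for illustration:

```python
import jinja2

from dve.core_engine.templating import PreserveTemplateUndefined

# With PreserveTemplateUndefined, unresolved placeholders come back verbatim
# instead of rendering as empty strings.
env = jinja2.Environment(undefined=PreserveTemplateUndefined)
template = env.from_string("Field {{field_name}} failed rule {{rule_id}}")
print(template.render(field_name="age"))
# -> Field age failed rule {{rule_id}}
```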

src/dve/pipeline/duckdb_pipeline.py

Lines changed: 2 additions & 2 deletions
@@ -23,21 +23,21 @@ class DDBDVEPipeline(BaseDVEPipeline):
 
     def __init__(
         self,
+        processed_files_path: URI,
         audit_tables: DDBAuditingManager,
         connection: DuckDBPyConnection,
         rules_path: Optional[URI],
-        processed_files_path: Optional[URI],
         submitted_files_path: Optional[URI],
         reference_data_loader: Optional[type[BaseRefDataLoader]] = None,
         job_run_id: Optional[int] = None,
     ):
         self._connection = connection
         super().__init__(
+            processed_files_path,
             audit_tables,
             DuckDBDataContract(connection=self._connection),
             DuckDBStepImplementations.register_udfs(connection=self._connection),
             rules_path,
-            processed_files_path,
             submitted_files_path,
             reference_data_loader,
             job_run_id,
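Note that `processed_files_path` moves to the front of the signature and becomes a required `URI`, so positional callers need updating; keyword construction sidesteps the break. A hedged sketch, assuming the auditing-manager setup shown elsewhere in the docs (`audit_manager` is not constructed in this commit):

```python
import duckdb

from dve.pipeline.duckdb_pipeline import DDBDVEPipeline

connection = duckdb.connect()
pipeline = DDBDVEPipeline(
    processed_files_path="path/where/my/processed_files/should_go/",
    audit_tables=audit_manager,  # a DDBAuditingManager instance (assumed setup)
    connection=connection,
    rules_path="path/to/my_dischema",
    submitted_files_path="path/to/my/cwt_files/",
)
```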

src/dve/pipeline/foundry_ddb_pipeline.py

Lines changed: 58 additions & 40 deletions
@@ -1,17 +1,23 @@
+# pylint: disable=W0223
 """A duckdb pipeline for running on Foundry platform"""
 
 from typing import Optional
-from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import duckdb_get_entity_count, duckdb_write_parquet
+
+from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import (
+    duckdb_get_entity_count,
+    duckdb_write_parquet,
+)
 from dve.core_engine.exceptions import CriticalProcessingError
 from dve.core_engine.models import SubmissionInfo
-from dve.core_engine.type_hints import URI, Failed
+from dve.core_engine.type_hints import URI
+from dve.parser import file_handling as fh
 from dve.parser.file_handling.implementations.file import LocalFilesystemImplementation
 from dve.parser.file_handling.service import _get_implementation
 from dve.pipeline.duckdb_pipeline import DDBDVEPipeline
 from dve.pipeline.utils import SubmissionStatus
-from dve.parser import file_handling as fh
 from dve.reporting.utils import dump_processing_errors
 
+
 @duckdb_get_entity_count
 @duckdb_write_parquet
 class FoundryDDBPipeline(DDBDVEPipeline):
@@ -24,12 +30,12 @@ def persist_audit_records(self, submission_info: SubmissionInfo) -> URI:
         write_to = fh.file_uri_to_local_path(write_to)
         write_to.parent.mkdir(parents=True, exist_ok=True)
         write_to = write_to.as_posix()
-        self.write_parquet(
-            self._audit_tables._processing_status.get_relation(),
+        self.write_parquet(  # type: ignore # pylint: disable=E1101
+            self._audit_tables._processing_status.get_relation(),  # pylint: disable=W0212
             fh.joinuri(write_to, "processing_status.parquet"),
         )
-        self.write_parquet(
-            self._audit_tables._submission_statistics.get_relation(),
+        self.write_parquet(  # type: ignore # pylint: disable=E1101
+            self._audit_tables._submission_statistics.get_relation(),  # pylint: disable=W0212
             fh.joinuri(write_to, "submission_statistics.parquet"),
         )
         return write_to
@@ -39,62 +45,70 @@ def file_transformation(
     ) -> tuple[SubmissionInfo, SubmissionStatus]:
         try:
             return super().file_transformation(submission_info)
-        except Exception as exc: # pylint: disable=W0718
+        except Exception as exc:  # pylint: disable=W0718
             self._logger.error(f"File transformation raised exception: {exc}")
             self._logger.exception(exc)
             dump_processing_errors(
-                fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                "file_transformation",
-                [CriticalProcessingError.from_exception(exc)]
-            )
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "file_transformation",
+                [CriticalProcessingError.from_exception(exc)],
+            )
             self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
             return submission_info, SubmissionStatus(processing_failed=True)
 
-    def apply_data_contract(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None) -> tuple[SubmissionInfo | SubmissionStatus]:
+    def apply_data_contract(
+        self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
+    ) -> tuple[SubmissionInfo, SubmissionStatus]:
         try:
             return super().apply_data_contract(submission_info, submission_status)
-        except Exception as exc: # pylint: disable=W0718
+        except Exception as exc:  # pylint: disable=W0718
             self._logger.error(f"Apply data contract raised exception: {exc}")
             self._logger.exception(exc)
             dump_processing_errors(
-                fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                "contract",
-                [CriticalProcessingError.from_exception(exc)]
-            )
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "contract",
+                [CriticalProcessingError.from_exception(exc)],
+            )
             self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
             return submission_info, SubmissionStatus(processing_failed=True)
 
-    def apply_business_rules(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None):
+    def apply_business_rules(
+        self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
+    ):
         try:
             return super().apply_business_rules(submission_info, submission_status)
-        except Exception as exc: # pylint: disable=W0718
+        except Exception as exc:  # pylint: disable=W0718
             self._logger.error(f"Apply business rules raised exception: {exc}")
             self._logger.exception(exc)
             dump_processing_errors(
-                fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                "business_rules",
-                [CriticalProcessingError.from_exception(exc)]
-            )
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "business_rules",
+                [CriticalProcessingError.from_exception(exc)],
+            )
             self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
             return submission_info, SubmissionStatus(processing_failed=True)
-
-    def error_report(self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None):
+
+    def error_report(
+        self, submission_info: SubmissionInfo, submission_status: Optional[SubmissionStatus] = None
+    ):
         try:
             return super().error_report(submission_info, submission_status)
-        except Exception as exc: # pylint: disable=W0718
+        except Exception as exc:  # pylint: disable=W0718
             self._logger.error(f"Error reports raised exception: {exc}")
             self._logger.exception(exc)
             sub_stats = None
             report_uri = None
             dump_processing_errors(
-                fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                "error_report",
-                [CriticalProcessingError.from_exception(exc)]
-            )
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "error_report",
+                [CriticalProcessingError.from_exception(exc)],
+            )
             self._audit_tables.mark_failed(submissions=[submission_info.submission_id])
             return submission_info, submission_status, sub_stats, report_uri
 
-    def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI], Optional[URI], URI]:
+    def run_pipeline(
+        self, submission_info: SubmissionInfo
+    ) -> tuple[Optional[URI], Optional[URI], URI]:
         """Sequential single submission pipeline runner"""
         try:
             sub_id: str = submission_info.submission_id
@@ -104,8 +118,12 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI],
             sub_info, sub_status = self.file_transformation(submission_info=submission_info)
             if not (sub_status.validation_failed or sub_status.processing_failed):
                 self._audit_tables.mark_data_contract(submission_ids=[sub_id])
-                sub_info, sub_status = self.apply_data_contract(submission_info=sub_info, submission_status=sub_status)
-                self._audit_tables.mark_business_rules(submissions=[(sub_id, sub_status.validation_failed)])
+                sub_info, sub_status = self.apply_data_contract(
+                    submission_info=sub_info, submission_status=sub_status
+                )
+                self._audit_tables.mark_business_rules(
+                    submissions=[(sub_id, sub_status.validation_failed)]
+                )
                 sub_info, sub_status = self.apply_business_rules(
                     submission_info=submission_info, submission_status=sub_status
                 )
@@ -118,15 +136,15 @@ def run_pipeline(self, submission_info: SubmissionInfo) -> tuple[Optional[URI],
                 submission_info=submission_info, submission_status=sub_status
             )
             self._audit_tables.add_submission_statistics_records(sub_stats=[sub_stats])
-        except Exception as err: # pylint: disable=W0718
+        except Exception as err:  # pylint: disable=W0718
             self._logger.error(
-                f"During processing of submission_id: {sub_id}, the following exception was raised: {err}"
+                f"During processing of submission_id: {sub_id}, this exception was raised: {err}"
             )
             dump_processing_errors(
-                fh.joinuri(self.processed_files_path, submission_info.submission_id),
-                "run_pipeline",
-                [CriticalProcessingError.from_exception(err)]
-            )
+                fh.joinuri(self.processed_files_path, submission_info.submission_id),
+                "run_pipeline",
+                [CriticalProcessingError.from_exception(err)],
+            )
             self._audit_tables.mark_failed(submissions=[sub_id])
         finally:
             audit_files_uri = self.persist_audit_records(submission_info=submission_info)
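Taken together, these overrides give `run_pipeline` a contained-failure contract: each stage catches its own exceptions, logs them, dumps a `CriticalProcessingError` via `dump_processing_errors`, and marks the submission failed, while the `finally` block persists audit records regardless. A hedged driver sketch, with the pipeline built as in the DuckDB example above and `submission_info` an assumed `SubmissionInfo` instance:

```python
# Run one submission end-to-end; per the new signature the result is
# tuple[Optional[URI], Optional[URI], URI], with the final element being the
# URI where the audit parquet files were persisted.
*maybe_uris, audit_files_uri = pipeline.run_pipeline(submission_info)
print(f"audit records written to: {audit_files_uri}")
```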
