Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ local*.*
!local_spark_session.py
!local_file_system_storage_configuration.py
!local_cred_utils.py
!src/corvus_python/storage/local_file_storage.py
test-reports/
pytest-test-results.xml
behave-test-results.xml
Expand Down
1,088 changes: 1,087 additions & 1 deletion poetry.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ pyodbc = "^5.1.0"
pyspark = { version = "^3.3.1", optional = true }
delta-spark = { version = ">=2.2.0,<4", optional = true }
azure-communication-email = "^1.0.0"
polars = "^1.38.1"
pandera = {extras = ["polars"], version = "^0.29.0"}
deltalake = "^1.4.2"
opentelemetry-api = "^1.40.0"
azure-storage-blob = "^12.28.0"
fastexcel = "^0.19.0"
fsspec = "^2026.2.0"
azure-data-tables = "^12.7.0"

[tool.poetry.extras]
pyspark = ["pyspark", "delta-spark"]
Expand Down
6 changes: 6 additions & 0 deletions src/corvus_python/repositories/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from .database_definition import DatabaseDefinition, TableDefinition # noqa: F401
from .delta_table_repository import DeltaTableRepository # noqa: F401
from .polars_csv_data_repository import PolarsCsvDataRepository # noqa: F401
from .polars_azure_table_repository import PolarsAzureTableRepository # noqa: F401
from .polars_excel_data_repository import PolarsExcelDataRepository # noqa: F401
from .polars_json_data_repository import PolarsJsonDataRepository # noqa: F401
18 changes: 18 additions & 0 deletions src/corvus_python/repositories/database_definition.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from dataclasses import dataclass
from typing import Optional

from pandera.polars import DataFrameSchema


@dataclass
class TableDefinition:
    """Definition of a single table: its name plus the Pandera schema used to validate its data."""

    name: str  # Table name; also used to build the table's storage path (see DeltaTableRepository).
    schema: DataFrameSchema  # Pandera (polars) schema the table's data is validated against.
    title: Optional[str] = None  # Optional human-readable title for the table.
    db_schema: Optional[str] = None  # Optional database schema/namespace qualifier — presumably e.g. "dbo"; confirm with callers.


@dataclass
class DatabaseDefinition:
    """Definition of a database: its name and the list of tables it contains."""

    name: str  # Database name; used as a path segment under the repository base path.
    tables: list[TableDefinition]  # Definitions of the tables belonging to this database.
225 changes: 225 additions & 0 deletions src/corvus_python/repositories/delta_table_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
import polars as pl
from deltalake import DeltaTable, write_deltalake
from opentelemetry import trace

from ..storage import (
DataLakeLayer,
StorageConfiguration,
)
from ..tracing import all_methods_start_new_current_span_with_method_name
from ..repositories import DatabaseDefinition, TableDefinition
from ..schema import pandera_polars_to_deltalake_schema

# Module-level OpenTelemetry tracer; consumed by the span-per-method class decorator below.
tracer = trace.get_tracer(__name__)


@all_methods_start_new_current_span_with_method_name(tracer)
class DeltaTableRepository:
    """
    A repository for managing Delta Lake tables.

    This class provides methods for reading, writing, and managing Delta tables
    within a specified data lake layer. Data is validated against Pandera
    schemas before every write, and each public method runs inside its own
    OpenTelemetry span (via the class decorator).
    """

    def __init__(
        self,
        storage_configuration: StorageConfiguration,
        data_lake_layer: DataLakeLayer,
        base_path: str,
        database_definition: DatabaseDefinition,
    ):
        """
        Initializes the DeltaTableRepository.

        Args:
            storage_configuration: Configuration for accessing storage.
            data_lake_layer: The data lake layer (e.g., Bronze, Silver, Gold).
            base_path: The base path within the data lake layer.
            database_definition: The definition of the database and its tables.
        """
        self.storage_configuration = storage_configuration
        self.data_lake_layer = data_lake_layer
        self.base_path = base_path
        self.database_definition = database_definition
        # Table creation is deferred until first use; see ensure_initialised().
        self.initialised = False
        self.storage_options = self.storage_configuration.storage_options
        # Lookup of table name -> Pandera schema, used to validate data before writes.
        self.pandera_schemas = {table.name: table.schema for table in self.database_definition.tables}

    def read_data(self, table_name: str) -> pl.DataFrame | None:
        """
        Reads data from a Delta table into a Polars DataFrame.

        Args:
            table_name: The name of the table to read.

        Returns:
            A Polars DataFrame containing the table data.
        """
        path = self._get_table_path(table_name)

        # TODO: Potential perf improvement: Use `scan_delta` and return LazyFrame instead?
        df = pl.read_delta(path, storage_options=self.storage_options)

        return df

    def overwrite_table(
        self,
        table_name: str,
        data: pl.DataFrame | pl.LazyFrame,
        overwrite_schema: bool = False
    ):
        """
        Replaces the full contents of a Delta table with `data`.

        The data is validated against the table's Pandera schema before writing.

        Args:
            table_name: The name of the table to overwrite.
            data: The data to write (a LazyFrame is collected first).
            overwrite_schema: If True, also replace the table's schema on write.
        """
        span = trace.get_current_span()

        if isinstance(data, pl.LazyFrame):
            data = data.collect()

        span.set_attributes(
            {
                "row_count": data.height,
                "database_name": self.database_definition.name,
                "table_name": table_name,
            }
        )

        self.ensure_initialised()

        path = self._get_table_path(table_name)

        schema = self.pandera_schemas[table_name]

        schema.validate(data, lazy=False)

        # Polars 1.x removed the `overwrite_schema` keyword from `write_delta`;
        # schema replacement is now requested via delta-rs's `schema_mode` option.
        data.write_delta(
            path,
            mode="overwrite",
            storage_options=self.storage_options,
            delta_write_options={"schema_mode": "overwrite"} if overwrite_schema else None,
        )

    def overwrite_table_with_condition(
        self,
        table_name: str,
        data: pl.DataFrame | pl.LazyFrame,
        predicate: str,
        overwrite_schema: bool = False
    ):
        """
        Overwrites only the rows of a Delta table matching `predicate` (replaceWhere).

        The data is validated against the table's Pandera schema before writing.

        Args:
            table_name: The name of the table to write to.
            data: The replacement data (a LazyFrame is collected first).
            predicate: SQL predicate selecting the rows to be replaced.
            overwrite_schema: If True, also replace the table's schema on write.
        """
        span = trace.get_current_span()

        if isinstance(data, pl.LazyFrame):
            data = data.collect()

        span.set_attributes(
            {
                "row_count": data.height,
                "database_name": self.database_definition.name,
                "table_name": table_name,
                "predicate": predicate,
            }
        )

        self.ensure_initialised()

        schema = self.pandera_schemas[table_name]

        schema.validate(data, lazy=False)

        path = self._get_table_path(table_name)

        # deltalake 1.x removed the `overwrite_schema` keyword from
        # `write_deltalake`; `schema_mode="overwrite"` is its replacement.
        write_deltalake(
            path,
            data.to_arrow(),  # type: ignore
            mode="overwrite",
            predicate=predicate,
            schema_mode="overwrite" if overwrite_schema else None,
            storage_options=self.storage_options,
        )

    def append_to_table(self, table_name: str, data: pl.DataFrame | pl.LazyFrame):
        """
        Appends rows to a Delta table after validating them against the table's schema.

        Args:
            table_name: The name of the table to append to.
            data: The rows to append (a LazyFrame is collected first).
        """
        span = trace.get_current_span()

        if isinstance(data, pl.LazyFrame):
            data = data.collect()

        span.set_attributes(
            {
                "row_count": data.height,
                "database_name": self.database_definition.name,
                "table_name": table_name,
            }
        )

        self.ensure_initialised()

        self.pandera_schemas[table_name].validate(data, lazy=False)

        path = self._get_table_path(table_name)

        data.write_delta(
            path,
            mode="append",
            storage_options=self.storage_options,
        )

    def ensure_all_rows_match(self, data: pl.DataFrame, column_name: str, value: str):
        """
        Raises ValueError unless every row of `data` has `value` in `column_name`.

        Args:
            data: The DataFrame to check.
            column_name: The column whose values must all match.
            value: The required value.

        Raises:
            ValueError: If any row holds a different value.
        """
        span = trace.get_current_span()
        span.set_attributes({"column_name": column_name, "value": value})

        if data.filter(pl.col(column_name) != value).height > 0:
            raise ValueError(f"Column '{column_name}' must have value '{value}' for all rows")

    def ensure_initialised(self):
        """Lazily initialises the database on first call; subsequent calls are no-ops."""
        if not self.initialised:
            self.initialise_database()
            self.initialised = True

    def initialise_database(self):
        """Creates every table declared in the database definition if it does not yet exist."""
        database_location = self.storage_configuration.get_full_path(
            self.data_lake_layer,
            f"{self.base_path}/{self.database_definition.name}",
        )

        span = trace.get_current_span()

        if (self.database_definition.tables is None) or (len(self.database_definition.tables) == 0):
            span.set_attribute("initialisation_required", False)
            return

        span.set_attributes(
            {
                "initialisation_required": True,
                "database_location": database_location,
                "database_name": self.database_definition.name,
            }
        )

        for table in self.database_definition.tables:
            self.initialise_table(table)

    def initialise_table(self, table: TableDefinition):
        """
        Creates the Delta table for `table` if it does not already exist.

        The table's Pandera schema is converted to a Delta Lake schema for creation.

        Args:
            table: The definition of the table to create.
        """
        span = trace.get_current_span()
        span.set_attributes({"database_name": self.database_definition.name, "table_name": table.name})

        table_path = self._get_table_path(table.name)

        # Explicit existence check instead of probing with a broad try/except.
        if not DeltaTable.is_deltatable(table_path, storage_options=self.storage_options):
            schema = table.schema
            _: DeltaTable = DeltaTable.create(
                table_path,
                schema=pandera_polars_to_deltalake_schema(schema),
                storage_options=self.storage_options,
            )

    def _get_table_path(self, table_name: str) -> str:
        """Builds the full storage path for `table_name` within this database."""
        return self.storage_configuration.get_full_path(
            self.data_lake_layer,
            f"{self.base_path}/{self.database_definition.name}/{table_name}",
        )
81 changes: 81 additions & 0 deletions src/corvus_python/repositories/polars_azure_table_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import logging
from typing import Dict
import polars as pl
from azure.data.tables import TableServiceClient, EntityProperty
from azure.identity import DefaultAzureCredential


class PolarsAzureTableRepository:
    """
    Repository for interacting with Azure Table Storage using Polars DataFrames.
    """

    def __init__(self, storage_account_name: str):
        """
        Initializes the repository with the given storage account name.

        Authenticates with DefaultAzureCredential against the account's
        table endpoint.

        Args:
            storage_account_name (str): The name of the Azure storage account.
        """
        self.logger: logging.Logger = logging.getLogger(__name__)
        self.table_service_client = TableServiceClient(
            endpoint=f"https://{storage_account_name}.table.core.windows.net",
            credential=DefaultAzureCredential(),
        )

    def query(
        self,
        table_name: str,
        query_filter: str,
        parameters: Dict[str, str],
        schema: dict[str, pl.DataType] | None = None,
    ) -> pl.DataFrame:
        """
        Queries data from the specified Azure Table and loads it into a Polars DataFrame.

        Args:
            table_name (str): The name of the Azure Table to load data from.
            query_filter (str): The query to filter the data.
            parameters (Dict[str, str]): Parameters for the query filter.
            schema (dict[str, pl.DataType] | None): Optional schema for the resulting DataFrame.

        Returns:
            pl.DataFrame: The data loaded from the Azure Table as a Polars DataFrame.
            Empty (with `schema` applied, if given) when no entities match.
        """
        self.logger.info("query_table - Table name: %s - Query: %s", table_name, query_filter)

        table_client = self.table_service_client.get_table_client(table_name)
        entities = list(table_client.query_entities(query_filter, parameters=parameters))

        if not entities:
            self.logger.warning("query_table - No data found in table: %s", table_name)
            return pl.DataFrame(schema=schema)

        # Some types have their values wrapped in an EntityProperty (GUID, INT64, BINARY);
        # unwrap them so Polars sees plain Python values.
        for entity in entities:
            for key, value in list(entity.items()):
                if isinstance(value, EntityProperty):
                    entity[key] = value.value

        df = pl.DataFrame(entities, schema=schema)
        self.logger.info("query_table - Loaded %d records from table: %s", df.height, table_name)
        return df

    def get_entities_partition_key_starts_with(
        self,
        table_name: str,
        partition_key_prefix: str,
        schema: dict[str, pl.DataType] | None = None,
    ) -> pl.DataFrame:
        """
        Retrieves entities from the specified Azure Table where the PartitionKey starts with the given prefix.

        Args:
            table_name (str): The name of the Azure Table to query.
            partition_key_prefix (str): The prefix to filter PartitionKeys. Must be non-empty.
            schema (dict[str, pl.DataType] | None): Optional schema for the resulting DataFrame.

        Returns:
            pl.DataFrame: The data loaded from the Azure Table as a Polars DataFrame.

        Raises:
            ValueError: If `partition_key_prefix` is empty.
        """
        if not partition_key_prefix:
            raise ValueError("partition_key_prefix must be a non-empty string")

        # Azure Tables has no "starts with" operator, so emulate it with a
        # half-open range [prefix, next_prefix), where next_prefix is the
        # prefix with its final character incremented by one code point.
        query_filter = "PartitionKey ge @prefix and PartitionKey lt @next_prefix"
        next_prefix = partition_key_prefix[:-1] + chr(ord(partition_key_prefix[-1]) + 1)

        parameters = {
            "prefix": partition_key_prefix,
            "next_prefix": next_prefix,
        }

        return self.query(table_name, query_filter, parameters, schema=schema)
Loading
Loading