63 changes: 63 additions & 0 deletions plugins/pandera/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Flyte Pandera Plugin

This plugin integrates [Pandera](https://pandera.readthedocs.io/) with [Flyte](https://flyte.org/), enabling automatic runtime validation of pandas DataFrames against Pandera schemas as data flows between tasks.

## Installation

```bash
pip install flyteplugins-pandera
```

## Usage

Define a Pandera schema using `DataFrameModel` and use `pandera.typing.DataFrame` as your task's type annotation:

```python
import flyte
import pandas as pd
import pandera as pa
from pandera.typing import DataFrame, Series

env = flyte.TaskEnvironment(name="my-env")


class UserSchema(pa.DataFrameModel):
    name: Series[str]
    age: Series[int] = pa.Field(ge=0, le=120)
    email: Series[str]


@env.task
async def generate_users() -> DataFrame[UserSchema]:
    return pd.DataFrame({
        "name": ["Alice", "Bob"],
        "age": [25, 30],
        "email": ["alice@example.com", "bob@example.com"],
    })


@env.task
async def process_users(df: DataFrame[UserSchema]) -> DataFrame[UserSchema]:
    df["age"] = df["age"] + 1
    return df
```

DataFrames are automatically validated against the schema on both input and output. If validation fails, the plugin generates a detailed error report and, by default, raises an exception (see Configuration).

## Configuration

Control validation behavior using `ValidationConfig` with `typing.Annotated`:

```python
from typing import Annotated
from flyteplugins.pandera import ValidationConfig

@env.task
async def lenient_task(
    df: Annotated[DataFrame[UserSchema], ValidationConfig(on_error="warn")]
) -> DataFrame[UserSchema]:
    return df
```

- `on_error="raise"` (default): Raises an exception on validation failure
- `on_error="warn"`: Logs a warning and continues with the original data
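The two modes above can be pictured with a standard-library-only sketch. This is not the plugin's implementation: `find_config`, `validate_with_policy`, and `require_positive` are hypothetical names, the local `ValidationConfig` merely mirrors the plugin's dataclass, and a plain `ValueError` stands in for Pandera's schema errors. It shows one plausible way to read the config out of an `Annotated` type and apply the policy.

```python
import logging
from dataclasses import dataclass
from typing import Annotated, Literal, get_args, get_origin

logger = logging.getLogger("validation-sketch")


@dataclass
class ValidationConfig:
    # Local stand-in mirroring the plugin's dataclass.
    on_error: Literal["raise", "warn"] = "raise"


def find_config(annotation) -> ValidationConfig:
    """Return the ValidationConfig attached via Annotated, or the default one."""
    if get_origin(annotation) is Annotated:
        # get_args yields (base_type, *metadata); scan only the metadata.
        for meta in get_args(annotation)[1:]:
            if isinstance(meta, ValidationConfig):
                return meta
    return ValidationConfig()


def validate_with_policy(data, validate, config: ValidationConfig):
    """Run validate(data); on failure either re-raise or warn and return data unchanged."""
    try:
        return validate(data)
    except ValueError as exc:  # stand-in for a schema validation error
        if config.on_error == "raise":
            raise
        logger.warning("validation failed, continuing with original data: %s", exc)
        return data


def require_positive(values):
    """Toy validator: reject any negative number."""
    if any(v < 0 for v in values):
        raise ValueError("negative value found")
    return values


lenient = find_config(Annotated[list, ValidationConfig(on_error="warn")])
strict = find_config(list)  # no metadata, so the default "raise" applies

result = validate_with_policy([1, -2], require_positive, lenient)
print(result)
```

With `lenient`, the invalid list passes through after a logged warning; with `strict`, the same call raises the `ValueError`.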
73 changes: 73 additions & 0 deletions plugins/pandera/pyproject.toml
@@ -0,0 +1,73 @@
[project]
name = "flyteplugins-pandera"
dynamic = ["version"]
description = "Pandera data validation plugin for Flyte"
readme = "README.md"
authors = [{ name = "Flyte Contributors", email = "admin@flyte.org" }]
requires-python = ">=3.10"
dependencies = [
    "pandera",
    "flyte",
    "great_tables",
]

[project.entry-points."flyte.plugins.types"]
pandera = "flyteplugins.pandera.transformer:register_pandera_transformer"

[build-system]
requires = ["setuptools", "setuptools_scm"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
include-package-data = true
license-files = ["licenses/*.txt", "LICENSE"]

[tool.setuptools.packages.find]
where = ["src"]
include = ["flyteplugins*"]

[tool.setuptools_scm]
root = "../../"

[tool.pytest.ini_options]
norecursedirs = []
log_cli = true
log_cli_level = 20
markers = []
asyncio_default_fixture_loop_scope = "function"

[tool.coverage.run]
branch = true

[tool.ruff]
line-length = 120

[tool.ruff.lint]
select = [
    "E",
    "W",
    "F",
    "I",
    "PLW",
    "YTT",
    "ASYNC",
    "C4",
    "T10",
    "EXE",
    "ISC",
    "LOG",
    "PIE",
    "Q",
    "RSE",
    "FLY",
    "PGH",
    "PLC",
    "PLE",
    "PLW",
    "FURB",
    "RUF",
]
ignore = ["PGH003", "PLC0415", "ASYNC240"]

[tool.ruff.lint.per-file-ignores]
"examples/*" = ["E402"]
19 changes: 19 additions & 0 deletions plugins/pandera/src/flyteplugins/pandera/__init__.py
@@ -0,0 +1,19 @@
"""
Pandera data validation plugin for Flyte.

This plugin integrates Pandera's runtime data validation with Flyte's type system,
enabling automatic validation of pandas DataFrames against Pandera schemas when
data flows between tasks.

.. autosummary::
   :template: custom.rst
   :toctree: generated/

   PanderaTransformer
   PandasReportRenderer
   ValidationConfig
"""

from .config import ValidationConfig as ValidationConfig
from .renderer import PandasReportRenderer as PandasReportRenderer
from .transformer import PanderaTransformer as PanderaTransformer
17 changes: 17 additions & 0 deletions plugins/pandera/src/flyteplugins/pandera/config.py
@@ -0,0 +1,17 @@
"""Pandera validation configuration."""

from dataclasses import dataclass
from typing import Literal


@dataclass
class ValidationConfig:
    """Configuration for Pandera validation behavior.

    Attributes:
        on_error: Determines how validation errors are handled.
            "raise" will raise the SchemaError/SchemaErrors exception.
            "warn" will log a warning and continue with the original data.
    """

    on_error: Literal["raise", "warn"] = "raise"