Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions examples/airflow-migration/basic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from pathlib import Path

import flyteplugins.airflow.task # noqa: F401 — triggers DAG + operator monkey-patches
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

import flyte


def hello_python():
print("Hello from PythonOperator!")


# Standard Airflow DAG definition — no Flyte-specific changes needed inside the block.
# Pass flyte_env so the generated workflow task uses the right container image.
with DAG(
dag_id="simple_airflow_workflow",
) as dag:
t1 = BashOperator(
task_id="say_hello",
bash_command='echo "Hello Airflow!"',
)
t2 = BashOperator(
task_id="say_goodbye",
bash_command='echo "Goodbye Airflow!"',
)
t3 = PythonOperator(
task_id="hello_python",
python_callable=hello_python,
)
t1 >> t2 # t2 runs after t1


if __name__ == "__main__":
flyte.init_from_config(root_dir=Path(__file__).parent.parent.parent)
run = flyte.with_runcontext(mode="remote", log_level="10").run(dag)
print(run.url)
2 changes: 1 addition & 1 deletion examples/connectors/bigquery_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
query_template="SELECT * from dataset.flyte_table3;",
)

flyte.TaskEnvironment.from_task("bigquery_env", bigquery_task)
bigquery_env = flyte.TaskEnvironment.from_task("bigquery_env", bigquery_task)


if __name__ == "__main__":
Expand Down
41 changes: 41 additions & 0 deletions plugins/airflow/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Flyte Airflow Plugin

Run existing Airflow DAGs on Flyte with minimal code changes. The plugin
monkey-patches `airflow.DAG` and `BaseOperator` so that standard Airflow
definitions are transparently converted into Flyte tasks.

## Features

- Write a normal `with DAG(...) as dag:` block — the plugin intercepts
operator construction and wires everything into a Flyte workflow.
- Supports `BashOperator` (`AirflowShellTask`) and `PythonOperator`
(`AirflowPythonFunctionTask`).
- Dependency arrows (`>>`, `<<`) are preserved as execution order.
- Runs locally or remotely — no Airflow cluster required.

## Installation

```bash
pip install flyteplugins-airflow
```

## Quick start

```python
import flyteplugins.airflow.task # triggers DAG + operator monkey-patches
from airflow import DAG
from airflow.operators.bash import BashOperator
import flyte

with DAG(dag_id="my_dag") as dag:
t1 = BashOperator(task_id="step1", bash_command="echo step1")
t2 = BashOperator(task_id="step2", bash_command="echo step2")
t1 >> t2

if __name__ == "__main__":
flyte.init_from_config()
run = flyte.with_runcontext(mode="remote").run(dag)
print(run.url)
```

See `examples/airflow-migration/` for a full example including `PythonOperator`.
79 changes: 79 additions & 0 deletions plugins/airflow/pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
[project]
name = "flyteplugins-airflow"
dynamic = ["version"]
description = "Airflow plugin for flyte"
readme = "README.md"
authors = [{ name = "Kevin Su", email = "pingsutw@users.noreply.github.com" }]
requires-python = ">=3.10,<3.13"
dependencies = [
"apache-airflow",
"flyte",
"jsonpickle"
]

[dependency-groups]
dev = [
"pytest>=8.3.5",
"pytest-asyncio>=0.26.0",
]

[build-system]
requires = ["setuptools", "setuptools_scm"]
build-backend = "setuptools.build_meta"

[tool.setuptools]
include-package-data = true
license-files = ["licenses/*.txt", "LICENSE"]

[tool.setuptools.packages.find]
where = ["src"]
include = ["flyteplugins*"]

[tool.setuptools_scm]
root = "../../"

[tool.pytest.ini_options]
norecursedirs = []
log_cli = true
log_cli_level = 20
markers = []
asyncio_default_fixture_loop_scope = "function"

[tool.coverage.run]
branch = true

[tool.ruff]
line-length = 120

[tool.ruff.lint]
select = [
"E",
"W",
"F",
"I",
"PLW",
"YTT",
"ASYNC",
"C4",
"T10",
"EXE",
"ISC",
"LOG",
"PIE",
"Q",
"RSE",
"FLY",
"PGH",
"PLC",
"PLE",
"PLW",
"FURB",
"RUF",
]
ignore = ["PGH003", "PLC0415"]

[tool.ruff.lint.per-file-ignores]
"examples/*" = ["E402"]

[tool.uv.sources]
flyte = { path = "../../", editable = true }
1 change: 1 addition & 0 deletions plugins/airflow/src/flyteplugins/airflow/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Loading
Loading