Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions examples/notification/slack_webhook.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
"""Slack + Email notification example.

Sends a rich Slack notification and an email when a task succeeds or fails.

Usage:
export SLACK_WEBHOOK_URL="https://hooks.slack.com/services/YOUR/WEBHOOK/URL"
export NOTIFICATION_EMAIL="oncall@example.com"
python examples/notification/slack_webhook.py

To receive emails locally, start a debug SMTP server before running:

# Python < 3.12
sudo python -m smtpd -n -c DebuggingServer localhost:25

# Python >= 3.12 (smtpd was removed; use aiosmtpd)
pip install aiosmtpd
sudo python -m aiosmtpd -n -l localhost:25

Both print received emails to stdout. Port 25 requires root (sudo);
alternatively use port 1025, but update the SMTP port in _sender.py.
"""

import os

import flyte
from flyte import notify
from flyte.models import ActionPhase

env = flyte.TaskEnvironment(name="notify_example")

SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]
NOTIFICATION_EMAIL = os.environ["NOTIFICATION_EMAIL"]


@env.task
def compute(x: int, y: int) -> int:
return x + y


slack_success = notify.Slack(
on_phase=ActionPhase.SUCCEEDED,
webhook_url=SLACK_WEBHOOK_URL,
blocks=[
{
"type": "header",
"text": {"type": "plain_text", "text": "Task Succeeded"},
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": "*Task:*\n{task.name}"},
{"type": "mrkdwn", "text": "*Run:*\n{run.name}"},
{"type": "mrkdwn", "text": "*Duration:*\n{run.duration}"},
{"type": "mrkdwn", "text": "*Phase:*\n{run.phase}"},
],
},
{"type": "divider"},
{
"type": "context",
"elements": [
{"type": "mrkdwn", "text": "{project}/{domain}"},
],
},
],
)

slack_failure = notify.Slack(
on_phase=ActionPhase.FAILED,
webhook_url=SLACK_WEBHOOK_URL,
blocks=[
{
"type": "header",
"text": {"type": "plain_text", "text": "Task Failed"},
},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": "*Task:*\n{task.name}"},
{"type": "mrkdwn", "text": "*Error:*\n{run.error}"},
],
},
],
)

email_success = notify.Email(
on_phase=ActionPhase.SUCCEEDED,
recipients=[NOTIFICATION_EMAIL],
subject="Task {task.name} succeeded",
body=("Task: {task.name}\nRun: {run.name}\nDuration: {run.duration}\nPhase: {run.phase}\n"),
)

email_failure = notify.Email(
on_phase=ActionPhase.FAILED,
recipients=[NOTIFICATION_EMAIL],
subject="ALERT: Task {task.name} failed",
body=("Task: {task.name}\nRun: {run.name}\nError: {run.error}\n"),
)

if __name__ == "__main__":
result = flyte.with_runcontext(
mode="local",
notifications=(slack_success, slack_failure, email_success, email_failure),
).run(compute, x=3, y=7)
print(f"Result: {result}")
94 changes: 94 additions & 0 deletions examples/triggers/notifications.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from datetime import datetime

import flyte.notify
from flyte.models import ActionPhase

env = flyte.TaskEnvironment(
name="example_task",
)

trig1 = flyte.Trigger(
name="hourly",
auto_activate=True,
automation=flyte.Cron("* * * * *"),
notifications=flyte.notify.Slack(

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Is this saying that notifications will ONLY be sent for failed triggered runs?
    What if I want to configure notifications even for runs that are manually created(via Web or CLI). Lets say, its an important workflow on prod domain and everybody in the team must be aware about failures even If I'm the author of the run and I saw the failure.

  2. Are the notifications only attached to individual tasks?
    do we want to allow configuring notifications for entire domain(prod), project+domain(<important_project> on prod) or maybe project+domain+task_regex?
    What if I want to configure your notifications once, without repeating same configs on multiple task definitions?

As a follow up for 1 and 2, should notifications be a separate SDK entity in which you can configure multiple matching criteria independently? project, domain, task_regex, triggered vs manual, maybe others in future?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like it to be possible to notify at the organization, domain, and project-domain levels as this is a frequent request we got in v1 over the past few years. People want to be able to notify on any failure in the prod domain, for instance. I don't know that this makes sense to include in the SDK, though - I feel like this makes sense in the UI for sure, as well as the API/CLI (which can be used by a terraform provider).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 to Votta. With scoping for now to be triggered runs only

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given that we really only do email for now even for slack, should we just do email?

on_phase=ActionPhase.FAILED,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we just call this on?

webhook_url="https://webhook.site/",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we change the name maybe so that it doesn't cause any confusion with the Webhook notification? slack_endpoint or service_url or something?

message="Hello world! from {task.name}",
blocks=(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we drop this for now, until customers ask? just wary of supporting too many features out of the gate, and then we have to mark things deprecated if they ever change.

{"type": "header", "text": {"type": "plain_text", "text": "🚨 Run Failed"}},
{
"type": "section",
"fields": [
{"type": "mrkdwn", "text": "*Workflow:*\n{run.name}"},
{"type": "mrkdwn", "text": "*Error:*\n{run.error}"},
],
},
{
"type": "actions",
"elements": [
{
"type": "button",
"text": {"type": "plain_text", "text": "View Logs"},
"url": "https://logs.example.com/{run.name}",
}
],
},
),
),
)

trig2 = flyte.Trigger(
name="hourly",
auto_activate=True,
automation=flyte.Cron("* * * * *"),
notifications=(
flyte.notify.Slack(
on_phase=(ActionPhase.FAILED, ActionPhase.TIMED_OUT),
webhook_url="https://webhook.site/",
message="Hello world! from {task.name}",
),
flyte.notify.Webhook(
on_phase=ActionPhase.FAILED,
headers={"Content-Type": "application/json"},
Copy link

@iaroslav-ciupin iaroslav-ciupin Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. should webhooks URLs be accessible without any auth credentials? if not, what should be the DevUX for customer to provide credentials for his webhook securely to Union? and I guess Union notifications service(workers) should be able to read those secure credentials during the notification delivery.
  2. same question about Slack API. Not familiar with it, but do we need some token to hit the API? who and how will provide this token to us?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we will stick to unauthenticated webhooks (or more specifically, webhooks which embed the auth into the webhook itself)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

url="https://webhook.site/",
body={
"xyz": "{run.name}",
},
),
flyte.notify.Email(
on_phase=ActionPhase.SUCCEEDED,
subject="Hello world! from {task.name}",
body="Hello world! from {task.name}",
Copy link

@iaroslav-ciupin iaroslav-ciupin Dec 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just want to double check the DevUX here: if I want the same email template to be sent for all my failed tasks with some variables like <task_name>, <domain>, <project>, etc. , I can simply store this template in a Python variable, and then re-use it in all Email notification configs?
not sure what email provider are we going to use or if its critical right now, but, for example, AWS SES supports both sending raw emails with entire HTML body and sending emails with template id, that refers to previously uploaded template.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just render on client side. As this may change all the time

recipients=("<EMAIL>",),
),
),
)


# Use a pre-defined named rule (configured by your Flyte admin)
trig3 = flyte.Trigger(
name="hourly-with-named-rule",
auto_activate=True,
automation=flyte.Cron("0 * * * *"),
notifications=flyte.notify.NamedRule("oncall-alerts"),
)

# Mix named delivery configs with inline notifications
trig4 = flyte.Trigger(
name="hourly-with-named-delivery",
auto_activate=True,
automation=flyte.Cron("0 * * * *"),
notifications=(
flyte.notify.NamedDelivery(on_phase=ActionPhase.FAILED, name="slack-oncall"),
flyte.notify.Email(
on_phase=ActionPhase.SUCCEEDED,
recipients=("team@example.com",),
),
),
)


@env.task(triggers=(trig1, trig2)) # Every hour
def example_task(trigger_time: datetime, x: int = 1) -> str:
return f"Task executed at {trigger_time.isoformat()} with x={x}"
Loading
Loading