Deferred TI's next_method and next_kwargs not cleared on retries in Airflow 3.x #62342
Unanswered · ronak-sirwani asked this question in Q&A
I have been exploring the new Task SDK and Deferrable Operator lifecycle in Airflow 3.0.6, and I've observed a behavior regarding task state during retries that I wanted to raise for discussion. It appears that next_method and next_kwargs persist across attempts, which differs from the established behavior in Airflow 2.x.
Observation:
When a Deferrable Operator fails during its resumption callback and enters an Up For Retry state, the subsequent attempt (Try 2) appears to incorrectly persist the next_method and next_kwargs from the previous failed attempt (Try 1).
Instead of starting fresh with the standard execute() entry point, the Worker immediately invokes the resumption callback using the stale metadata. This causes the task to bypass the initial setup logic in execute(), leading to "zombie resumptions" where the second attempt tries to process data intended for the first.
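To make the failure mode concrete, here is a hypothetical sketch of the worker-side dispatch, in plain Python with stand-in names (DummyOperator, run_task_instance, and the dict-based TI are all illustrative, not actual Task SDK internals). It models why a lingering next_method makes a retry bypass execute() entirely:

```python
# Hypothetical sketch, NOT actual Task SDK code: models why leftover
# deferral metadata on a TaskInstance makes a retry skip execute().

class DummyOperator:
    def execute(self):
        return "fresh execute()"

    def execute_complete(self, event=None):
        return f"resumed with event={event!r}"

def run_task_instance(ti: dict, operator) -> str:
    """Simplified worker dispatch: if deferral metadata survives on the
    TI, the worker resumes the stored callback instead of execute()."""
    if ti.get("next_method"):
        method = getattr(operator, ti["next_method"])
        return method(**(ti.get("next_kwargs") or {}))
    return operator.execute()

op = DummyOperator()

# Try 2, but the metadata from the failed Try 1 was never cleared:
stale_ti = {"try_number": 2,
            "next_method": "execute_complete",
            "next_kwargs": {"event": "payload-from-try-1"}}
print(run_task_instance(stale_ti, op))   # resumes with the stale event

# Expected behavior: resetting for retry nullifies the metadata first,
# so the next attempt re-enters through execute():
clean_ti = {**stale_ti, "next_method": None, "next_kwargs": None}
print(run_task_instance(clean_ti, op))   # "fresh execute()"
```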
This mirrors the old Airflow 2.x issue "next_method and next_kwargs not cleared on retries" (#18146). That fix ensured that when a task was reset for a retry, the next_method and next_kwargs columns in the task_instance table were explicitly nullified.
How to reproduce:
My Current Workaround:
To mitigate this in my custom operators, I’ve had to implement a "Single-Entry" pattern to force idempotency:
- Always setting method_name="execute" in the self.defer() call.
- Passing a unique attempt_id (from ti.try_number) into the Trigger.
- Manually validating the attempt_id inside execute() to detect whether the event is fresh or stale.
While this works, it adds significant boilerplate and complexity to every custom operator.
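The steps above can be sketched as follows, using plain Python stand-ins rather than the real Airflow API (attempt_id is a hypothetical trigger-payload field, and a real operator would subclass BaseOperator and call self.defer(trigger=..., method_name="execute") instead of returning tuples):

```python
# Illustrative sketch of the "Single-Entry" idempotency pattern, NOT
# real Airflow API: every attempt re-enters through execute(), and the
# trigger payload carries the attempt that produced it.

class SingleEntryOperator:
    """Re-enters only via execute(); stale trigger events are detected
    by comparing the event's attempt_id against the current try number."""

    def __init__(self):
        self.try_number = 1  # stands in for ti.try_number

    def execute(self, event=None):
        if event is None:
            # Fresh start: run setup, then defer, tagging the trigger
            # payload with the current attempt for later validation.
            return ("deferred", {"attempt_id": self.try_number})
        if event.get("attempt_id") != self.try_number:
            # Zombie event from a previous attempt: discard, start over.
            return ("stale", None)
        # Matching attempt: safe to process the resumption event.
        return ("resumed", event)

op = SingleEntryOperator()
print(op.execute())                    # fresh run defers with attempt_id=1

op.try_number = 2                      # a retry has started
print(op.execute({"attempt_id": 1}))   # stale event from try 1 is rejected
print(op.execute({"attempt_id": 2}))   # matching event resumes normally
```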
I understand the TaskInstance lifecycle has been significantly refactored for Airflow 3.0. I wanted to ask: is this persistence of next_method and next_kwargs across retries intended in 3.x, or is it a regression of the #18146 fix?