From 5b1a1eafca7f6c80af15ff2cb1512115455d1631 Mon Sep 17 00:00:00 2001
From: HsiuChuanHsu
Date: Tue, 2 Sep 2025 06:44:40 +0800
Subject: [PATCH 1/4] fix(k8s): Preserve task history during API rate limiting

- Handle 429 errors in KubernetesExecutor task publishing retry logic
- Detect orphaned tasks and record TaskInstanceHistory in failure handler
- Add detailed logging for rate limiting scenarios
---
 .../src/airflow/models/taskinstance.py | 22 +++++++++++++++++++
 .../executors/kubernetes_executor.py   | 14 +++++++++++-
 2 files changed, 35 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 65c24ec687b65..d60c45969251b 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1547,6 +1547,28 @@ def fetch_handle_failure_context(
                 # about to retry so we record the task instance history. For other states, the task
                 # instance was cleared and already recorded in the task instance history.
                 ti.prepare_db_for_next_try(session)
+            elif ti.state is None and ti.start_date is not None and ti.end_date is None:
+                # If the task instance state is None but has a start_date without end_date,
+                # it likely means the task was running but became orphaned and its state was reset.
+                # This can happen during scheduler restarts when executors fail to adopt running tasks
+                # (e.g., due to Kubernetes API 429 errors). We should still record the task instance
+                # history to maintain complete log history for troubleshooting.
+                from airflow.models.taskinstancehistory import TaskInstanceHistory
+
+                log.info(
+                    "Recording task instance history for orphaned task %s that was previously running "
+                    "(start_date: %s, state reset to None)",
+                    ti.key,
+                    ti.start_date,
+                )
+                # Temporarily set state to RUNNING to trigger proper history recording
+                original_state = ti.state
+                ti.state = TaskInstanceState.RUNNING
+                try:
+                    TaskInstanceHistory.record_ti(ti, session=session)
+                finally:
+                    # Restore the original state
+                    ti.state = original_state
 
             ti.state = State.UP_FOR_RETRY
 
diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
index 130d37b546c3e..4e169ab5d890a 100644
--- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
+++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py
@@ -380,11 +380,12 @@ def sync(self) -> None:
                     body = {"message": e.body}
 
                 retries = self.task_publish_retries[key]
-                # In case of exceeded quota or conflict errors, requeue the task as per the task_publish_max_retries
+                # In case of exceeded quota, conflict errors, or rate limiting, requeue the task as per the task_publish_max_retries
                 message = body.get("message", "")
                 if (
                     (str(e.status) == "403" and "exceeded quota" in message)
                     or (str(e.status) == "409" and "object has been modified" in message)
+                    or str(e.status) == "429"  # Add support for rate limiting errors
                 ) and (self.task_publish_max_retries == -1 or retries < self.task_publish_max_retries):
                     self.log.warning(
                         "[Try %s of %s] Kube ApiException for Task: (%s). Reason: %r. Message: %s",
@@ -682,6 +683,17 @@ def adopt_launched_task(
             )
         except ApiException as e:
             self.log.info("Failed to adopt pod %s. Reason: %s", pod.metadata.name, e)
+
+            # Log detailed information for rate limiting errors (429) which can cause task history loss
+            if str(e.status) == "429":
+                self.log.warning(
+                    "Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. "
+                    "This may cause task history loss if the task was previously running. "
+                    "Consider implementing rate limiting backoff or increasing API quota.",
+                    pod.metadata.name,
+                    ti_key,
+                )
+
             return
 
         del tis_to_flush_by_key[ti_key]
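The warning above stops at suggesting "rate limiting backoff" without prescribing one. A minimal sketch of what such a wrapper could look like; the helper name call_with_backoff and the delay parameters are illustrative assumptions, not existing Airflow or kubernetes-client API:

    import time

    from kubernetes.client.rest import ApiException


    def call_with_backoff(fn, *args, max_attempts=5, base_delay=1.0, **kwargs):
        """Retry a Kubernetes API call on HTTP 429 with exponential backoff."""
        for attempt in range(max_attempts):
            try:
                return fn(*args, **kwargs)
            except ApiException as e:
                # Retry only rate-limiting responses; re-raise everything else,
                # and give up once the attempt budget is exhausted.
                if str(e.status) != "429" or attempt == max_attempts - 1:
                    raise
                time.sleep(base_delay * (2**attempt))

A caller would wrap individual API calls, for example call_with_backoff(kube_client.patch_namespaced_pod, name, namespace, body).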
Reason: %s", pod.metadata.name, e) + + # Log detailed information for rate limiting errors (429) which can cause task history loss + if str(e.status) == "429": + self.log.warning( + "Kubernetes API rate limiting (429) prevented adoption of pod %s for task %s. " + "This may cause task history loss if the task was previously running. " + "Consider implementing rate limiting backoff or increasing API quota.", + pod.metadata.name, + ti_key, + ) + return del tis_to_flush_by_key[ti_key] From d86f8b88337ee82330eca00ebf30fa55dc79fa8d Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Tue, 2 Sep 2025 21:06:02 +0800 Subject: [PATCH 2/4] fix(k8s): Update tests to reflect new 429 error retry behavior --- .../executors/test_kubernetes_executor.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py index 7fb6f790d8792..cf0f7e5130c3d 100644 --- a/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py +++ b/providers/cncf/kubernetes/tests/unit/cncf/kubernetes/executors/test_kubernetes_executor.py @@ -396,9 +396,9 @@ def setup_method(self) -> None: pytest.param( HTTPResponse(body="Too many requests, please try again later.", status=429), 1, - False, - State.FAILED, - id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1)", + True, + State.SUCCESS, + id="429 Too Many Requests (non-JSON body) (task_publish_max_retries=1) (retry succeeded)", ), pytest.param( HTTPResponse(body="", status=429), @@ -407,6 +407,13 @@ def setup_method(self) -> None: State.FAILED, id="429 Too Many Requests (empty body)", ), + pytest.param( + HTTPResponse(body="", status=429), + 1, + True, + State.SUCCESS, + id="429 Too Many Requests (empty body) (task_publish_max_retries=1) (retry succeeded)", + ), ], ) @mock.patch("airflow.providers.cncf.kubernetes.executors.kubernetes_executor_utils.KubernetesJobWatcher") From 126133e55ae6ac64e8e43cca458e7e43a571721d Mon Sep 17 00:00:00 2001 From: HsiuChuanHsu Date: Tue, 2 Sep 2025 22:41:15 +0800 Subject: [PATCH 3/4] fix: Record history for orphaned tasks during K8s executor failures Move orphaned task detection before end_date assignment to ensure TaskInstanceHistory is recorded for tasks that become detached during scheduler restarts due to Kubernetes API 429 errors. --- .../src/airflow/models/taskinstance.py | 51 +++++----- .../tests/unit/models/test_taskinstance.py | 92 +++++++++++++++++++ 2 files changed, 121 insertions(+), 22 deletions(-) diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py index d60c45969251b..09235fe9faf0a 100644 --- a/airflow-core/src/airflow/models/taskinstance.py +++ b/airflow-core/src/airflow/models/taskinstance.py @@ -1512,6 +1512,35 @@ def fetch_handle_failure_context( if not test_mode: ti.refresh_from_db(session) + # Check for orphaned task before setting end_date + if ( + ti.is_eligible_to_retry() + and ti.state is None + and ti.start_date is not None + and ti.end_date is None + ): + # If the task instance state is None but has a start_date without end_date, + # it likely means the task was running but became orphaned and its state was reset. + # This can happen during scheduler restarts when executors fail to adopt running tasks + # (e.g., due to Kubernetes API 429 errors). 
From 126133e55ae6ac64e8e43cca458e7e43a571721d Mon Sep 17 00:00:00 2001
From: HsiuChuanHsu
Date: Tue, 2 Sep 2025 22:41:15 +0800
Subject: [PATCH 3/4] fix: Record history for orphaned tasks during K8s executor failures

Move orphaned task detection before end_date assignment to ensure
TaskInstanceHistory is recorded for tasks that become detached during
scheduler restarts due to Kubernetes API 429 errors.
---
 .../src/airflow/models/taskinstance.py     | 51 +++++-----
 .../tests/unit/models/test_taskinstance.py | 92 +++++++++++++++++++
 2 files changed, 121 insertions(+), 22 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index d60c45969251b..09235fe9faf0a 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1512,6 +1512,35 @@ def fetch_handle_failure_context(
         if not test_mode:
             ti.refresh_from_db(session)
 
+        # Check for orphaned task before setting end_date
+        if (
+            ti.is_eligible_to_retry()
+            and ti.state is None
+            and ti.start_date is not None
+            and ti.end_date is None
+        ):
+            # If the task instance state is None but has a start_date without end_date,
+            # it likely means the task was running but became orphaned and its state was reset.
+            # This can happen during scheduler restarts when executors fail to adopt running tasks
+            # (e.g., due to Kubernetes API 429 errors). We should still record the task instance
+            # history to maintain complete log history for troubleshooting.
+            from airflow.models.taskinstancehistory import TaskInstanceHistory
+
+            log.info(
+                "Recording task instance history for orphaned task %s that was previously running "
+                "(start_date: %s, state reset to None)",
+                ti.key,
+                ti.start_date,
+            )
+            # Temporarily set state to RUNNING to trigger proper history recording
+            original_state = ti.state
+            ti.state = TaskInstanceState.RUNNING
+            try:
+                TaskInstanceHistory.record_ti(ti, session=session)
+            finally:
+                # Restore the original state
+                ti.state = original_state
+
         ti.end_date = timezone.utcnow()
         ti.set_duration()
 
@@ -1547,28 +1576,6 @@ def fetch_handle_failure_context(
                 # about to retry so we record the task instance history. For other states, the task
                 # instance was cleared and already recorded in the task instance history.
                 ti.prepare_db_for_next_try(session)
-            elif ti.state is None and ti.start_date is not None and ti.end_date is None:
-                # If the task instance state is None but has a start_date without end_date,
-                # it likely means the task was running but became orphaned and its state was reset.
-                # This can happen during scheduler restarts when executors fail to adopt running tasks
-                # (e.g., due to Kubernetes API 429 errors). We should still record the task instance
-                # history to maintain complete log history for troubleshooting.
-                from airflow.models.taskinstancehistory import TaskInstanceHistory
-
-                log.info(
-                    "Recording task instance history for orphaned task %s that was previously running "
-                    "(start_date: %s, state reset to None)",
-                    ti.key,
-                    ti.start_date,
-                )
-                # Temporarily set state to RUNNING to trigger proper history recording
-                original_state = ti.state
-                ti.state = TaskInstanceState.RUNNING
-                try:
-                    TaskInstanceHistory.record_ti(ti, session=session)
-                finally:
-                    # Restore the original state
-                    ti.state = original_state
 
             ti.state = State.UP_FOR_RETRY
 
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index fccac898d315b..72e00215810c9 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2424,6 +2424,98 @@ def test_handle_failure_task_undefined(self, create_task_instance):
         del ti.task
         ti.handle_failure("test ti.task undefined")
 
+    @patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
+    def test_fetch_handle_failure_context_orphaned_task_records_history(
+        self, mock_record_ti, dag_maker, session
+    ):
+        """
+        Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded.
+        This scenario occurs when tasks are running but become orphaned due to executor failures
+        (e.g., Kubernetes API 429 errors causing scheduler restarts and pod adoption failures).
+        """
+        with dag_maker(dag_id="test_orphaned_task"):
+            task = EmptyOperator(task_id="orphaned_task", retries=2)  # Allow 2 retries
+
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instance(task.task_id, session=session)
+        ti.task = task
+
+        # Simulate an orphaned task: state=None but has start_date (was running) and no end_date
+        start_time = timezone.utcnow() - datetime.timedelta(minutes=5)
+        ti.state = None  # State was reset during scheduler restart
+        ti.start_date = start_time  # Task had started previously
+        ti.end_date = None  # Task was still running when it became orphaned
+        ti.try_number = 1  # First attempt
+        ti.max_tries = 3  # 1 original + 2 retries = 3 total attempts
+
+        session.merge(ti)
+        session.commit()
+
+        # Call fetch_handle_failure_context which should detect and handle orphaned tasks
+        failure_context = TaskInstance.fetch_handle_failure_context(
+            ti=ti,
+            error="Test orphaned task error",
+            test_mode=False,
+            session=session,
+            fail_fast=False,
+        )
+
+        # Verify that TaskInstanceHistory.record_ti was called for the orphaned task
+        mock_record_ti.assert_called_once()
+        call_args = mock_record_ti.call_args
+        recorded_ti = call_args[0][0]  # First positional argument (ti)
+
+        # Verify the correct TaskInstance was recorded
+        assert recorded_ti.task_id == ti.task_id
+        assert recorded_ti.dag_id == ti.dag_id
+        assert recorded_ti.run_id == ti.run_id
+        assert recorded_ti.start_date == start_time
+
+        # Verify the task instance state is set to UP_FOR_RETRY after failure handling
+        assert ti.state == State.UP_FOR_RETRY
+        assert failure_context["ti"] == ti
+
+    @patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
+    def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history(
+        self, mock_record_ti, dag_maker, session
+    ):
+        """
+        Test that tasks with state=None but no start_date do NOT trigger orphaned task history recording.
+        This ensures we only record history for tasks that were actually running.
+        """
+        with dag_maker(dag_id="test_not_orphaned_task"):
+            task = EmptyOperator(task_id="not_orphaned_task", retries=2)
+
+        dr = dag_maker.create_dagrun()
+        ti = dr.get_task_instance(task.task_id, session=session)
+        ti.task = task
+
+        # Simulate a task that was never started: state=None and no start_date
+        ti.state = None
+        ti.start_date = None  # Task never started
+        ti.end_date = None
+        ti.try_number = 1
+        ti.max_tries = 3  # Allow retries
+
+        session.merge(ti)
+        session.commit()
+
+        # Call fetch_handle_failure_context
+        failure_context = TaskInstance.fetch_handle_failure_context(
+            ti=ti,
+            error="Test non-orphaned task error",
+            test_mode=False,
+            session=session,
+            fail_fast=False,
+        )
+
+        # Verify that TaskInstanceHistory.record_ti was NOT called
+        mock_record_ti.assert_not_called()
+
+        # Verify the task instance state is set to UP_FOR_RETRY after failure handling
+        assert ti.state == State.UP_FOR_RETRY
+        assert failure_context["ti"] == ti
+
     def test_handle_failure_fail_fast(self, dag_maker, session):
         start_date = timezone.datetime(2016, 6, 1)
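The two tests above pin down the orphan condition from both sides. Reduced to a standalone sketch, it is the three-field check below, which the patch additionally guards with ti.is_eligible_to_retry(); the helper name is_orphaned is hypothetical, not a function in the Airflow codebase:

    def is_orphaned(ti) -> bool:
        # Orphaned: state was reset to None while the task had already started
        # (start_date is set) but never finished (end_date is still None).
        return ti.state is None and ti.start_date is not None and ti.end_date is None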
From 38ac7d13a5fb459ad50127b1b231c658119f0cf8 Mon Sep 17 00:00:00 2001
From: HsiuChuanHsu
Date: Thu, 23 Oct 2025 09:03:48 +0800
Subject: [PATCH 4/4] Split original PR into cncf & taskinstance parts

- Remove taskinstance part
---
 .../src/airflow/models/taskinstance.py     | 29 ------
 .../tests/unit/models/test_taskinstance.py | 92 ------------------
 2 files changed, 121 deletions(-)

diff --git a/airflow-core/src/airflow/models/taskinstance.py b/airflow-core/src/airflow/models/taskinstance.py
index 09235fe9faf0a..65c24ec687b65 100644
--- a/airflow-core/src/airflow/models/taskinstance.py
+++ b/airflow-core/src/airflow/models/taskinstance.py
@@ -1512,35 +1512,6 @@ def fetch_handle_failure_context(
         if not test_mode:
             ti.refresh_from_db(session)
 
-        # Check for orphaned task before setting end_date
-        if (
-            ti.is_eligible_to_retry()
-            and ti.state is None
-            and ti.start_date is not None
-            and ti.end_date is None
-        ):
-            # If the task instance state is None but has a start_date without end_date,
-            # it likely means the task was running but became orphaned and its state was reset.
-            # This can happen during scheduler restarts when executors fail to adopt running tasks
-            # (e.g., due to Kubernetes API 429 errors). We should still record the task instance
-            # history to maintain complete log history for troubleshooting.
-            from airflow.models.taskinstancehistory import TaskInstanceHistory
-
-            log.info(
-                "Recording task instance history for orphaned task %s that was previously running "
-                "(start_date: %s, state reset to None)",
-                ti.key,
-                ti.start_date,
-            )
-            # Temporarily set state to RUNNING to trigger proper history recording
-            original_state = ti.state
-            ti.state = TaskInstanceState.RUNNING
-            try:
-                TaskInstanceHistory.record_ti(ti, session=session)
-            finally:
-                # Restore the original state
-                ti.state = original_state
-
         ti.end_date = timezone.utcnow()
         ti.set_duration()
 
diff --git a/airflow-core/tests/unit/models/test_taskinstance.py b/airflow-core/tests/unit/models/test_taskinstance.py
index 72e00215810c9..fccac898d315b 100644
--- a/airflow-core/tests/unit/models/test_taskinstance.py
+++ b/airflow-core/tests/unit/models/test_taskinstance.py
@@ -2424,98 +2424,6 @@ def test_handle_failure_task_undefined(self, create_task_instance):
         del ti.task
         ti.handle_failure("test ti.task undefined")
 
-    @patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
-    def test_fetch_handle_failure_context_orphaned_task_records_history(
-        self, mock_record_ti, dag_maker, session
-    ):
-        """
-        Test that orphaned tasks (state=None, start_date!=None, end_date=None) get their history recorded.
-        This scenario occurs when tasks are running but become orphaned due to executor failures
-        (e.g., Kubernetes API 429 errors causing scheduler restarts and pod adoption failures).
-        """
-        with dag_maker(dag_id="test_orphaned_task"):
-            task = EmptyOperator(task_id="orphaned_task", retries=2)  # Allow 2 retries
-
-        dr = dag_maker.create_dagrun()
-        ti = dr.get_task_instance(task.task_id, session=session)
-        ti.task = task
-
-        # Simulate an orphaned task: state=None but has start_date (was running) and no end_date
-        start_time = timezone.utcnow() - datetime.timedelta(minutes=5)
-        ti.state = None  # State was reset during scheduler restart
-        ti.start_date = start_time  # Task had started previously
-        ti.end_date = None  # Task was still running when it became orphaned
-        ti.try_number = 1  # First attempt
-        ti.max_tries = 3  # 1 original + 2 retries = 3 total attempts
-
-        session.merge(ti)
-        session.commit()
-
-        # Call fetch_handle_failure_context which should detect and handle orphaned tasks
-        failure_context = TaskInstance.fetch_handle_failure_context(
-            ti=ti,
-            error="Test orphaned task error",
-            test_mode=False,
-            session=session,
-            fail_fast=False,
-        )
-
-        # Verify that TaskInstanceHistory.record_ti was called for the orphaned task
-        mock_record_ti.assert_called_once()
-        call_args = mock_record_ti.call_args
-        recorded_ti = call_args[0][0]  # First positional argument (ti)
-
-        # Verify the correct TaskInstance was recorded
-        assert recorded_ti.task_id == ti.task_id
-        assert recorded_ti.dag_id == ti.dag_id
-        assert recorded_ti.run_id == ti.run_id
-        assert recorded_ti.start_date == start_time
-
-        # Verify the task instance state is set to UP_FOR_RETRY after failure handling
-        assert ti.state == State.UP_FOR_RETRY
-        assert failure_context["ti"] == ti
-
-    @patch("airflow.models.taskinstancehistory.TaskInstanceHistory.record_ti")
-    def test_fetch_handle_failure_context_orphaned_task_without_start_date_no_history(
-        self, mock_record_ti, dag_maker, session
-    ):
-        """
-        Test that tasks with state=None but no start_date do NOT trigger orphaned task history recording.
-        This ensures we only record history for tasks that were actually running.
-        """
-        with dag_maker(dag_id="test_not_orphaned_task"):
-            task = EmptyOperator(task_id="not_orphaned_task", retries=2)
-
-        dr = dag_maker.create_dagrun()
-        ti = dr.get_task_instance(task.task_id, session=session)
-        ti.task = task
-
-        # Simulate a task that was never started: state=None and no start_date
-        ti.state = None
-        ti.start_date = None  # Task never started
-        ti.end_date = None
-        ti.try_number = 1
-        ti.max_tries = 3  # Allow retries
-
-        session.merge(ti)
-        session.commit()
-
-        # Call fetch_handle_failure_context
-        failure_context = TaskInstance.fetch_handle_failure_context(
-            ti=ti,
-            error="Test non-orphaned task error",
-            test_mode=False,
-            session=session,
-            fail_fast=False,
-        )
-
-        # Verify that TaskInstanceHistory.record_ti was NOT called
-        mock_record_ti.assert_not_called()
-
-        # Verify the task instance state is set to UP_FOR_RETRY after failure handling
-        assert ti.state == State.UP_FOR_RETRY
-        assert failure_context["ti"] == ti
-
     def test_handle_failure_fail_fast(self, dag_maker, session):
         start_date = timezone.datetime(2016, 6, 1)