Fix task-level audit logs missing success/running events in Airflow 3.1.x#61932
Fix task-level audit logs missing success/running events in Airflow 3.1.x#61932sjyangkevin wants to merge 7 commits intoapache:mainfrom
Conversation
amoghrajesh
left a comment
There was a problem hiding this comment.
Thanks for taking this on @sjyangkevin, some comments
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
Outdated
Show resolved
Hide resolved
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
Outdated
Show resolved
Hide resolved
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
Show resolved
Hide resolved
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
Outdated
Show resolved
Hide resolved
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
Outdated
Show resolved
Hide resolved
airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Hi @amoghrajesh , thanks for the feedback!
I wonder if I should create dedicated tests for those situations (e.g., on duplicate requests, conflict) instead of putting them into the existing tests. I feel those might be good to catch.
I will parameterize the payload and probably task instance param to reduce duplication in the state change test case.
Also, will check how to handle the e2e test.
Thanks!
eec8eee to
1d181d4
Compare
|
I've cleaned up the test cases, and merge them into a single parameterized one. I feel we don't actually need to set up the Still trying to understand the end-to-end testing failure as I am not very familiar with the context. At high-level understanding, it seems like the test expect the Thanks! |
02bf1a9 to
2fc81f4
Compare
7c74399 to
02f06df
Compare
There was a problem hiding this comment.
I updated the query to join with DagModel and DagRun to fetch logical_date and owner, and update with_for_update to lock only TI in the join query. Construct a JSON string only with host_name in extra, which is also present in earlier version of Airflow.
| assert ti.state == expected_state | ||
| assert ti.end_date == end_date | ||
|
|
||
| @pytest.mark.parametrize( |
There was a problem hiding this comment.
Consolidate all the test cases into a parameterized one.
| mock.Mock( | ||
| one=lambda: ( | ||
| "running", | ||
| 1, | ||
| 0, | ||
| "dag", | ||
| "task", | ||
| "run", | ||
| -1, | ||
| "localhost", | ||
| timezone.utcnow(), | ||
| "test_owner", | ||
| ) | ||
| ), # First call returns "queued" | ||
| mock.Mock( | ||
| one=lambda: ( | ||
| "running", | ||
| 1, | ||
| 0, | ||
| "dag", | ||
| "task", | ||
| "run", | ||
| -1, | ||
| "localhost", | ||
| timezone.utcnow(), | ||
| "test_owner", | ||
| ) | ||
| ), # Second call returns "queued" |
There was a problem hiding this comment.
because the TI query return more values, update the mock.
e453054 to
a77b2d1
Compare
get latest changes from main to remove sorting test
…it dag run cause timeout
a77b2d1 to
9d89ebe
Compare
|
LGTM, I would need more maintainers to confirm though |
closes: #58381
Issue
In Airflow 2, task instance state transition such as
RUNNING,SUCCESS,SKIPPED,FAILEDwere logged into thelogtable through theTaskInstancemodel methods. In Airflow 3, the task state update/management is moved to the execution API endpointsti_runandti_update_state. However, the endpoints were not wired up to createlogrecords, resulting in missing audit logs for task instance state transitions.Fix
Added
session.add(Log(...))calls to bothti_runandti_update_statewhen the task instance state is updated. The call will be executed in the same transaction as the state update in both endpoints.In
ti_run, the audit log is placed inside theelsebranch such that duplicated requests, or conflict request, due to network glitch, will not be logged.In
ti_update_state, theTIselect query is updated to fetchtask_id,run_id, andmap_index.Caveat
The following fields in the
logtable are missing. Require extra query/join to fetch the following information.logical_date(previouslyexecution_date)owner(the value isairflowin Airflow 2)owner_display_name(this field is also empty for task state transition)extra(full_commandwill not be available as it is not run/update through CLI;hostnameis not available inti_update_state)EmptyOperator or operators skipped by branch doesn't have audit log
DAG Test Samples
Edited on Feb 27, 2026
Gather data for
owner, andlogical_datefromDagModelandDagRun, and fillextrawithhost_name.Was generative AI tooling used to co-author this PR?
Generated-by: [Antigravity] following the guidelines
The tool is used to analyze the existing test cases to understand the expected behavior of both
ti_runandti_update_state. It is also used to generate new test cases.{pr_number}.significant.rstor{issue_number}.significant.rst, in airflow-core/newsfragments.