From faf7708552ffbda1a839811a28c0576d3e5e136b Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Sat, 14 Feb 2026 21:42:56 -0500 Subject: [PATCH 1/7] create audit log records in ti_run and ti_update_state --- .../execution_api/routes/task_instances.py | 26 ++- .../versions/head/test_task_instances.py | 192 +++++++++++++++++- 2 files changed, 215 insertions(+), 3 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index f22d7c125853d..d86a0189a268e 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -64,6 +64,7 @@ from airflow.models.asset import AssetActive from airflow.models.dag import DagModel from airflow.models.dagrun import DagRun as DR +from airflow.models.log import Log from airflow.models.taskinstance import TaskInstance as TI, _stop_remaining_tasks from airflow.models.taskreschedule import TaskReschedule from airflow.models.trigger import Trigger @@ -195,6 +196,16 @@ def ti_run( ) else: log.info("Task started", previous_state=previous_state, hostname=ti_run_payload.hostname) + session.add( + Log( + event=TaskInstanceState.RUNNING.value, + task_id=ti.task_id, + dag_id=ti.dag_id, + run_id=ti.run_id, + map_index=ti.map_index, + try_number=ti.try_number, + ) + ) # Ensure there is no end date set. query = query.values( end_date=None, @@ -297,7 +308,7 @@ def ti_update_state( log.debug("Updating task instance state", new_state=ti_patch_payload.state) old = ( - select(TI.state, TI.try_number, TI.max_tries, TI.dag_id) + select(TI.state, TI.try_number, TI.max_tries, TI.dag_id, TI.task_id, TI.run_id, TI.map_index) .where(TI.id == task_instance_id) .with_for_update() ) @@ -307,6 +318,9 @@ def ti_update_state( try_number, max_tries, dag_id, + task_id, + run_id, + map_index, ) = session.execute(old).one() log.debug( "Retrieved current task instance state", @@ -373,6 +387,16 @@ def ti_update_state( new_state=updated_state, rows_affected=getattr(result, "rowcount", 0), ) + session.add( + Log( + event=updated_state.value, + task_id=task_id, + dag_id=dag_id, + run_id=run_id, + map_index=map_index, + try_number=try_number, + ) + ) except SQLAlchemyError as e: log.error("Error updating Task Instance state", error=str(e)) raise HTTPException( diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index ea1153f01cba5..f93b215bad3c7 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -34,6 +34,7 @@ from airflow.exceptions import AirflowSkipException from airflow.models import RenderedTaskInstanceFields, TaskReschedule, Trigger from airflow.models.asset import AssetActive, AssetAliasModel, AssetEvent, AssetModel +from airflow.models.log import Log from airflow.models.taskinstance import TaskInstance from airflow.models.taskinstancehistory import TaskInstanceHistory from airflow.providers.standard.operators.empty import EmptyOperator @@ -44,6 +45,7 @@ from tests_common.test_utils.db import ( clear_db_assets, clear_db_dags, + clear_db_logs, clear_db_runs, clear_db_serialized_dags, clear_rendered_ti_fields, @@ -127,11 +129,13 @@ def side_effect(cred, validators): class TestTIRunState: def setup_method(self): + clear_db_logs() clear_db_runs() clear_db_serialized_dags() clear_db_dags() def teardown_method(self): + clear_db_logs() clear_db_runs() clear_db_serialized_dags() clear_db_dags() @@ -248,6 +252,13 @@ def test_ti_run_state_to_running( ) assert response.status_code == 409 + # Test that no audit log was created on the second request when resulting in conflict + logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() + assert len(logs) == 1 + assert logs[0].event == TaskInstanceState.RUNNING.value + assert logs[0].task_id == ti.task_id + assert logs[0].run_id == ti.run_id + def test_dynamic_task_mapping_with_parse_time_value(self, client, dag_maker): """Test that dynamic task mapping works correctly with parse-time values.""" with dag_maker("test_dynamic_task_mapping_with_parse_time_value", serialized=True): @@ -684,6 +695,9 @@ def test_ti_run_state_conflict_if_not_queued( assert session.scalar(select(TaskInstance.state).where(TaskInstance.id == ti.id)) == initial_ti_state + # Test that no audit log was created on conflict + assert session.scalars(select(Log)).all() == [] + def test_xcom_not_cleared_for_deferral(self, client, session, create_task_instance, time_machine): """ Test that the xcoms are not cleared when the Task Instance state is re-running after deferral. @@ -793,14 +807,54 @@ def test_ti_run_with_triggering_user_name( assert dag_run["run_id"] == "test" assert dag_run["state"] == "running" + def test_ti_run_creates_audit_log(self, client, session, create_task_instance, time_machine): + """Test that transitioning to RUNNING creates an audit log record.""" + instant_str = "2024-09-30T12:00:00Z" + instant = timezone.parse(instant_str) + time_machine.move_to(instant, tick=False) + + ti = create_task_instance( + task_id="test_ti_run_creates_audit_log", + state=State.QUEUED, + dagrun_state=DagRunState.RUNNING, + session=session, + start_date=instant, + dag_id=str(uuid4()), + ) + session.commit() + + response = client.patch( + f"/execution/task-instances/{ti.id}/run", + json={ + "state": "running", + "hostname": "random-hostname", + "unixname": "random-unixname", + "pid": 100, + "start_date": instant_str, + }, + ) + + assert response.status_code == 200 + + logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() + assert len(logs) == 1 + assert logs[0].event == TaskInstanceState.RUNNING.value + assert logs[0].task_id == ti.task_id + assert logs[0].dag_id == ti.dag_id + assert logs[0].run_id == ti.run_id + assert logs[0].map_index == ti.map_index + assert logs[0].try_number == ti.try_number + class TestTIUpdateState: def setup_method(self): clear_db_assets() + clear_db_logs() clear_db_runs() def teardown_method(self): clear_db_assets() + clear_db_logs() clear_db_runs() @pytest.mark.parametrize( @@ -838,6 +892,127 @@ def test_ti_update_state_to_terminal( assert ti.state == expected_state assert ti.end_date == end_date + @pytest.mark.parametrize( + ("state", "end_date"), + [ + (State.SUCCESS, DEFAULT_END_DATE), + (State.FAILED, DEFAULT_END_DATE), + (State.SKIPPED, DEFAULT_END_DATE), + ], + ) + def test_ti_update_state_creates_audit_log(self, client, session, create_task_instance, state, end_date): + """Test that transitioning to a terminal state creates an audit log record.""" + ti = create_task_instance( + task_id="test_ti_update_state_creates_audit_log", + start_date=DEFAULT_START_DATE, + state=State.RUNNING, + ) + session.commit() + + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": state, + "end_date": end_date.isoformat(), + }, + ) + + assert response.status_code == 204 + + logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() + assert len(logs) == 1 + assert logs[0].event == state + assert logs[0].task_id == ti.task_id + assert logs[0].dag_id == ti.dag_id + assert logs[0].run_id == ti.run_id + assert logs[0].map_index == ti.map_index + assert logs[0].try_number == ti.try_number + + def test_ti_update_state_to_deferred_creates_audit_log( + self, client, session, create_task_instance, time_machine + ): + """Test that transitioning to DEFERRED creates an audit log record.""" + ti = create_task_instance( + task_id="test_ti_update_state_to_deferred_creates_audit_log", + state=State.RUNNING, + session=session, + ) + session.commit() + + instant = timezone.datetime(2024, 11, 22) + time_machine.move_to(instant, tick=False) + + payload = { + "state": "deferred", + "trigger_kwargs": {"key": "value", "moment": "2024-12-18T00:00:00Z"}, + "trigger_timeout": "P1D", + "classpath": "my-classpath", + "next_method": "execute_callback", + } + + response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) + assert response.status_code == 204 + + logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() + assert len(logs) == 1 + assert logs[0].event == TaskInstanceState.DEFERRED.value + assert logs[0].task_id == ti.task_id + assert logs[0].dag_id == ti.dag_id + + def test_ti_update_state_to_reschedule_creates_audit_log( + self, client, session, create_task_instance, time_machine + ): + """Test that transitioning to UP_FOR_RESCHEDULE creates an audit log record.""" + instant = timezone.datetime(2024, 10, 30) + time_machine.move_to(instant, tick=False) + + ti = create_task_instance( + task_id="test_ti_update_state_to_reschedule_creates_audit_log", + state=State.RUNNING, + session=session, + ) + ti.start_date = instant + session.commit() + + payload = { + "state": "up_for_reschedule", + "reschedule_date": "2024-10-31T11:03:00+00:00", + "end_date": DEFAULT_END_DATE.isoformat(), + } + + response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) + assert response.status_code == 204 + + logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() + assert len(logs) == 1 + assert logs[0].event == TaskInstanceState.UP_FOR_RESCHEDULE.value + assert logs[0].task_id == ti.task_id + assert logs[0].dag_id == ti.dag_id + + def test_ti_update_state_to_retry_creates_audit_log(self, client, session, create_task_instance): + """Test that transitioning to UP_FOR_RETRY creates an audit log record.""" + ti = create_task_instance( + task_id="test_ti_update_state_to_retry_creates_audit_log", + state=State.RUNNING, + ) + session.commit() + + response = client.patch( + f"/execution/task-instances/{ti.id}/state", + json={ + "state": State.UP_FOR_RETRY, + "end_date": DEFAULT_END_DATE.isoformat(), + }, + ) + + assert response.status_code == 204 + + logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() + assert len(logs) == 1 + assert logs[0].event == TaskInstanceState.UP_FOR_RETRY.value + assert logs[0].task_id == ti.task_id + assert logs[0].dag_id == ti.dag_id + @pytest.mark.parametrize( ("state", "end_date", "expected_state", "rendered_map_index"), [ @@ -1026,6 +1201,9 @@ def test_ti_update_state_not_found(self, client, session): "message": "Task Instance not found", } + # Test that no audit log was created when TI not found + assert session.scalars(select(Log)).all() == [] + def test_ti_update_state_running_errors(self, client, session, create_task_instance, time_machine): """ Test that a 422 error is returned when the Task Instance state is RUNNING in the payload. @@ -1063,8 +1241,12 @@ def test_ti_update_state_database_error(self, client, session, create_task_insta mock.patch( "airflow.api_fastapi.common.db.common.Session.execute", side_effect=[ - mock.Mock(one=lambda: ("running", 1, 0, "dag")), # First call returns "queued" - mock.Mock(one=lambda: ("running", 1, 0, "dag")), # Second call returns "queued" + mock.Mock( + one=lambda: ("running", 1, 0, "dag", "task", "run", -1) + ), # First call returns "queued" + mock.Mock( + one=lambda: ("running", 1, 0, "dag", "task", "run", -1) + ), # Second call returns "queued" SQLAlchemyError("Database error"), # Last call raises an error ], ), @@ -1077,6 +1259,9 @@ def test_ti_update_state_database_error(self, client, session, create_task_insta assert response.status_code == 500 assert response.json()["detail"] == "Database error occurred" + # Test that no audit log was created when database error occurred + assert session.scalars(select(Log)).all() == [] + @pytest.mark.parametrize("queues_enabled", [False, True]) def test_ti_update_state_to_deferred( self, client, session, create_task_instance, time_machine, queues_enabled: bool @@ -1396,6 +1581,9 @@ def test_ti_update_state_not_running(self, client, session, create_task_instance session.refresh(ti) assert ti.state == State.SUCCESS + # Test that no audit log was created when TI state is not RUNNING + assert session.scalars(select(Log)).all() == [] + def test_ti_update_state_to_failed_without_fail_fast(self, client, session, dag_maker): """Test that SerializedDAG is NOT loaded when fail_fast=False (default).""" with dag_maker(dag_id="test_dag_no_fail_fast", serialized=True): From 37424cc5595f4731371abf3350cd546a11c50b19 Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Wed, 18 Feb 2026 23:20:18 -0500 Subject: [PATCH 2/7] cleanup and merge test cases --- .../versions/head/test_task_instances.py | 162 +++++------------- 1 file changed, 47 insertions(+), 115 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index f93b215bad3c7..ec991e6fedffb 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -252,13 +252,6 @@ def test_ti_run_state_to_running( ) assert response.status_code == 409 - # Test that no audit log was created on the second request when resulting in conflict - logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() - assert len(logs) == 1 - assert logs[0].event == TaskInstanceState.RUNNING.value - assert logs[0].task_id == ti.task_id - assert logs[0].run_id == ti.run_id - def test_dynamic_task_mapping_with_parse_time_value(self, client, dag_maker): """Test that dynamic task mapping works correctly with parse-time values.""" with dag_maker("test_dynamic_task_mapping_with_parse_time_value", serialized=True): @@ -695,9 +688,6 @@ def test_ti_run_state_conflict_if_not_queued( assert session.scalar(select(TaskInstance.state).where(TaskInstance.id == ti.id)) == initial_ti_state - # Test that no audit log was created on conflict - assert session.scalars(select(Log)).all() == [] - def test_xcom_not_cleared_for_deferral(self, client, session, create_task_instance, time_machine): """ Test that the xcoms are not cleared when the Task Instance state is re-running after deferral. @@ -893,15 +883,54 @@ def test_ti_update_state_to_terminal( assert ti.end_date == end_date @pytest.mark.parametrize( - ("state", "end_date"), + ("payload", "expected_event"), [ - (State.SUCCESS, DEFAULT_END_DATE), - (State.FAILED, DEFAULT_END_DATE), - (State.SKIPPED, DEFAULT_END_DATE), + pytest.param( + {"state": State.SUCCESS, "end_date": DEFAULT_END_DATE.isoformat()}, + State.SUCCESS, + id="success", + ), + pytest.param( + {"state": State.FAILED, "end_date": DEFAULT_END_DATE.isoformat()}, + State.FAILED, + id="failed", + ), + pytest.param( + {"state": State.SKIPPED, "end_date": DEFAULT_END_DATE.isoformat()}, + State.SKIPPED, + id="skipped", + ), + pytest.param( + {"state": State.UP_FOR_RETRY, "end_date": DEFAULT_END_DATE.isoformat()}, + TaskInstanceState.UP_FOR_RETRY.value, + id="up_for_retry", + ), + pytest.param( + { + "state": "deferred", + "trigger_kwargs": {"key": "value", "moment": "2026-02-18T00:00:00Z"}, + "trigger_timeout": "P1D", + "classpath": "my-classpath", + "next_method": "execute_callback", + }, + TaskInstanceState.DEFERRED.value, + id="deferred", + ), + pytest.param( + { + "state": "up_for_reschedule", + "reschedule_date": "2026-02-18T11:03:00+00:00", + "end_date": DEFAULT_END_DATE.isoformat(), + }, + TaskInstanceState.UP_FOR_RESCHEDULE.value, + id="up_for_reschedule", + ), ], ) - def test_ti_update_state_creates_audit_log(self, client, session, create_task_instance, state, end_date): - """Test that transitioning to a terminal state creates an audit log record.""" + def test_ti_update_state_creates_audit_log( + self, client, session, create_task_instance, payload, expected_event + ): + """Test that state transition creates an audit log record.""" ti = create_task_instance( task_id="test_ti_update_state_creates_audit_log", start_date=DEFAULT_START_DATE, @@ -911,108 +940,20 @@ def test_ti_update_state_creates_audit_log(self, client, session, create_task_in response = client.patch( f"/execution/task-instances/{ti.id}/state", - json={ - "state": state, - "end_date": end_date.isoformat(), - }, + json=payload, ) assert response.status_code == 204 logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() assert len(logs) == 1 - assert logs[0].event == state + assert logs[0].event == expected_event assert logs[0].task_id == ti.task_id assert logs[0].dag_id == ti.dag_id assert logs[0].run_id == ti.run_id assert logs[0].map_index == ti.map_index assert logs[0].try_number == ti.try_number - def test_ti_update_state_to_deferred_creates_audit_log( - self, client, session, create_task_instance, time_machine - ): - """Test that transitioning to DEFERRED creates an audit log record.""" - ti = create_task_instance( - task_id="test_ti_update_state_to_deferred_creates_audit_log", - state=State.RUNNING, - session=session, - ) - session.commit() - - instant = timezone.datetime(2024, 11, 22) - time_machine.move_to(instant, tick=False) - - payload = { - "state": "deferred", - "trigger_kwargs": {"key": "value", "moment": "2024-12-18T00:00:00Z"}, - "trigger_timeout": "P1D", - "classpath": "my-classpath", - "next_method": "execute_callback", - } - - response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) - assert response.status_code == 204 - - logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() - assert len(logs) == 1 - assert logs[0].event == TaskInstanceState.DEFERRED.value - assert logs[0].task_id == ti.task_id - assert logs[0].dag_id == ti.dag_id - - def test_ti_update_state_to_reschedule_creates_audit_log( - self, client, session, create_task_instance, time_machine - ): - """Test that transitioning to UP_FOR_RESCHEDULE creates an audit log record.""" - instant = timezone.datetime(2024, 10, 30) - time_machine.move_to(instant, tick=False) - - ti = create_task_instance( - task_id="test_ti_update_state_to_reschedule_creates_audit_log", - state=State.RUNNING, - session=session, - ) - ti.start_date = instant - session.commit() - - payload = { - "state": "up_for_reschedule", - "reschedule_date": "2024-10-31T11:03:00+00:00", - "end_date": DEFAULT_END_DATE.isoformat(), - } - - response = client.patch(f"/execution/task-instances/{ti.id}/state", json=payload) - assert response.status_code == 204 - - logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() - assert len(logs) == 1 - assert logs[0].event == TaskInstanceState.UP_FOR_RESCHEDULE.value - assert logs[0].task_id == ti.task_id - assert logs[0].dag_id == ti.dag_id - - def test_ti_update_state_to_retry_creates_audit_log(self, client, session, create_task_instance): - """Test that transitioning to UP_FOR_RETRY creates an audit log record.""" - ti = create_task_instance( - task_id="test_ti_update_state_to_retry_creates_audit_log", - state=State.RUNNING, - ) - session.commit() - - response = client.patch( - f"/execution/task-instances/{ti.id}/state", - json={ - "state": State.UP_FOR_RETRY, - "end_date": DEFAULT_END_DATE.isoformat(), - }, - ) - - assert response.status_code == 204 - - logs = session.scalars(select(Log).where(Log.dag_id == ti.dag_id)).all() - assert len(logs) == 1 - assert logs[0].event == TaskInstanceState.UP_FOR_RETRY.value - assert logs[0].task_id == ti.task_id - assert logs[0].dag_id == ti.dag_id - @pytest.mark.parametrize( ("state", "end_date", "expected_state", "rendered_map_index"), [ @@ -1201,9 +1142,6 @@ def test_ti_update_state_not_found(self, client, session): "message": "Task Instance not found", } - # Test that no audit log was created when TI not found - assert session.scalars(select(Log)).all() == [] - def test_ti_update_state_running_errors(self, client, session, create_task_instance, time_machine): """ Test that a 422 error is returned when the Task Instance state is RUNNING in the payload. @@ -1259,9 +1197,6 @@ def test_ti_update_state_database_error(self, client, session, create_task_insta assert response.status_code == 500 assert response.json()["detail"] == "Database error occurred" - # Test that no audit log was created when database error occurred - assert session.scalars(select(Log)).all() == [] - @pytest.mark.parametrize("queues_enabled", [False, True]) def test_ti_update_state_to_deferred( self, client, session, create_task_instance, time_machine, queues_enabled: bool @@ -1581,9 +1516,6 @@ def test_ti_update_state_not_running(self, client, session, create_task_instance session.refresh(ti) assert ti.state == State.SUCCESS - # Test that no audit log was created when TI state is not RUNNING - assert session.scalars(select(Log)).all() == [] - def test_ti_update_state_to_failed_without_fail_fast(self, client, session, dag_maker): """Test that SerializedDAG is NOT loaded when fail_fast=False (default).""" with dag_maker(dag_id="test_dag_no_fail_fast", serialized=True): From 49cb7dbe70e51d8e1bbf61f75058eea07711c9a9 Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Fri, 20 Feb 2026 00:00:08 -0500 Subject: [PATCH 3/7] retrive from dag and from dag run, and add extra with only host_name --- .../execution_api/routes/task_instances.py | 35 ++++++++++++++++--- .../versions/head/test_task_instances.py | 33 +++++++++++++++-- 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py index d86a0189a268e..e963770b240e0 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/task_instances.py @@ -30,7 +30,7 @@ from cadwyn import VersionedAPIRouter from fastapi import Body, HTTPException, Query, status from pydantic import JsonValue -from sqlalchemy import func, or_, tuple_, update +from sqlalchemy import and_, func, or_, tuple_, update from sqlalchemy.engine import CursorResult from sqlalchemy.exc import NoResultFound, SQLAlchemyError from sqlalchemy.orm import joinedload @@ -137,10 +137,14 @@ def ti_run( # This selects the raw JSON value, by-passing the deserialization -- we want that to happen on the # client column("next_kwargs", JSON), + DR.logical_date, + DagModel.owners, ) .select_from(TI) + .join(DR, and_(TI.dag_id == DR.dag_id, TI.run_id == DR.run_id)) + .join(DagModel, TI.dag_id == DagModel.dag_id) .where(TI.id == task_instance_id) - .with_for_update() + .with_for_update(of=TI) ) try: ti = session.execute(old).one() @@ -204,6 +208,9 @@ def ti_run( run_id=ti.run_id, map_index=ti.map_index, try_number=ti.try_number, + logical_date=ti.logical_date, + owner=ti.owners, + extra=json.dumps({"host_name": ti_run_payload.hostname}) if ti_run_payload.hostname else None, ) ) # Ensure there is no end date set. @@ -308,9 +315,23 @@ def ti_update_state( log.debug("Updating task instance state", new_state=ti_patch_payload.state) old = ( - select(TI.state, TI.try_number, TI.max_tries, TI.dag_id, TI.task_id, TI.run_id, TI.map_index) + select( + TI.state, + TI.try_number, + TI.max_tries, + TI.dag_id, + TI.task_id, + TI.run_id, + TI.map_index, + TI.hostname, + DR.logical_date, + DagModel.owners, + ) + .select_from(TI) + .join(DR, and_(TI.dag_id == DR.dag_id, TI.run_id == DR.run_id)) + .join(DagModel, TI.dag_id == DagModel.dag_id) .where(TI.id == task_instance_id) - .with_for_update() + .with_for_update(of=TI) ) try: ( @@ -321,6 +342,9 @@ def ti_update_state( task_id, run_id, map_index, + hostname, + logical_date, + owners, ) = session.execute(old).one() log.debug( "Retrieved current task instance state", @@ -395,6 +419,9 @@ def ti_update_state( run_id=run_id, map_index=map_index, try_number=try_number, + logical_date=logical_date, + owner=owners, + extra=json.dumps({"host_name": hostname}) if hostname else None, ) ) except SQLAlchemyError as e: diff --git a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py index ec991e6fedffb..0fe582cb921b3 100644 --- a/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py +++ b/airflow-core/tests/unit/api_fastapi/execution_api/versions/head/test_task_instances.py @@ -834,6 +834,9 @@ def test_ti_run_creates_audit_log(self, client, session, create_task_instance, t assert logs[0].run_id == ti.run_id assert logs[0].map_index == ti.map_index assert logs[0].try_number == ti.try_number + assert logs[0].logical_date == instant + assert logs[0].owner == ti.task.owner + assert logs[0].extra == '{"host_name": "random-hostname"}' class TestTIUpdateState: @@ -935,6 +938,7 @@ def test_ti_update_state_creates_audit_log( task_id="test_ti_update_state_creates_audit_log", start_date=DEFAULT_START_DATE, state=State.RUNNING, + hostname="random-hostname", ) session.commit() @@ -953,6 +957,9 @@ def test_ti_update_state_creates_audit_log( assert logs[0].run_id == ti.run_id assert logs[0].map_index == ti.map_index assert logs[0].try_number == ti.try_number + assert logs[0].logical_date == ti.dag_run.logical_date + assert logs[0].owner == ti.task.owner + assert logs[0].extra == '{"host_name": "random-hostname"}' @pytest.mark.parametrize( ("state", "end_date", "expected_state", "rendered_map_index"), @@ -1180,10 +1187,32 @@ def test_ti_update_state_database_error(self, client, session, create_task_insta "airflow.api_fastapi.common.db.common.Session.execute", side_effect=[ mock.Mock( - one=lambda: ("running", 1, 0, "dag", "task", "run", -1) + one=lambda: ( + "running", + 1, + 0, + "dag", + "task", + "run", + -1, + "localhost", + timezone.utcnow(), + "test_owner", + ) ), # First call returns "queued" mock.Mock( - one=lambda: ("running", 1, 0, "dag", "task", "run", -1) + one=lambda: ( + "running", + 1, + 0, + "dag", + "task", + "run", + -1, + "localhost", + timezone.utcnow(), + "test_owner", + ) ), # Second call returns "queued" SQLAlchemyError("Database error"), # Last call raises an error ], From c2e17371294a024555b7a7d300125976bfae5b4b Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Wed, 25 Feb 2026 12:08:41 -0500 Subject: [PATCH 4/7] update ui end-to-end test to be more stable get latest changes from main to remove sorting test --- .../src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts index 6b6989ad1a8c1..a3986e60c9dbf 100644 --- a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts +++ b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts @@ -22,10 +22,13 @@ import { DagsPage } from "tests/e2e/pages/DagsPage"; import { EventsPage } from "tests/e2e/pages/EventsPage"; test.describe("DAG Audit Log", () => { + // Serial mode ensures all tests run on one worker, preventing parallel beforeAll conflicts + test.describe.configure({ mode: "serial" }); + let eventsPage: EventsPage; const testDagId = testConfig.testDag.id; - const triggerCount = 3; + const triggerCount = 2; const expectedEventCount = triggerCount + 1; test.setTimeout(60_000); @@ -38,7 +41,9 @@ test.describe("DAG Audit Log", () => { const setupEventsPage = new EventsPage(page); for (let i = 0; i < triggerCount; i++) { - await setupDagsPage.triggerDag(testDagId); + const dagRunId = await setupDagsPage.triggerDag(testDagId); + + await setupDagsPage.verifyDagRunStatus(testDagId, dagRunId); } await setupEventsPage.navigateToAuditLog(testDagId); From 51c2b55dafbe6f8e115e3049b01720d5ab8dc71e Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Wed, 25 Feb 2026 12:24:05 -0500 Subject: [PATCH 5/7] clean up --- .../src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts index a3986e60c9dbf..f5c788017e14f 100644 --- a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts +++ b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts @@ -22,7 +22,6 @@ import { DagsPage } from "tests/e2e/pages/DagsPage"; import { EventsPage } from "tests/e2e/pages/EventsPage"; test.describe("DAG Audit Log", () => { - // Serial mode ensures all tests run on one worker, preventing parallel beforeAll conflicts test.describe.configure({ mode: "serial" }); let eventsPage: EventsPage; From c013aaf683a9674a3859c2f63dcd6f82e0687057 Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Wed, 25 Feb 2026 20:55:52 -0500 Subject: [PATCH 6/7] remove serial config for tests --- .../src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts index f5c788017e14f..c58562ae32c00 100644 --- a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts +++ b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts @@ -22,8 +22,6 @@ import { DagsPage } from "tests/e2e/pages/DagsPage"; import { EventsPage } from "tests/e2e/pages/EventsPage"; test.describe("DAG Audit Log", () => { - test.describe.configure({ mode: "serial" }); - let eventsPage: EventsPage; const testDagId = testConfig.testDag.id; From 9d89ebe32afa757ef444bb84591b5abc61af0257 Mon Sep 17 00:00:00 2001 From: Kevin Yang Date: Thu, 26 Feb 2026 21:31:38 -0500 Subject: [PATCH 7/7] revert change made to the UI end-to-end test as sequential trigger/wait dag run cause timeout --- .../src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts index c58562ae32c00..6b6989ad1a8c1 100644 --- a/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts +++ b/airflow-core/src/airflow/ui/tests/e2e/specs/dag-audit-log.spec.ts @@ -25,7 +25,7 @@ test.describe("DAG Audit Log", () => { let eventsPage: EventsPage; const testDagId = testConfig.testDag.id; - const triggerCount = 2; + const triggerCount = 3; const expectedEventCount = triggerCount + 1; test.setTimeout(60_000); @@ -38,9 +38,7 @@ test.describe("DAG Audit Log", () => { const setupEventsPage = new EventsPage(page); for (let i = 0; i < triggerCount; i++) { - const dagRunId = await setupDagsPage.triggerDag(testDagId); - - await setupDagsPage.verifyDagRunStatus(testDagId, dagRunId); + await setupDagsPage.triggerDag(testDagId); } await setupEventsPage.navigateToAuditLog(testDagId);