From 0071913952a0edb2a59c8d8acf338266669c7549 Mon Sep 17 00:00:00 2001 From: Philip Abernethy Date: Mon, 10 Nov 2025 16:01:56 +0000 Subject: [PATCH 1/2] Reinstate tih.id as pkey and auto-increment on downgrade --- .../0060_3_0_0_add_try_id_to_ti_and_tih.py | 28 +------------------ 1 file changed, 1 insertion(+), 27 deletions(-) diff --git a/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py b/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py index 82a80cc8903de..397e76ce20a16 100644 --- a/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py +++ b/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py @@ -127,34 +127,8 @@ def upgrade(): def downgrade(): """Unapply Add try_id to TI and TIH.""" - dialect_name = op.get_bind().dialect.name with op.batch_alter_table("task_instance_history", schema=None) as batch_op: batch_op.drop_constraint(batch_op.f("task_instance_history_pkey"), type_="primary") - batch_op.add_column(sa.Column("id", sa.INTEGER, nullable=True)) + batch_op.add_column(sa.Column("id", sa.INTEGER, primary_key=True, autoincrement=True)) batch_op.drop_column("task_instance_id") - if dialect_name == "postgresql": - op.execute( - """ - ALTER TABLE task_instance_history ADD COLUMN row_num SERIAL; - UPDATE task_instance_history SET id = row_num; - ALTER TABLE task_instance_history DROP COLUMN row_num; - """ - ) - elif dialect_name == "mysql": - op.execute( - """ - SET @row_number = 0; - UPDATE task_instance_history - SET id = (@row_number := @row_number + 1) - ORDER BY try_id; - """ - ) - else: - op.execute(""" - UPDATE task_instance_history - SET id = (SELECT COUNT(*) FROM task_instance_history t2 WHERE t2.id <= task_instance_history.id); - """) - with op.batch_alter_table("task_instance_history", schema=None) as batch_op: - batch_op.alter_column("id", nullable=False, existing_type=sa.INTEGER) batch_op.drop_column("try_id") - batch_op.create_primary_key("task_instance_history_pkey", ["id"]) From 77097d12d49d0e1697102565d0dc22fa14692952 Mon Sep 17 00:00:00 2001 From: Philip Abernethy Date: Fri, 23 Jan 2026 15:43:26 +0000 Subject: [PATCH 2/2] add_column does not support primary_key so execute instead --- .../versions/0060_3_0_0_add_try_id_to_ti_and_tih.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py b/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py index 397e76ce20a16..b0531ea34fdee 100644 --- a/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py +++ b/airflow-core/src/airflow/migrations/versions/0060_3_0_0_add_try_id_to_ti_and_tih.py @@ -129,6 +129,12 @@ def downgrade(): """Unapply Add try_id to TI and TIH.""" with op.batch_alter_table("task_instance_history", schema=None) as batch_op: batch_op.drop_constraint(batch_op.f("task_instance_history_pkey"), type_="primary") - batch_op.add_column(sa.Column("id", sa.INTEGER, primary_key=True, autoincrement=True)) + match batch_op.get_bind().dialect: + case "mysql": + batch_op.execute("ALTER TABLE task_instance_history ADD COLUMN id INTEGER PRIMARY KEY AUTO_INCREMENT;") + case "postgresql": + batch_op.execute("ALTER TABLE task_instance_history ADD COLUMN id SERIAL PRIMARY KEY;") + case "sqlite": + batch_op.execute("ALTER TABLE task_instance_history ADD COLUMN id INTEGER PRIMARY KEY AUTOINCREMENT;") batch_op.drop_column("task_instance_id") batch_op.drop_column("try_id")