From 7323a785222ac0534c78d558cd1142e480197ef1 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 29 Aug 2025 13:50:56 -0400 Subject: [PATCH 01/19] feat: first pass fix: clean up migration feat: migration --- .../migrations/20250829173445_v1_0_41.sql | 526 ++++++++++++++++++ sql/schema/v1-core.sql | 8 +- 2 files changed, 530 insertions(+), 4 deletions(-) create mode 100644 cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql diff --git a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql new file mode 100644 index 0000000000..f2626781ad --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql @@ -0,0 +1,526 @@ +-- +goose no transaction +-- +goose Up +-- +goose StatementBegin +BEGIN; +CREATE TABLE IF NOT EXISTS v1_lookup_table_partitioned ( + tenant_id UUID NOT NULL, + external_id UUID NOT NULL, + task_id BIGINT, + dag_id BIGINT, + inserted_at TIMESTAMPTZ NOT NULL, + + PRIMARY KEY (external_id, inserted_at) +) PARTITION BY RANGE (inserted_at); + +SELECT create_v1_weekly_range_partition('v1_lookup_table_partitioned', NOW()::DATE); +SELECT create_v1_weekly_range_partition('v1_lookup_table_partitioned', (NOW() - INTERVAL '1 week')::DATE); + +DO $$ +DECLARE + startDateStr varchar; + endDateStr varchar; + targetTableName CONSTANT varchar := 'v1_lookup_table_partitioned'; + newTableName varchar; +BEGIN + SELECT '19700101' INTO startDateStr; + SELECT TO_CHAR(date_trunc('week', (NOW() - INTERVAL '8 days')::DATE), 'YYYYMMDD') INTO endDateStr; + SELECT LOWER(FORMAT('%s_%s', targetTableName, startDateStr)) INTO newTableName; + + EXECUTE + format('CREATE TABLE IF NOT EXISTS %s (LIKE %s INCLUDING INDEXES)', newTableName, targetTableName); + + EXECUTE + format('ALTER TABLE %s SET ( + autovacuum_vacuum_scale_factor = ''0.1'', + autovacuum_analyze_scale_factor=''0.05'', + autovacuum_vacuum_threshold=''25'', + autovacuum_analyze_threshold=''25'', + autovacuum_vacuum_cost_delay=''10'', + autovacuum_vacuum_cost_limit=''1000'' + )', newTableName); + EXECUTE + format('ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM (''%s'') TO (''%s'')', targetTableName, newTableName, startDateStr, endDateStr); +END$$; + + + +CREATE OR REPLACE FUNCTION v1_lookup_table_partitioned_insert_function() +RETURNS TRIGGER AS +$$ +BEGIN + INSERT INTO v1_lookup_table_partitioned ( + tenant_id, + external_id, + task_id, + dag_id, + inserted_at + ) + SELECT + tenant_id, + external_id, + id, + dag_id, + inserted_at + FROM new_rows + ON CONFLICT (external_id, inserted_at) DO NOTHING; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + +CREATE OR REPLACE TRIGGER v1_lookup_table_partitioned_insert_trigger +AFTER INSERT ON v1_lookup_table +REFERENCING NEW TABLE AS new_rows +FOR EACH STATEMENT +EXECUTE FUNCTION v1_lookup_table_partitioned_insert_function(); + +COMMIT; +-- +goose StatementEnd + +-- +goose StatementBegin +INSERT INTO v1_lookup_table_partitioned ( + tenant_id, + external_id, + task_id, + dag_id, + inserted_at +) +SELECT + tenant_id, + external_id, + id, + NULL::BIGINT, + inserted_at +FROM v1_task +ON CONFLICT (external_id, inserted_at) DO NOTHING; + + +INSERT INTO v1_lookup_table_partitioned ( + tenant_id, + external_id, + task_id, + dag_id, + inserted_at +) +SELECT + tenant_id, + external_id, + NULL::BIGINT, + id, + inserted_at +FROM v1_dag +ON CONFLICT (external_id, inserted_at) DO NOTHING; +-- +goose StatementEnd + +-- +goose StatementBegin +BEGIN; +DROP TABLE IF EXISTS v1_lookup_table; +ALTER TABLE v1_lookup_table_partitioned + RENAME TO v1_lookup_table; + +CREATE OR REPLACE FUNCTION v1_dag_insert_function() +RETURNS TRIGGER AS +$$ +BEGIN + INSERT INTO v1_lookup_table ( + external_id, + tenant_id, + dag_id, + inserted_at + ) + SELECT + external_id, + tenant_id, + id, + inserted_at + FROM new_table + ON CONFLICT (external_id, inserted_at) DO NOTHING; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION v1_task_insert_function() +RETURNS TRIGGER AS $$ +DECLARE + rec RECORD; +BEGIN + WITH new_slot_rows AS ( + SELECT + id, + inserted_at, + retry_count, + tenant_id, + priority, + concurrency_parent_strategy_ids[1] AS parent_strategy_id, + CASE + WHEN array_length(concurrency_parent_strategy_ids, 1) > 1 THEN concurrency_parent_strategy_ids[2:array_length(concurrency_parent_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_parent_strategy_ids, + concurrency_strategy_ids[1] AS strategy_id, + external_id, + workflow_run_id, + CASE + WHEN array_length(concurrency_strategy_ids, 1) > 1 THEN concurrency_strategy_ids[2:array_length(concurrency_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_strategy_ids, + concurrency_keys[1] AS key, + CASE + WHEN array_length(concurrency_keys, 1) > 1 THEN concurrency_keys[2:array_length(concurrency_keys, 1)] + ELSE '{}'::text[] + END AS next_keys, + workflow_id, + workflow_version_id, + queue, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout) AS schedule_timeout_at + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL + ) + INSERT INTO v1_concurrency_slot ( + task_id, + task_inserted_at, + task_retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + priority, + key, + next_keys, + queue_to_notify, + schedule_timeout_at + ) + SELECT + id, + inserted_at, + retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + COALESCE(priority, 1), + key, + next_keys, + queue, + schedule_timeout_at + FROM new_slot_rows; + + INSERT INTO v1_queue_item ( + tenant_id, + queue, + task_id, + task_inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + schedule_timeout_at, + step_timeout, + priority, + sticky, + desired_worker_id, + retry_count + ) + SELECT + tenant_id, + queue, + id, + inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout), + step_timeout, + COALESCE(priority, 1), + sticky, + desired_worker_id, + retry_count + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL; + + INSERT INTO v1_dag_to_task ( + dag_id, + dag_inserted_at, + task_id, + task_inserted_at + ) + SELECT + dag_id, + dag_inserted_at, + id, + inserted_at + FROM new_table + WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL; + + INSERT INTO v1_lookup_table ( + external_id, + tenant_id, + task_id, + inserted_at + ) + SELECT + external_id, + tenant_id, + id, + inserted_at + FROM new_table + ON CONFLICT (external_id, inserted_at) DO NOTHING; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + + + +ALTER INDEX v1_lookup_table_partitioned_pkey + RENAME TO v1_lookup_table_pkey; + +DROP TRIGGER IF EXISTS v1_lookup_table_partitioned_insert_trigger ON v1_lookup_table_partitioned; +DROP FUNCTION IF EXISTS v1_lookup_table_partitioned_insert_function; +COMMIT; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +BEGIN; +CREATE TABLE IF NOT EXISTS v1_lookup_table_original ( + tenant_id UUID NOT NULL, + external_id UUID NOT NULL, + task_id BIGINT, + dag_id BIGINT, + inserted_at TIMESTAMPTZ NOT NULL, + + PRIMARY KEY (external_id) +); +COMMIT; +-- +goose StatementEnd + +-- +goose StatementBegin +INSERT INTO v1_lookup_table_original ( + tenant_id, + external_id, + task_id, + dag_id, + inserted_at +) +SELECT + tenant_id, + external_id, + id, + NULL::BIGINT, + inserted_at +FROM v1_task; + + +INSERT INTO v1_lookup_table_original ( + tenant_id, + external_id, + task_id, + dag_id, + inserted_at +) +SELECT + tenant_id, + external_id, + NULL::BIGINT, + id, + inserted_at +FROM v1_dag; +-- +goose StatementEnd + +-- +goose StatementBegin +BEGIN; +DROP TABLE IF EXISTS v1_lookup_table; +ALTER TABLE v1_lookup_table_original + RENAME TO v1_lookup_table; + +ALTER INDEX v1_lookup_table_original_pkey + RENAME TO v1_lookup_table_pkey; + +CREATE OR REPLACE FUNCTION v1_dag_insert_function() +RETURNS TRIGGER AS +$$ +BEGIN + INSERT INTO v1_lookup_table ( + external_id, + tenant_id, + dag_id, + inserted_at + ) + SELECT + external_id, + tenant_id, + id, + inserted_at + FROM new_table + ON CONFLICT (external_id) DO NOTHING; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + + +CREATE OR REPLACE FUNCTION v1_task_insert_function() +RETURNS TRIGGER AS $$ +DECLARE + rec RECORD; +BEGIN + WITH new_slot_rows AS ( + SELECT + id, + inserted_at, + retry_count, + tenant_id, + priority, + concurrency_parent_strategy_ids[1] AS parent_strategy_id, + CASE + WHEN array_length(concurrency_parent_strategy_ids, 1) > 1 THEN concurrency_parent_strategy_ids[2:array_length(concurrency_parent_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_parent_strategy_ids, + concurrency_strategy_ids[1] AS strategy_id, + external_id, + workflow_run_id, + CASE + WHEN array_length(concurrency_strategy_ids, 1) > 1 THEN concurrency_strategy_ids[2:array_length(concurrency_strategy_ids, 1)] + ELSE '{}'::bigint[] + END AS next_strategy_ids, + concurrency_keys[1] AS key, + CASE + WHEN array_length(concurrency_keys, 1) > 1 THEN concurrency_keys[2:array_length(concurrency_keys, 1)] + ELSE '{}'::text[] + END AS next_keys, + workflow_id, + workflow_version_id, + queue, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout) AS schedule_timeout_at + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NOT NULL + ) + INSERT INTO v1_concurrency_slot ( + task_id, + task_inserted_at, + task_retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + priority, + key, + next_keys, + queue_to_notify, + schedule_timeout_at + ) + SELECT + id, + inserted_at, + retry_count, + external_id, + tenant_id, + workflow_id, + workflow_version_id, + workflow_run_id, + parent_strategy_id, + next_parent_strategy_ids, + strategy_id, + next_strategy_ids, + COALESCE(priority, 1), + key, + next_keys, + queue, + schedule_timeout_at + FROM new_slot_rows; + + INSERT INTO v1_queue_item ( + tenant_id, + queue, + task_id, + task_inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + schedule_timeout_at, + step_timeout, + priority, + sticky, + desired_worker_id, + retry_count + ) + SELECT + tenant_id, + queue, + id, + inserted_at, + external_id, + action_id, + step_id, + workflow_id, + workflow_run_id, + CURRENT_TIMESTAMP + convert_duration_to_interval(schedule_timeout), + step_timeout, + COALESCE(priority, 1), + sticky, + desired_worker_id, + retry_count + FROM new_table + WHERE initial_state = 'QUEUED' AND concurrency_strategy_ids[1] IS NULL; + + INSERT INTO v1_dag_to_task ( + dag_id, + dag_inserted_at, + task_id, + task_inserted_at + ) + SELECT + dag_id, + dag_inserted_at, + id, + inserted_at + FROM new_table + WHERE dag_id IS NOT NULL AND dag_inserted_at IS NOT NULL; + + INSERT INTO v1_lookup_table ( + external_id, + tenant_id, + task_id, + inserted_at + ) + SELECT + external_id, + tenant_id, + id, + inserted_at + FROM new_table + ON CONFLICT (external_id) DO NOTHING; + + RETURN NULL; +END; +$$ +LANGUAGE plpgsql; + +COMMIT; +-- +goose StatementEnd \ No newline at end of file diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index 129cf64900..a697120541 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -313,8 +313,8 @@ CREATE TABLE v1_lookup_table ( dag_id BIGINT, inserted_at TIMESTAMPTZ NOT NULL, - PRIMARY KEY (external_id) -); + PRIMARY KEY (external_id, inserted_at) +) PARTITION BY RANGE (inserted_at); CREATE TYPE v1_task_event_type AS ENUM ( 'COMPLETED', @@ -1095,7 +1095,7 @@ BEGIN id, inserted_at FROM new_table - ON CONFLICT (external_id) DO NOTHING; + ON CONFLICT (external_id, inserted_at) DO NOTHING; RETURN NULL; END; @@ -1583,7 +1583,7 @@ BEGIN id, inserted_at FROM new_table - ON CONFLICT (external_id) DO NOTHING; + ON CONFLICT (external_id, inserted_at) DO NOTHING; RETURN NULL; END; From b9c83d8dae77b7d5ae995bece36ca66217cda5d1 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Fri, 29 Aug 2025 15:57:35 -0400 Subject: [PATCH 02/19] fix: lint --- .../migrate/migrations/20250829173445_v1_0_41.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql index f2626781ad..6c4a41221a 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql @@ -523,4 +523,4 @@ $$ LANGUAGE plpgsql; COMMIT; --- +goose StatementEnd \ No newline at end of file +-- +goose StatementEnd From 1b68c7d3866403e6dba47dbfd03670e311f7350f Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Tue, 2 Sep 2025 10:00:36 -0400 Subject: [PATCH 03/19] fix: add some checks for if tables already exist --- .../migrations/20250829173445_v1_0_41.sql | 63 +++++++++++++++---- 1 file changed, 50 insertions(+), 13 deletions(-) diff --git a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql index 6c4a41221a..39d8a27b94 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql @@ -12,22 +12,60 @@ CREATE TABLE IF NOT EXISTS v1_lookup_table_partitioned ( PRIMARY KEY (external_id, inserted_at) ) PARTITION BY RANGE (inserted_at); -SELECT create_v1_weekly_range_partition('v1_lookup_table_partitioned', NOW()::DATE); -SELECT create_v1_weekly_range_partition('v1_lookup_table_partitioned', (NOW() - INTERVAL '1 week')::DATE); +-- check if partitions already exist, and create them if not +DO $$ +DECLARE + partition_count INTEGER; +BEGIN + SELECT COUNT(*) + INTO partition_count + FROM pg_class c + JOIN pg_inherits i ON c.oid = i.inhrelid + JOIN pg_class parent ON i.inhparent = parent.oid + WHERE + parent.relname = 'v1_lookup_table_partitioned' + AND c.relkind = 'r'; + + IF partition_count > 0 THEN + RAISE NOTICE 'Table has % partitions. Exiting.', partition_count; + RETURN; + END IF; + + RAISE NOTICE 'No partitions found. Creating partitions...'; + + PERFORM create_v1_weekly_range_partition('v1_lookup_table_partitioned', NOW()::DATE); + PERFORM create_v1_weekly_range_partition('v1_lookup_table_partitioned', (NOW() - INTERVAL '1 week')::DATE); +END $$; + DO $$ DECLARE - startDateStr varchar; - endDateStr varchar; - targetTableName CONSTANT varchar := 'v1_lookup_table_partitioned'; - newTableName varchar; + partition_count INTEGER; + start_date_str varchar; + end_date_str varchar; + target_table_name CONSTANT varchar := 'v1_lookup_table_partitioned'; + new_table_name varchar; BEGIN - SELECT '19700101' INTO startDateStr; - SELECT TO_CHAR(date_trunc('week', (NOW() - INTERVAL '8 days')::DATE), 'YYYYMMDD') INTO endDateStr; - SELECT LOWER(FORMAT('%s_%s', targetTableName, startDateStr)) INTO newTableName; + -- if the partition containing the old data already exists, exit + SELECT COUNT(*) + INTO partition_count + FROM pg_class c + JOIN pg_inherits i ON c.oid = i.inhrelid + JOIN pg_class parent ON i.inhparent = parent.oid + WHERE + c.relname = 'v1_lookup_table_partitioned_19700101'; + + IF partition_count > 0 THEN + RAISE NOTICE 'Table has % partitions. Exiting.', partition_count; + RETURN; + END IF; + + SELECT '19700101' INTO start_date_str; + SELECT TO_CHAR(date_trunc('week', (NOW() - INTERVAL '8 days')::DATE), 'YYYYMMDD') INTO end_date_str; + SELECT LOWER(FORMAT('%s_%s', target_table_name, start_date_str)) INTO new_table_name; EXECUTE - format('CREATE TABLE IF NOT EXISTS %s (LIKE %s INCLUDING INDEXES)', newTableName, targetTableName); + format('CREATE TABLE IF NOT EXISTS %s (LIKE %s INCLUDING INDEXES)', new_table_name, target_table_name); EXECUTE format('ALTER TABLE %s SET ( @@ -37,13 +75,12 @@ BEGIN autovacuum_analyze_threshold=''25'', autovacuum_vacuum_cost_delay=''10'', autovacuum_vacuum_cost_limit=''1000'' - )', newTableName); + )', new_table_name); EXECUTE - format('ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM (''%s'') TO (''%s'')', targetTableName, newTableName, startDateStr, endDateStr); + format('ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM (''%s'') TO (''%s'')', target_table_name, new_table_name, start_date_str, end_date_str); END$$; - CREATE OR REPLACE FUNCTION v1_lookup_table_partitioned_insert_function() RETURNS TRIGGER AS $$ From 9abb6a3cb366413847ea5617595162045cccf3d5 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 3 Sep 2025 12:40:11 -0400 Subject: [PATCH 04/19] fix: naming of partition --- .../migrations/20250829173445_v1_0_41.sql | 40 ++++++------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql index 39d8a27b94..779a8db1d7 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql @@ -16,6 +16,11 @@ CREATE TABLE IF NOT EXISTS v1_lookup_table_partitioned ( DO $$ DECLARE partition_count INTEGER; + start_date_str varchar := '19700101'; + placeholder_start_date_str varchar; + end_date_str varchar; + target_table_name CONSTANT varchar := 'v1_lookup_table_partitioned'; + new_table_name varchar; BEGIN SELECT COUNT(*) INTO partition_count @@ -33,36 +38,17 @@ BEGIN RAISE NOTICE 'No partitions found. Creating partitions...'; + -- creates a partition for this week, truncated to the week start PERFORM create_v1_weekly_range_partition('v1_lookup_table_partitioned', NOW()::DATE); - PERFORM create_v1_weekly_range_partition('v1_lookup_table_partitioned', (NOW() - INTERVAL '1 week')::DATE); -END $$; + -- create a partition for everything before this week, truncated to the start of last week + -- this is a hack where we name the partition for last week, but just include everything in it + SELECT TO_CHAR(date_trunc('week', (NOW() - INTERVAL '1 week')::DATE), 'YYYYMMDD') INTO placeholder_start_date_str; + SELECT TO_CHAR(date_trunc('week', NOW()::DATE), 'YYYYMMDD') INTO end_date_str; -DO $$ -DECLARE - partition_count INTEGER; - start_date_str varchar; - end_date_str varchar; - target_table_name CONSTANT varchar := 'v1_lookup_table_partitioned'; - new_table_name varchar; -BEGIN - -- if the partition containing the old data already exists, exit - SELECT COUNT(*) - INTO partition_count - FROM pg_class c - JOIN pg_inherits i ON c.oid = i.inhrelid - JOIN pg_class parent ON i.inhparent = parent.oid - WHERE - c.relname = 'v1_lookup_table_partitioned_19700101'; + RAISE NOTICE 'Creating partition for range % to %', start_date_str, end_date_str; - IF partition_count > 0 THEN - RAISE NOTICE 'Table has % partitions. Exiting.', partition_count; - RETURN; - END IF; - - SELECT '19700101' INTO start_date_str; - SELECT TO_CHAR(date_trunc('week', (NOW() - INTERVAL '8 days')::DATE), 'YYYYMMDD') INTO end_date_str; - SELECT LOWER(FORMAT('%s_%s', target_table_name, start_date_str)) INTO new_table_name; + SELECT LOWER(FORMAT('%s_%s', target_table_name, placeholder_start_date_str)) INTO new_table_name; EXECUTE format('CREATE TABLE IF NOT EXISTS %s (LIKE %s INCLUDING INDEXES)', new_table_name, target_table_name); @@ -78,7 +64,7 @@ BEGIN )', new_table_name); EXECUTE format('ALTER TABLE %s ATTACH PARTITION %s FOR VALUES FROM (''%s'') TO (''%s'')', target_table_name, new_table_name, start_date_str, end_date_str); -END$$; +END $$; CREATE OR REPLACE FUNCTION v1_lookup_table_partitioned_insert_function() From 9d25ec7d97fad5319b5410170d1cf0cab9ef85b8 Mon Sep 17 00:00:00 2001 From: matt Date: Tue, 18 Nov 2025 20:41:09 -0500 Subject: [PATCH 05/19] Fix: Partition core `v1_dag_to_task` (#2241) * feat: initial up migration * fix: add down * fix: add partition for today * fix: weekly partitioning * fix: add partitioning logic for job * Revert "fix: weekly partitioning" This reverts commit 5fca616f8e7b3374b9de0b9edf4ef84d5cd59adb. * fix: date wrangling * fix: one more migration fix * fix: partitions * fix: ranges * debug: table name * fix: partition naming * feat: add analyze * feat: add more `ANALYZE` * fix: run analyze more often * fix: rm analyze * chore: gen * fix: ugly sql > merge conflicts * fix: migration version * fix: run analyze in migration * chore: gen --- .../migrations/20250912172758_v1_0_43.sql | 73 +++++++++++++++++++ pkg/repository/v1/olap.go | 18 +++++ pkg/repository/v1/sqlcv1/dags.sql | 3 + pkg/repository/v1/sqlcv1/dags.sql.go | 9 +++ pkg/repository/v1/sqlcv1/olap.sql | 9 +++ pkg/repository/v1/sqlcv1/olap.sql.go | 27 +++++++ pkg/repository/v1/sqlcv1/tasks.sql | 42 ++++++++--- pkg/repository/v1/sqlcv1/tasks.sql.go | 47 +++++++++--- pkg/repository/v1/task.go | 8 +- sql/schema/v1-core.sql | 2 +- 10 files changed, 216 insertions(+), 22 deletions(-) create mode 100644 cmd/hatchet-migrate/migrate/migrations/20250912172758_v1_0_43.sql diff --git a/cmd/hatchet-migrate/migrate/migrations/20250912172758_v1_0_43.sql b/cmd/hatchet-migrate/migrate/migrations/20250912172758_v1_0_43.sql new file mode 100644 index 0000000000..2feb3adbf6 --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20250912172758_v1_0_43.sql @@ -0,0 +1,73 @@ +-- +goose Up +-- +goose StatementBegin +-- +goose no transaction +BEGIN; + +CREATE TABLE v1_dag_to_task_partitioned ( + dag_id BIGINT NOT NULL, + dag_inserted_at TIMESTAMPTZ NOT NULL, + task_id BIGINT NOT NULL, + task_inserted_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at) +) PARTITION BY RANGE(dag_inserted_at); + +DO $$ +DECLARE + new_table_name TEXT; +BEGIN + -- We want to attach all of the existing data as today's partition, which + -- includes everything up until the _end_ of today. + -- The new partition will be named with today's date, but the end time will be tomorrow (midnight). + new_table_name := 'v1_dag_to_task_' || TO_CHAR(NOW()::DATE, 'YYYYMMDD'); + + RAISE NOTICE 'Renaming existing table to %', new_table_name; + + EXECUTE format('ALTER TABLE v1_dag_to_task RENAME TO %I', new_table_name); + EXECUTE format('ALTER INDEX v1_dag_to_task_pkey RENAME TO %I', new_table_name || '_pkey'); + + EXECUTE + format('ALTER TABLE %s SET ( + autovacuum_vacuum_scale_factor = ''0.1'', + autovacuum_analyze_scale_factor=''0.05'', + autovacuum_vacuum_threshold=''25'', + autovacuum_analyze_threshold=''25'', + autovacuum_vacuum_cost_delay=''10'', + autovacuum_vacuum_cost_limit=''1000'' + )', new_table_name); + + EXECUTE + format( + 'ALTER TABLE v1_dag_to_task_partitioned ATTACH PARTITION %s FOR VALUES FROM (''19700101'') TO (''%s'')', + new_table_name, + TO_CHAR((NOW() + INTERVAL '1 day')::DATE, 'YYYYMMDD') + ); +END $$; + +ALTER TABLE v1_dag_to_task_partitioned RENAME TO v1_dag_to_task; +ALTER INDEX v1_dag_to_task_partitioned_pkey RENAME TO v1_dag_to_task_pkey; + +SELECT create_v1_range_partition('v1_dag_to_task', (NOW() + INTERVAL '1 day')::DATE); +COMMIT; + +BEGIN; +ANALYZE v1_dag_to_task; +COMMIT; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +CREATE TABLE v1_dag_to_task_original ( + dag_id BIGINT NOT NULL, + dag_inserted_at TIMESTAMPTZ NOT NULL, + task_id BIGINT NOT NULL, + task_inserted_at TIMESTAMPTZ NOT NULL, + PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at) +); + +INSERT INTO v1_dag_to_task_original +SELECT * FROM v1_dag_to_task; + +DROP TABLE v1_dag_to_task; +ALTER TABLE v1_dag_to_task_original RENAME TO v1_dag_to_task; +ALTER INDEX v1_dag_to_task_original_pkey RENAME TO v1_dag_to_task_pkey; +-- +goose StatementEnd diff --git a/pkg/repository/v1/olap.go b/pkg/repository/v1/olap.go index ba542e3c83..cf87a1314c 100644 --- a/pkg/repository/v1/olap.go +++ b/pkg/repository/v1/olap.go @@ -2604,6 +2604,24 @@ func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error { return fmt.Errorf("error analyzing v1_dags_olap: %v", err) } + err = r.queries.AnalyzeV1EventsOLAP(ctx, tx) + + if err != nil { + return fmt.Errorf("error analyzing v1_events_olap: %v", err) + } + + err = r.queries.AnalyzeV1EventLookupTableOLAP(ctx, tx) + + if err != nil { + return fmt.Errorf("error analyzing v1_event_lookup_table_olap: %v", err) + } + + err = r.queries.AnalyzeV1EventToRunOLAP(ctx, tx) + + if err != nil { + return fmt.Errorf("error analyzing v1_event_to_run_olap: %v", err) + } + err = r.queries.AnalyzeV1DAGToTaskOLAP(ctx, tx) if err != nil { diff --git a/pkg/repository/v1/sqlcv1/dags.sql b/pkg/repository/v1/sqlcv1/dags.sql index f784d74e5c..b32bc39c56 100644 --- a/pkg/repository/v1/sqlcv1/dags.sql +++ b/pkg/repository/v1/sqlcv1/dags.sql @@ -63,3 +63,6 @@ INSERT INTO v1_dag_data ( $3, $4 ); + +-- name: AnalyzeV1DAG :exec +ANALYZE v1_dag; diff --git a/pkg/repository/v1/sqlcv1/dags.sql.go b/pkg/repository/v1/sqlcv1/dags.sql.go index 49f320d5f8..df57fcf43e 100644 --- a/pkg/repository/v1/sqlcv1/dags.sql.go +++ b/pkg/repository/v1/sqlcv1/dags.sql.go @@ -11,6 +11,15 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const analyzeV1DAG = `-- name: AnalyzeV1DAG :exec +ANALYZE v1_dag +` + +func (q *Queries) AnalyzeV1DAG(ctx context.Context, db DBTX) error { + _, err := db.Exec(ctx, analyzeV1DAG) + return err +} + type CreateDAGDataParams struct { DagID int64 `json:"dag_id"` DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"` diff --git a/pkg/repository/v1/sqlcv1/olap.sql b/pkg/repository/v1/sqlcv1/olap.sql index d0c3e7671d..d4fe9da08c 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql +++ b/pkg/repository/v1/sqlcv1/olap.sql @@ -26,6 +26,15 @@ ANALYZE v1_tasks_olap; -- name: AnalyzeV1DAGsOLAP :exec ANALYZE v1_dags_olap; +-- name: AnalyzeV1EventsOLAP :exec +ANALYZE v1_events_olap; + +-- name: AnalyzeV1EventLookupTableOLAP :exec +ANALYZE v1_event_lookup_table_olap; + +-- name: AnalyzeV1EventToRunOLAP :exec +ANALYZE v1_event_to_run_olap; + -- name: AnalyzeV1DAGToTaskOLAP :exec ANALYZE v1_dag_to_task_olap; diff --git a/pkg/repository/v1/sqlcv1/olap.sql.go b/pkg/repository/v1/sqlcv1/olap.sql.go index 7dc5b5f5f9..ee2ee8b348 100644 --- a/pkg/repository/v1/sqlcv1/olap.sql.go +++ b/pkg/repository/v1/sqlcv1/olap.sql.go @@ -29,6 +29,33 @@ func (q *Queries) AnalyzeV1DAGsOLAP(ctx context.Context, db DBTX) error { return err } +const analyzeV1EventLookupTableOLAP = `-- name: AnalyzeV1EventLookupTableOLAP :exec +ANALYZE v1_event_lookup_table_olap +` + +func (q *Queries) AnalyzeV1EventLookupTableOLAP(ctx context.Context, db DBTX) error { + _, err := db.Exec(ctx, analyzeV1EventLookupTableOLAP) + return err +} + +const analyzeV1EventToRunOLAP = `-- name: AnalyzeV1EventToRunOLAP :exec +ANALYZE v1_event_to_run_olap +` + +func (q *Queries) AnalyzeV1EventToRunOLAP(ctx context.Context, db DBTX) error { + _, err := db.Exec(ctx, analyzeV1EventToRunOLAP) + return err +} + +const analyzeV1EventsOLAP = `-- name: AnalyzeV1EventsOLAP :exec +ANALYZE v1_events_olap +` + +func (q *Queries) AnalyzeV1EventsOLAP(ctx context.Context, db DBTX) error { + _, err := db.Exec(ctx, analyzeV1EventsOLAP) + return err +} + const analyzeV1PayloadsOLAP = `-- name: AnalyzeV1PayloadsOLAP :exec ANALYZE v1_payloads_olap ` diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 05f532889f..f215f211b6 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -1,10 +1,13 @@ -- name: CreatePartitions :exec SELECT - create_v1_range_partition('v1_task', @date::date), - create_v1_range_partition('v1_dag', @date::date), - create_v1_range_partition('v1_task_event', @date::date), - create_v1_range_partition('v1_log_line', @date::date), - create_v1_range_partition('v1_payload', @date::date); + -- intentionally formatted this way to limit merge conflicts + create_v1_range_partition('v1_task', @date::date) + , create_v1_range_partition('v1_dag', @date::date) + , create_v1_range_partition('v1_task_event', @date::date) + , create_v1_range_partition('v1_log_line', @date::date) + , create_v1_range_partition('v1_payload', @date::date) + , create_v1_range_partition('v1_dag_to_task', @date::date) + ; -- name: EnsureTablePartitionsExist :one WITH tomorrow_date AS ( @@ -33,17 +36,26 @@ SELECT FROM partition_check; -- name: ListPartitionsBeforeDate :many -WITH task_partitions AS ( +WITH +-- intentionally formatted this way to limit merge conflicts +task_partitions AS ( SELECT 'v1_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task', @date::date) AS p -), dag_partitions AS ( +) +, dag_partitions AS ( SELECT 'v1_dag' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag', @date::date) AS p -), task_event_partitions AS ( +) +, task_event_partitions AS ( SELECT 'v1_task_event' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task_event', @date::date) AS p -), log_line_partitions AS ( +) +, log_line_partitions AS ( SELECT 'v1_log_line' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_log_line', @date::date) AS p -), payload_partitions AS ( +) +, payload_partitions AS ( SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', @date::date) AS p ) +, dag_to_task_partitions AS ( + SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', @date::date) AS p +) SELECT * @@ -77,6 +89,13 @@ SELECT * FROM payload_partitions + +UNION ALL + +SELECT + * +FROM + dag_to_task_partitions ; -- name: DefaultTaskActivityGauge :one @@ -919,6 +938,9 @@ ANALYZE v1_task; -- name: AnalyzeV1TaskEvent :exec ANALYZE v1_task_event; +-- name: AnalyzeV1DAGToTask :exec +ANALYZE v1_dag_to_task; + -- name: AnalyzeV1Dag :exec ANALYZE v1_dag; diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index e98a7f0d13..5280b9b7a2 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -12,6 +12,15 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) +const analyzeV1DAGToTask = `-- name: AnalyzeV1DAGToTask :exec +ANALYZE v1_dag_to_task +` + +func (q *Queries) AnalyzeV1DAGToTask(ctx context.Context, db DBTX) error { + _, err := db.Exec(ctx, analyzeV1DAGToTask) + return err +} + const analyzeV1Dag = `-- name: AnalyzeV1Dag :exec ANALYZE v1_dag ` @@ -122,11 +131,13 @@ func (q *Queries) CleanupWorkflowConcurrencySlotsAfterInsert(ctx context.Context const createPartitions = `-- name: CreatePartitions :exec SELECT - create_v1_range_partition('v1_task', $1::date), - create_v1_range_partition('v1_dag', $1::date), - create_v1_range_partition('v1_task_event', $1::date), - create_v1_range_partition('v1_log_line', $1::date), - create_v1_range_partition('v1_payload', $1::date) + -- intentionally formatted this way to limit merge conflicts + create_v1_range_partition('v1_task', $1::date) + , create_v1_range_partition('v1_dag', $1::date) + , create_v1_range_partition('v1_task_event', $1::date) + , create_v1_range_partition('v1_log_line', $1::date) + , create_v1_range_partition('v1_payload', $1::date) + , create_v1_range_partition('v1_dag_to_task', $1::date) ` func (q *Queries) CreatePartitions(ctx context.Context, db DBTX, date pgtype.Date) error { @@ -969,17 +980,25 @@ func (q *Queries) ListMatchingTaskEvents(ctx context.Context, db DBTX, arg ListM } const listPartitionsBeforeDate = `-- name: ListPartitionsBeforeDate :many -WITH task_partitions AS ( +WITH +task_partitions AS ( SELECT 'v1_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task', $1::date) AS p -), dag_partitions AS ( +) +, dag_partitions AS ( SELECT 'v1_dag' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag', $1::date) AS p -), task_event_partitions AS ( +) +, task_event_partitions AS ( SELECT 'v1_task_event' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task_event', $1::date) AS p -), log_line_partitions AS ( +) +, log_line_partitions AS ( SELECT 'v1_log_line' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_log_line', $1::date) AS p -), payload_partitions AS ( +) +, payload_partitions AS ( SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', $1::date) AS p ) +, dag_to_task_partitions AS ( + SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', $1::date) AS p +) SELECT parent_table, partition_name @@ -1013,6 +1032,13 @@ SELECT parent_table, partition_name FROM payload_partitions + +UNION ALL + +SELECT + parent_table, partition_name +FROM + dag_to_task_partitions ` type ListPartitionsBeforeDateRow struct { @@ -1020,6 +1046,7 @@ type ListPartitionsBeforeDateRow struct { PartitionName string `json:"partition_name"` } +// intentionally formatted this way to limit merge conflicts func (q *Queries) ListPartitionsBeforeDate(ctx context.Context, db DBTX, date pgtype.Date) ([]*ListPartitionsBeforeDateRow, error) { rows, err := db.Query(ctx, listPartitionsBeforeDate, date) if err != nil { diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index c821a72121..9d4ba7c3c8 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -3634,7 +3634,13 @@ func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error { return fmt.Errorf("error analyzing v1_task_event: %v", err) } - err = r.queries.AnalyzeV1Dag(ctx, tx) + err = r.queries.AnalyzeV1DAGToTask(ctx, tx) + + if err != nil { + return fmt.Errorf("error analyzing v1_dag_to_task: %v", err) + } + + err = r.queries.AnalyzeV1DAG(ctx, tx) if err != nil { return fmt.Errorf("error analyzing v1_dag: %v", err) diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index a697120541..1e18cd3a18 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -652,7 +652,7 @@ CREATE TABLE v1_dag_to_task ( task_id BIGINT NOT NULL, task_inserted_at TIMESTAMPTZ NOT NULL, CONSTRAINT v1_dag_to_task_pkey PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at) -); +) PARTITION BY RANGE(dag_inserted_at); CREATE TABLE v1_dag_data ( dag_id BIGINT NOT NULL, From 5717e507ff1bc96085aa1b3f0c49100c0ec069e9 Mon Sep 17 00:00:00 2001 From: matt Date: Tue, 18 Nov 2025 20:45:03 -0500 Subject: [PATCH 06/19] Feat: Partition `v1_dag_data` (#2540) * feat: partitioning migration for v1_dag_data * feat: partition wiring * fix: pk name * chore: lint --- .../migrations/20251118172545_v1_0_54.sql | 73 +++++++++++++++++++ pkg/repository/v1/sqlcv1/tasks.sql | 19 ++++- pkg/repository/v1/sqlcv1/tasks.sql.go | 23 ++++++ pkg/repository/v1/task.go | 6 ++ sql/schema/v1-core.sql | 2 +- 5 files changed, 121 insertions(+), 2 deletions(-) create mode 100644 cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_54.sql diff --git a/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_54.sql b/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_54.sql new file mode 100644 index 0000000000..f7263c01e1 --- /dev/null +++ b/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_54.sql @@ -0,0 +1,73 @@ +-- +goose Up +-- +goose StatementBegin +-- +goose no transaction +BEGIN; + +CREATE TABLE v1_dag_data_partitioned ( + dag_id BIGINT NOT NULL, + dag_inserted_at TIMESTAMPTZ NOT NULL, + input JSONB NOT NULL, + additional_metadata JSONB, + PRIMARY KEY (dag_id, dag_inserted_at) +) PARTITION BY RANGE(dag_inserted_at); + +DO $$ +DECLARE + new_table_name TEXT; +BEGIN + -- We want to attach all of the existing data as today's partition, which + -- includes everything up until the _end_ of today. + -- The new partition will be named with today's date, but the end time will be tomorrow (midnight). + new_table_name := 'v1_dag_data_' || TO_CHAR(NOW()::DATE, 'YYYYMMDD'); + + RAISE NOTICE 'Renaming existing table to %', new_table_name; + + EXECUTE format('ALTER TABLE v1_dag_data RENAME TO %I', new_table_name); + EXECUTE format('ALTER INDEX v1_dag_input_pkey RENAME TO %I', new_table_name || '_pkey'); + + EXECUTE + format('ALTER TABLE %s SET ( + autovacuum_vacuum_scale_factor = ''0.1'', + autovacuum_analyze_scale_factor=''0.05'', + autovacuum_vacuum_threshold=''25'', + autovacuum_analyze_threshold=''25'', + autovacuum_vacuum_cost_delay=''10'', + autovacuum_vacuum_cost_limit=''1000'' + )', new_table_name); + + EXECUTE + format( + 'ALTER TABLE v1_dag_data_partitioned ATTACH PARTITION %s FOR VALUES FROM (''19700101'') TO (''%s'')', + new_table_name, + TO_CHAR((NOW() + INTERVAL '1 day')::DATE, 'YYYYMMDD') + ); +END $$; + +ALTER TABLE v1_dag_data_partitioned RENAME TO v1_dag_data; +ALTER INDEX v1_dag_data_partitioned_pkey RENAME TO v1_dag_data_pkey; + +SELECT create_v1_range_partition('v1_dag_data', (NOW() + INTERVAL '1 day')::DATE); +COMMIT; + +BEGIN; +ANALYZE v1_dag_data; +COMMIT; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +CREATE TABLE v1_dag_data_original ( + dag_id BIGINT NOT NULL, + dag_inserted_at TIMESTAMPTZ NOT NULL, + input JSONB NOT NULL, + additional_metadata JSONB, + PRIMARY KEY (dag_id, dag_inserted_at) +); + +INSERT INTO v1_dag_data_original +SELECT * FROM v1_dag_data; + +DROP TABLE v1_dag_data; +ALTER TABLE v1_dag_data_original RENAME TO v1_dag_data; +ALTER INDEX v1_dag_data_original_pkey RENAME TO v1_dag_input_pkey; +-- +goose StatementEnd diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index f215f211b6..102bca0939 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -7,7 +7,8 @@ SELECT , create_v1_range_partition('v1_log_line', @date::date) , create_v1_range_partition('v1_payload', @date::date) , create_v1_range_partition('v1_dag_to_task', @date::date) - ; + , create_v1_range_partition('v1_dag_data', @date::date) +; -- name: EnsureTablePartitionsExist :one WITH tomorrow_date AS ( @@ -21,6 +22,10 @@ WITH tomorrow_date AS ( SELECT 'v1_task_event_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') UNION ALL SELECT 'v1_log_line_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') + UNION ALL + SELECT 'v1_payload' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') + UNION ALL + SELECT 'v1_dag_data' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') ), partition_check AS ( SELECT COUNT(*) AS total_tables, @@ -53,6 +58,9 @@ task_partitions AS ( , payload_partitions AS ( SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', @date::date) AS p ) +, dag_data_partitions AS ( + SELECT 'v1_dag_data' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_data', @date::date) AS p +) , dag_to_task_partitions AS ( SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', @date::date) AS p ) @@ -96,6 +104,12 @@ SELECT * FROM dag_to_task_partitions + +UNION ALL + +SELECT * +FROM + dag_data_partitions ; -- name: DefaultTaskActivityGauge :one @@ -944,6 +958,9 @@ ANALYZE v1_dag_to_task; -- name: AnalyzeV1Dag :exec ANALYZE v1_dag; +-- name: AnalyzeV1DagData :exec +ANALYZE v1_dag_data; + -- name: CleanupV1TaskRuntime :execresult WITH locked_trs AS ( SELECT vtr.task_id, vtr.task_inserted_at, vtr.retry_count diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index 5280b9b7a2..ec0e24be47 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -30,6 +30,15 @@ func (q *Queries) AnalyzeV1Dag(ctx context.Context, db DBTX) error { return err } +const analyzeV1DagData = `-- name: AnalyzeV1DagData :exec +ANALYZE v1_dag_data +` + +func (q *Queries) AnalyzeV1DagData(ctx context.Context, db DBTX) error { + _, err := db.Exec(ctx, analyzeV1DagData) + return err +} + const analyzeV1Task = `-- name: AnalyzeV1Task :exec ANALYZE v1_task ` @@ -138,6 +147,7 @@ SELECT , create_v1_range_partition('v1_log_line', $1::date) , create_v1_range_partition('v1_payload', $1::date) , create_v1_range_partition('v1_dag_to_task', $1::date) + , create_v1_range_partition('v1_dag_data', $1::date) ` func (q *Queries) CreatePartitions(ctx context.Context, db DBTX, date pgtype.Date) error { @@ -229,6 +239,10 @@ WITH tomorrow_date AS ( SELECT 'v1_task_event_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') UNION ALL SELECT 'v1_log_line_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') + UNION ALL + SELECT 'v1_payload' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') + UNION ALL + SELECT 'v1_dag_data' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD') ), partition_check AS ( SELECT COUNT(*) AS total_tables, @@ -996,6 +1010,9 @@ task_partitions AS ( , payload_partitions AS ( SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', $1::date) AS p ) +, dag_data_partitions AS ( + SELECT 'v1_dag_data' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_data', $1::date) AS p +) , dag_to_task_partitions AS ( SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', $1::date) AS p ) @@ -1039,6 +1056,12 @@ SELECT parent_table, partition_name FROM dag_to_task_partitions + +UNION ALL + +SELECT parent_table, partition_name +FROM + dag_data_partitions ` type ListPartitionsBeforeDateRow struct { diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 9d4ba7c3c8..80a2dc3b29 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -3652,6 +3652,12 @@ func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error { return fmt.Errorf("error analyzing v1_payload: %v", err) } + err = r.queries.AnalyzeV1DagData(ctx, tx) + + if err != nil { + return fmt.Errorf("error analyzing v1_dag_data: %v", err) + } + if err := commit(ctx); err != nil { return fmt.Errorf("error committing transaction: %v", err) } diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index 1e18cd3a18..7c87d7b0fd 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -659,7 +659,7 @@ CREATE TABLE v1_dag_data ( dag_inserted_at TIMESTAMPTZ NOT NULL, input JSONB NOT NULL, additional_metadata JSONB, - CONSTRAINT v1_dag_input_pkey PRIMARY KEY (dag_id, dag_inserted_at) + PRIMARY KEY (dag_id, dag_inserted_at) ); -- CreateTable From 4a29b6fa380c30d6a91b90242106d8c2373c7132 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Tue, 18 Nov 2025 20:48:08 -0500 Subject: [PATCH 07/19] fix: migration versions --- .../{20250829173445_v1_0_41.sql => 20251116173445_v1_0_54.sql} | 0 .../{20250912172758_v1_0_43.sql => 20251117172758_v1_0_55.sql} | 0 .../{20251118172545_v1_0_54.sql => 20251118172545_v1_0_56.sql} | 0 3 files changed, 0 insertions(+), 0 deletions(-) rename cmd/hatchet-migrate/migrate/migrations/{20250829173445_v1_0_41.sql => 20251116173445_v1_0_54.sql} (100%) rename cmd/hatchet-migrate/migrate/migrations/{20250912172758_v1_0_43.sql => 20251117172758_v1_0_55.sql} (100%) rename cmd/hatchet-migrate/migrate/migrations/{20251118172545_v1_0_54.sql => 20251118172545_v1_0_56.sql} (100%) diff --git a/cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql b/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql similarity index 100% rename from cmd/hatchet-migrate/migrate/migrations/20250829173445_v1_0_41.sql rename to cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql diff --git a/cmd/hatchet-migrate/migrate/migrations/20250912172758_v1_0_43.sql b/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql similarity index 100% rename from cmd/hatchet-migrate/migrate/migrations/20250912172758_v1_0_43.sql rename to cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql diff --git a/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_54.sql b/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql similarity index 100% rename from cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_54.sql rename to cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql From 8e830f3ea280e579f3af4636e85515b9374a8890 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Tue, 18 Nov 2025 20:54:57 -0500 Subject: [PATCH 08/19] fix: some cleanup --- pkg/repository/v1/sqlcv1/dags.sql | 2 -- pkg/repository/v1/sqlcv1/dags.sql.go | 9 --------- pkg/repository/v1/sqlcv1/tasks.sql | 1 + pkg/repository/v1/task.go | 2 +- 4 files changed, 2 insertions(+), 12 deletions(-) diff --git a/pkg/repository/v1/sqlcv1/dags.sql b/pkg/repository/v1/sqlcv1/dags.sql index b32bc39c56..8933e61e16 100644 --- a/pkg/repository/v1/sqlcv1/dags.sql +++ b/pkg/repository/v1/sqlcv1/dags.sql @@ -64,5 +64,3 @@ INSERT INTO v1_dag_data ( $4 ); --- name: AnalyzeV1DAG :exec -ANALYZE v1_dag; diff --git a/pkg/repository/v1/sqlcv1/dags.sql.go b/pkg/repository/v1/sqlcv1/dags.sql.go index df57fcf43e..49f320d5f8 100644 --- a/pkg/repository/v1/sqlcv1/dags.sql.go +++ b/pkg/repository/v1/sqlcv1/dags.sql.go @@ -11,15 +11,6 @@ import ( "github.com/jackc/pgx/v5/pgtype" ) -const analyzeV1DAG = `-- name: AnalyzeV1DAG :exec -ANALYZE v1_dag -` - -func (q *Queries) AnalyzeV1DAG(ctx context.Context, db DBTX) error { - _, err := db.Exec(ctx, analyzeV1DAG) - return err -} - type CreateDAGDataParams struct { DagID int64 `json:"dag_id"` DagInsertedAt pgtype.Timestamptz `json:"dag_inserted_at"` diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 102bca0939..af21095da5 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -8,6 +8,7 @@ SELECT , create_v1_range_partition('v1_payload', @date::date) , create_v1_range_partition('v1_dag_to_task', @date::date) , create_v1_range_partition('v1_dag_data', @date::date) + , create_v1_weekly_range_partition('v1_lookup_table', @date::date) ; -- name: EnsureTablePartitionsExist :one diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 80a2dc3b29..007c26a8d9 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -3640,7 +3640,7 @@ func (r *TaskRepositoryImpl) AnalyzeTaskTables(ctx context.Context) error { return fmt.Errorf("error analyzing v1_dag_to_task: %v", err) } - err = r.queries.AnalyzeV1DAG(ctx, tx) + err = r.queries.AnalyzeV1Dag(ctx, tx) if err != nil { return fmt.Errorf("error analyzing v1_dag: %v", err) From 04cb22cc6260776d100b163c423dd886d90ba526 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Tue, 18 Nov 2025 21:06:25 -0500 Subject: [PATCH 09/19] fix: comment --- pkg/repository/v1/sqlcv1/tasks.sql | 4 ++-- pkg/repository/v1/sqlcv1/tasks.sql.go | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index af21095da5..44091c63de 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -1,6 +1,6 @@ -- name: CreatePartitions :exec SELECT - -- intentionally formatted this way to limit merge conflicts + -- intentionally formatted this way to limit merge conflicts + diff sizes create_v1_range_partition('v1_task', @date::date) , create_v1_range_partition('v1_dag', @date::date) , create_v1_range_partition('v1_task_event', @date::date) @@ -43,7 +43,7 @@ FROM partition_check; -- name: ListPartitionsBeforeDate :many WITH --- intentionally formatted this way to limit merge conflicts +-- intentionally formatted this way to limit merge conflicts + diff sizes task_partitions AS ( SELECT 'v1_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task', @date::date) AS p ) diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index ec0e24be47..34d846b410 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -140,7 +140,7 @@ func (q *Queries) CleanupWorkflowConcurrencySlotsAfterInsert(ctx context.Context const createPartitions = `-- name: CreatePartitions :exec SELECT - -- intentionally formatted this way to limit merge conflicts + -- intentionally formatted this way to limit merge conflicts + diff sizes create_v1_range_partition('v1_task', $1::date) , create_v1_range_partition('v1_dag', $1::date) , create_v1_range_partition('v1_task_event', $1::date) @@ -148,6 +148,7 @@ SELECT , create_v1_range_partition('v1_payload', $1::date) , create_v1_range_partition('v1_dag_to_task', $1::date) , create_v1_range_partition('v1_dag_data', $1::date) + , create_v1_weekly_range_partition('v1_lookup_table', $1::date) ` func (q *Queries) CreatePartitions(ctx context.Context, db DBTX, date pgtype.Date) error { @@ -1069,7 +1070,7 @@ type ListPartitionsBeforeDateRow struct { PartitionName string `json:"partition_name"` } -// intentionally formatted this way to limit merge conflicts +// intentionally formatted this way to limit merge conflicts + diff sizes func (q *Queries) ListPartitionsBeforeDate(ctx context.Context, db DBTX, date pgtype.Date) ([]*ListPartitionsBeforeDateRow, error) { rows, err := db.Query(ctx, listPartitionsBeforeDate, date) if err != nil { From 3c9b5f249a508e82c0595c4e010c3ac809487d8f Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 09:25:57 -0500 Subject: [PATCH 10/19] feat: add func to rename partitions --- .../migrations/20251116173445_v1_0_54.sql | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql b/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql index 779a8db1d7..a82d49a9b3 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql @@ -2,6 +2,52 @@ -- +goose Up -- +goose StatementBegin BEGIN; + +CREATE OR REPLACE FUNCTION rename_partitions( + parent_table_name TEXT, + new_prefix TEXT +) +RETURNS TABLE(old_name TEXT, new_name TEXT) +LANGUAGE plpgsql +AS $$ +DECLARE + partition_record RECORD; + old_partition_name TEXT; + new_partition_name TEXT; + partition_suffix TEXT; +BEGIN + FOR partition_record IN + SELECT c.relname AS partition_name + FROM pg_inherits i + JOIN pg_class c ON i.inhrelid = c.oid + JOIN pg_namespace n ON c.relnamespace = n.oid + JOIN pg_class parent ON i.inhparent = parent.oid + WHERE parent.relname = parent_table_name + ORDER BY c.relname + LOOP + old_partition_name := partition_record.partition_name; + + partition_suffix := replace(old_partition_name, parent_table_name || '_', ''); + new_partition_name := new_prefix || '_' || partition_suffix; + + EXECUTE format('ALTER TABLE %I RENAME TO %I', + old_partition_name, + new_partition_name + ); + EXECUTE format('ALTER INDEX %I RENAME TO %I', + old_partition_name || '_pkey', + new_partition_name || '_pkey' + ); + + RETURN NEXT; + + RAISE NOTICE 'Renamed: % -> %', old_partition_name, new_partition_name; + END LOOP; + + RETURN; +END; +$$; + CREATE TABLE IF NOT EXISTS v1_lookup_table_partitioned ( tenant_id UUID NOT NULL, external_id UUID NOT NULL, @@ -139,9 +185,13 @@ ON CONFLICT (external_id, inserted_at) DO NOTHING; -- +goose StatementBegin BEGIN; DROP TABLE IF EXISTS v1_lookup_table; + +SELECT rename_partitions('v1_lookup_table_partitioned', 'v1_lookup_table'); + ALTER TABLE v1_lookup_table_partitioned RENAME TO v1_lookup_table; + CREATE OR REPLACE FUNCTION v1_dag_insert_function() RETURNS TRIGGER AS $$ @@ -317,7 +367,28 @@ ALTER INDEX v1_lookup_table_partitioned_pkey DROP TRIGGER IF EXISTS v1_lookup_table_partitioned_insert_trigger ON v1_lookup_table_partitioned; DROP FUNCTION IF EXISTS v1_lookup_table_partitioned_insert_function; + + COMMIT; + +CREATE OR REPLACE FUNCTION get_v1_weekly_partitions_before_date( + targetTableName text, + targetDate date +) RETURNS TABLE(partition_name text) + LANGUAGE plpgsql AS +$$ +BEGIN + RETURN QUERY + SELECT + inhrelid::regclass::text AS partition_name + FROM + pg_inherits + WHERE + inhparent = targetTableName::regclass + AND substring(inhrelid::regclass::text, format('%s_(\d{8})', targetTableName)) ~ '^\d{8}' + AND (substring(inhrelid::regclass::text, format('%s_(\d{8})', targetTableName))::date) < targetDate; +END; +$$; -- +goose StatementEnd -- +goose Down @@ -544,6 +615,7 @@ BEGIN END; $$ LANGUAGE plpgsql; +DROP FUNCTION rename_partitions(TEXT, TEXT); COMMIT; -- +goose StatementEnd From 8c44dd5caf67daf2096397a03598ecc45e425294 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 09:26:34 -0500 Subject: [PATCH 11/19] feat: add helper to schema --- sql/schema/v1-core.sql | 46 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index 7c87d7b0fd..c7ceabee95 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -1781,3 +1781,49 @@ CREATE TABLE v1_operation_interval_settings ( interval_nanoseconds BIGINT NOT NULL, PRIMARY KEY (tenant_id, operation_id) ); + +-- helper to rename all of the partitions of a parent table +CREATE OR REPLACE FUNCTION rename_partitions( + parent_table_name TEXT, + new_prefix TEXT +) +RETURNS TABLE(old_name TEXT, new_name TEXT) +LANGUAGE plpgsql +AS $$ +DECLARE + partition_record RECORD; + old_partition_name TEXT; + new_partition_name TEXT; + partition_suffix TEXT; +BEGIN + FOR partition_record IN + SELECT c.relname AS partition_name + FROM pg_inherits i + JOIN pg_class c ON i.inhrelid = c.oid + JOIN pg_namespace n ON c.relnamespace = n.oid + JOIN pg_class parent ON i.inhparent = parent.oid + WHERE parent.relname = parent_table_name + ORDER BY c.relname + LOOP + old_partition_name := partition_record.partition_name; + + partition_suffix := replace(old_partition_name, parent_table_name || '_', ''); + new_partition_name := new_prefix || '_' || partition_suffix; + + EXECUTE format('ALTER TABLE %I RENAME TO %I', + old_partition_name, + new_partition_name + ); + EXECUTE format('ALTER INDEX %I RENAME TO %I', + old_partition_name || '_pkey', + new_partition_name || '_pkey' + ); + + RETURN NEXT; + + RAISE NOTICE 'Renamed: % -> %', old_partition_name, new_partition_name; + END LOOP; + + RETURN; +END; +$$; \ No newline at end of file From ffc2d51973b2193be810a37a9a557d7775bf1844 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 15:50:43 -0500 Subject: [PATCH 12/19] fix: use new func --- .../migrate/migrations/20251117172758_v1_0_55.sql | 4 ++-- .../migrate/migrations/20251118172545_v1_0_56.sql | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql b/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql index 2feb3adbf6..c448e431b3 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql @@ -43,8 +43,7 @@ BEGIN ); END $$; -ALTER TABLE v1_dag_to_task_partitioned RENAME TO v1_dag_to_task; -ALTER INDEX v1_dag_to_task_partitioned_pkey RENAME TO v1_dag_to_task_pkey; +SELECT rename_partitions('v1_dag_to_task_partitioned', 'v1_dag_to_task'); SELECT create_v1_range_partition('v1_dag_to_task', (NOW() + INTERVAL '1 day')::DATE); COMMIT; @@ -68,6 +67,7 @@ INSERT INTO v1_dag_to_task_original SELECT * FROM v1_dag_to_task; DROP TABLE v1_dag_to_task; + ALTER TABLE v1_dag_to_task_original RENAME TO v1_dag_to_task; ALTER INDEX v1_dag_to_task_original_pkey RENAME TO v1_dag_to_task_pkey; -- +goose StatementEnd diff --git a/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql b/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql index f7263c01e1..f679e18b65 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql @@ -43,8 +43,7 @@ BEGIN ); END $$; -ALTER TABLE v1_dag_data_partitioned RENAME TO v1_dag_data; -ALTER INDEX v1_dag_data_partitioned_pkey RENAME TO v1_dag_data_pkey; +SELECT rename_partitions('v1_dag_data_partitioned', 'v1_dag_data'); SELECT create_v1_range_partition('v1_dag_data', (NOW() + INTERVAL '1 day')::DATE); COMMIT; From 60bb6f6017c1bee51bdbd9fea6227b2e5b0cc158 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 15:50:56 -0500 Subject: [PATCH 13/19] chore: lint --- pkg/repository/v1/sqlcv1/dags.sql | 1 - sql/schema/v1-core.sql | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/repository/v1/sqlcv1/dags.sql b/pkg/repository/v1/sqlcv1/dags.sql index 8933e61e16..f784d74e5c 100644 --- a/pkg/repository/v1/sqlcv1/dags.sql +++ b/pkg/repository/v1/sqlcv1/dags.sql @@ -63,4 +63,3 @@ INSERT INTO v1_dag_data ( $3, $4 ); - diff --git a/sql/schema/v1-core.sql b/sql/schema/v1-core.sql index c7ceabee95..38a8c90b37 100644 --- a/sql/schema/v1-core.sql +++ b/sql/schema/v1-core.sql @@ -1826,4 +1826,4 @@ BEGIN RETURN; END; -$$; \ No newline at end of file +$$; From c030548eee67f5aa86ef17d8419ea271002732e6 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 15:56:54 -0500 Subject: [PATCH 14/19] fix: partition logic --- pkg/repository/v1/sqlcv1/tasks.sql | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 44091c63de..487e993d07 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -34,11 +34,8 @@ WITH tomorrow_date AS ( FROM expected_partitions ep LEFT JOIN pg_catalog.pg_tables pt ON pt.tablename = ep.expected_partition_name ) -SELECT - CASE - WHEN existing_partitions = total_tables THEN TRUE - ELSE FALSE - END AS all_partitions_exist + +SELECT existing_partitions = total_tables AS all_partitions_exist FROM partition_check; -- name: ListPartitionsBeforeDate :many @@ -65,6 +62,11 @@ task_partitions AS ( , dag_to_task_partitions AS ( SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', @date::date) AS p ) +, lookup_table_partitions AS ( + -- using one week before the provided date here since the lookup table partitions are weekly, and we don't want to + -- drop the current week's partition if the date is in the current week + SELECT 'v1_lookup_table' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_lookup_table', (@date - INTERVAL '1 week')::date) AS p +) SELECT * @@ -111,6 +113,12 @@ UNION ALL SELECT * FROM dag_data_partitions + +UNION ALL + +SELECT * +FROM + lookup_table_partitions ; -- name: DefaultTaskActivityGauge :one From c9f96ab1261ad80c3a0054f07b89ecec08a16678 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 16:09:51 -0500 Subject: [PATCH 15/19] fix: query syntax --- pkg/repository/v1/sqlcv1/tasks.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 487e993d07..3d35464652 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -35,7 +35,7 @@ WITH tomorrow_date AS ( LEFT JOIN pg_catalog.pg_tables pt ON pt.tablename = ep.expected_partition_name ) -SELECT existing_partitions = total_tables AS all_partitions_exist +SELECT (existing_partitions = total_tables) AS all_partitions_exist FROM partition_check; -- name: ListPartitionsBeforeDate :many From 47148c276d31dd22a8c2c84bfb955cb76134e2f5 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 16:14:29 -0500 Subject: [PATCH 16/19] Revert "fix: query syntax" This reverts commit c9f96ab1261ad80c3a0054f07b89ecec08a16678. --- pkg/repository/v1/sqlcv1/tasks.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 3d35464652..487e993d07 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -35,7 +35,7 @@ WITH tomorrow_date AS ( LEFT JOIN pg_catalog.pg_tables pt ON pt.tablename = ep.expected_partition_name ) -SELECT (existing_partitions = total_tables) AS all_partitions_exist +SELECT existing_partitions = total_tables AS all_partitions_exist FROM partition_check; -- name: ListPartitionsBeforeDate :many From 7ee215ebfdd42bbd1fd1c1ac40d37002ce5d4b0c Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 16:25:55 -0500 Subject: [PATCH 17/19] fix: sql --- pkg/repository/v1/sqlcv1/tasks.sql | 4 +--- pkg/repository/v1/sqlcv1/tasks.sql.go | 25 ++++++++++++++++++------- pkg/repository/v1/task.go | 16 +++++++++++++--- 3 files changed, 32 insertions(+), 13 deletions(-) diff --git a/pkg/repository/v1/sqlcv1/tasks.sql b/pkg/repository/v1/sqlcv1/tasks.sql index 487e993d07..140005cbe9 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql +++ b/pkg/repository/v1/sqlcv1/tasks.sql @@ -63,9 +63,7 @@ task_partitions AS ( SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', @date::date) AS p ) , lookup_table_partitions AS ( - -- using one week before the provided date here since the lookup table partitions are weekly, and we don't want to - -- drop the current week's partition if the date is in the current week - SELECT 'v1_lookup_table' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_lookup_table', (@date - INTERVAL '1 week')::date) AS p + SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', @oneWeekAgo::date) AS p ) SELECT diff --git a/pkg/repository/v1/sqlcv1/tasks.sql.go b/pkg/repository/v1/sqlcv1/tasks.sql.go index 34d846b410..4cd8cf759e 100644 --- a/pkg/repository/v1/sqlcv1/tasks.sql.go +++ b/pkg/repository/v1/sqlcv1/tasks.sql.go @@ -251,11 +251,8 @@ WITH tomorrow_date AS ( FROM expected_partitions ep LEFT JOIN pg_catalog.pg_tables pt ON pt.tablename = ep.expected_partition_name ) -SELECT - CASE - WHEN existing_partitions = total_tables THEN TRUE - ELSE FALSE - END AS all_partitions_exist + +SELECT existing_partitions = total_tables AS all_partitions_exist FROM partition_check ` @@ -1017,6 +1014,9 @@ task_partitions AS ( , dag_to_task_partitions AS ( SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', $1::date) AS p ) +, lookup_table_partitions AS ( + SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', $2::date) AS p +) SELECT parent_table, partition_name @@ -1063,16 +1063,27 @@ UNION ALL SELECT parent_table, partition_name FROM dag_data_partitions + +UNION ALL + +SELECT parent_table, partition_name +FROM + lookup_table_partitions ` +type ListPartitionsBeforeDateParams struct { + Date pgtype.Date `json:"date"` + Oneweekago pgtype.Date `json:"oneweekago"` +} + type ListPartitionsBeforeDateRow struct { ParentTable string `json:"parent_table"` PartitionName string `json:"partition_name"` } // intentionally formatted this way to limit merge conflicts + diff sizes -func (q *Queries) ListPartitionsBeforeDate(ctx context.Context, db DBTX, date pgtype.Date) ([]*ListPartitionsBeforeDateRow, error) { - rows, err := db.Query(ctx, listPartitionsBeforeDate, date) +func (q *Queries) ListPartitionsBeforeDate(ctx context.Context, db DBTX, arg ListPartitionsBeforeDateParams) ([]*ListPartitionsBeforeDateRow, error) { + rows, err := db.Query(ctx, listPartitionsBeforeDate, arg.Date, arg.Oneweekago) if err != nil { return nil, err } diff --git a/pkg/repository/v1/task.go b/pkg/repository/v1/task.go index 007c26a8d9..4f1598cf6d 100644 --- a/pkg/repository/v1/task.go +++ b/pkg/repository/v1/task.go @@ -317,6 +317,10 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error { tomorrow := today.AddDate(0, 0, 1) removeBefore := today.Add(-1 * r.taskRetentionPeriod) + // using one week before the provided date here since the lookup table partitions are weekly, and we don't want to + // drop the current week's partition if the date is in the current week + oneAdditionalWeekBefore := removeBefore.AddDate(0, 0, -7) + err = r.queries.CreatePartitions(ctx, r.pool, pgtype.Date{ Time: today, Valid: true, @@ -335,9 +339,15 @@ func (r *TaskRepositoryImpl) UpdateTablePartitions(ctx context.Context) error { return err } - partitions, err := r.queries.ListPartitionsBeforeDate(ctx, r.pool, pgtype.Date{ - Time: removeBefore, - Valid: true, + partitions, err := r.queries.ListPartitionsBeforeDate(ctx, r.pool, sqlcv1.ListPartitionsBeforeDateParams{ + Date: pgtype.Date{ + Time: removeBefore, + Valid: true, + }, + Oneweekago: pgtype.Date{ + Time: oneAdditionalWeekBefore, + Valid: true, + }, }) if err != nil { From 0f4139fff3b2ce0b2aa08c47d68524babb303592 Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Wed, 19 Nov 2025 16:29:27 -0500 Subject: [PATCH 18/19] fix: migrations --- .../migrate/migrations/20251117172758_v1_0_55.sql | 3 ++- .../migrate/migrations/20251118172545_v1_0_56.sql | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql b/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql index c448e431b3..9d2f82a966 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql @@ -43,7 +43,8 @@ BEGIN ); END $$; -SELECT rename_partitions('v1_dag_to_task_partitioned', 'v1_dag_to_task'); +ALTER TABLE v1_dag_to_task_partitioned RENAME TO v1_dag_to_task; +ALTER INDEX v1_dag_to_task_partitioned_pkey RENAME TO v1_dag_to_task_pkey; SELECT create_v1_range_partition('v1_dag_to_task', (NOW() + INTERVAL '1 day')::DATE); COMMIT; diff --git a/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql b/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql index f679e18b65..576ef48e85 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql @@ -43,8 +43,8 @@ BEGIN ); END $$; -SELECT rename_partitions('v1_dag_data_partitioned', 'v1_dag_data'); - +ALTER TABLE v1_dag_data_partitioned RENAME TO v1_dag_data; +ALTER INDEX v1_dag_data_partitioned_pkey RENAME TO v1_dag_data_pkey; SELECT create_v1_range_partition('v1_dag_data', (NOW() + INTERVAL '1 day')::DATE); COMMIT; From f44bf1ddf932bb62450158f6ab760ab9b4425edf Mon Sep 17 00:00:00 2001 From: mrkaye97 Date: Thu, 20 Nov 2025 12:27:21 -0500 Subject: [PATCH 19/19] fix: ordering --- .../migrations/20251116173445_v1_0_54.sql | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql b/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql index a82d49a9b3..10977b8f81 100644 --- a/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql +++ b/cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql @@ -186,11 +186,17 @@ ON CONFLICT (external_id, inserted_at) DO NOTHING; BEGIN; DROP TABLE IF EXISTS v1_lookup_table; +DROP TRIGGER IF EXISTS v1_lookup_table_partitioned_insert_trigger ON v1_lookup_table_partitioned; +DROP FUNCTION IF EXISTS v1_lookup_table_partitioned_insert_function; + SELECT rename_partitions('v1_lookup_table_partitioned', 'v1_lookup_table'); ALTER TABLE v1_lookup_table_partitioned RENAME TO v1_lookup_table; +ALTER INDEX v1_lookup_table_partitioned_pkey + RENAME TO v1_lookup_table_pkey; + CREATE OR REPLACE FUNCTION v1_dag_insert_function() RETURNS TRIGGER AS @@ -360,17 +366,6 @@ END; $$ LANGUAGE plpgsql; - - -ALTER INDEX v1_lookup_table_partitioned_pkey - RENAME TO v1_lookup_table_pkey; - -DROP TRIGGER IF EXISTS v1_lookup_table_partitioned_insert_trigger ON v1_lookup_table_partitioned; -DROP FUNCTION IF EXISTS v1_lookup_table_partitioned_insert_function; - - -COMMIT; - CREATE OR REPLACE FUNCTION get_v1_weekly_partitions_before_date( targetTableName text, targetDate date @@ -389,6 +384,8 @@ BEGIN AND (substring(inhrelid::regclass::text, format('%s_(\d{8})', targetTableName))::date) < targetDate; END; $$; + +COMMIT; -- +goose StatementEnd -- +goose Down