Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
618 changes: 618 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20251116173445_v1_0_54.sql

Large diffs are not rendered by default.

74 changes: 74 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20251117172758_v1_0_55.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
-- +goose Up
-- +goose StatementBegin
-- +goose no transaction
-- Converts v1_dag_to_task into a table range-partitioned on dag_inserted_at.
-- The existing table is renamed and attached as the first partition (no rows
-- are copied by this migration). The migration manages its own BEGIN/COMMIT
-- blocks, in line with the "no transaction" annotation above.
BEGIN;

-- New partitioned parent. It is renamed to v1_dag_to_task further down, once
-- the old table has been moved out of the way.
CREATE TABLE v1_dag_to_task_partitioned (
    dag_id BIGINT NOT NULL,
    dag_inserted_at TIMESTAMPTZ NOT NULL,
    task_id BIGINT NOT NULL,
    task_inserted_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at)
) PARTITION BY RANGE(dag_inserted_at);

DO $$
DECLARE
    new_table_name TEXT;
BEGIN
    -- We want to attach all of the existing data as today's partition, which
    -- includes everything up until the _end_ of today.
    -- The new partition will be named with today's date, but the end time will be tomorrow (midnight).
    new_table_name := 'v1_dag_to_task_' || TO_CHAR(NOW()::DATE, 'YYYYMMDD');

    RAISE NOTICE 'Renaming existing table to %', new_table_name;

    -- Move the old table and its primary-key index to the partition naming scheme.
    EXECUTE format('ALTER TABLE v1_dag_to_task RENAME TO %I', new_table_name);
    EXECUTE format('ALTER INDEX v1_dag_to_task_pkey RENAME TO %I', new_table_name || '_pkey');

    -- Per-table autovacuum settings for the soon-to-be partition.
    -- %I quotes the identifier the same way the renames above do.
    EXECUTE
        format('ALTER TABLE %I SET (
            autovacuum_vacuum_scale_factor = ''0.1'',
            autovacuum_analyze_scale_factor=''0.05'',
            autovacuum_vacuum_threshold=''25'',
            autovacuum_analyze_threshold=''25'',
            autovacuum_vacuum_cost_delay=''10'',
            autovacuum_vacuum_cost_limit=''1000''
        )', new_table_name);

    -- Attach the renamed table as the partition covering everything from
    -- 1970-01-01 up to (but excluding) tomorrow. %I / %L let format() do the
    -- identifier and literal quoting instead of hand-written quotes.
    EXECUTE
        format(
            'ALTER TABLE v1_dag_to_task_partitioned ATTACH PARTITION %I FOR VALUES FROM (''19700101'') TO (%L)',
            new_table_name,
            TO_CHAR((NOW() + INTERVAL '1 day')::DATE, 'YYYYMMDD')
        );
END $$;

-- The partitioned parent takes over the original table and index names.
ALTER TABLE v1_dag_to_task_partitioned RENAME TO v1_dag_to_task;
ALTER INDEX v1_dag_to_task_partitioned_pkey RENAME TO v1_dag_to_task_pkey;

-- NOTE(review): create_v1_range_partition is defined outside this file;
-- presumably it creates the partition for the given date (tomorrow) - confirm.
SELECT create_v1_range_partition('v1_dag_to_task', (NOW() + INTERVAL '1 day')::DATE);
COMMIT;

-- Collect statistics for the new parent in its own transaction, after the
-- schema change has been committed.
BEGIN;
ANALYZE v1_dag_to_task;
COMMIT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
-- Reverts the partitioning of v1_dag_to_task: builds a plain table with the
-- same columns, copies every row out of the partitioned table, drops the
-- partitioned table and hands its name (and primary-key index name) back.
CREATE TABLE v1_dag_to_task_original (
    dag_id BIGINT NOT NULL,
    dag_inserted_at TIMESTAMPTZ NOT NULL,
    task_id BIGINT NOT NULL,
    task_inserted_at TIMESTAMPTZ NOT NULL,
    PRIMARY KEY (dag_id, dag_inserted_at, task_id, task_inserted_at)
);

-- Explicit column lists so the copy does not depend on the positional column
-- order of either table.
INSERT INTO v1_dag_to_task_original (
    dag_id,
    dag_inserted_at,
    task_id,
    task_inserted_at
)
SELECT
    dag_id,
    dag_inserted_at,
    task_id,
    task_inserted_at
FROM v1_dag_to_task;

DROP TABLE v1_dag_to_task;

ALTER TABLE v1_dag_to_task_original RENAME TO v1_dag_to_task;
ALTER INDEX v1_dag_to_task_original_pkey RENAME TO v1_dag_to_task_pkey;
-- +goose StatementEnd
72 changes: 72 additions & 0 deletions cmd/hatchet-migrate/migrate/migrations/20251118172545_v1_0_56.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
-- +goose Up
-- +goose StatementBegin
-- +goose no transaction
-- Converts v1_dag_data into a table range-partitioned on dag_inserted_at.
-- The existing table is renamed and attached as the first partition (no rows
-- are copied by this migration). The migration manages its own BEGIN/COMMIT
-- blocks, in line with the "no transaction" annotation above.
BEGIN;

-- New partitioned parent. It is renamed to v1_dag_data further down, once the
-- old table has been moved out of the way.
CREATE TABLE v1_dag_data_partitioned (
    dag_id BIGINT NOT NULL,
    dag_inserted_at TIMESTAMPTZ NOT NULL,
    input JSONB NOT NULL,
    additional_metadata JSONB,
    PRIMARY KEY (dag_id, dag_inserted_at)
) PARTITION BY RANGE(dag_inserted_at);

DO $$
DECLARE
    new_table_name TEXT;
BEGIN
    -- We want to attach all of the existing data as today's partition, which
    -- includes everything up until the _end_ of today.
    -- The new partition will be named with today's date, but the end time will be tomorrow (midnight).
    new_table_name := 'v1_dag_data_' || TO_CHAR(NOW()::DATE, 'YYYYMMDD');

    RAISE NOTICE 'Renaming existing table to %', new_table_name;

    -- Move the old table and its primary-key index to the partition naming
    -- scheme. The existing index carries the name v1_dag_input_pkey.
    EXECUTE format('ALTER TABLE v1_dag_data RENAME TO %I', new_table_name);
    EXECUTE format('ALTER INDEX v1_dag_input_pkey RENAME TO %I', new_table_name || '_pkey');

    -- Per-table autovacuum settings for the soon-to-be partition.
    -- %I quotes the identifier the same way the renames above do.
    EXECUTE
        format('ALTER TABLE %I SET (
            autovacuum_vacuum_scale_factor = ''0.1'',
            autovacuum_analyze_scale_factor=''0.05'',
            autovacuum_vacuum_threshold=''25'',
            autovacuum_analyze_threshold=''25'',
            autovacuum_vacuum_cost_delay=''10'',
            autovacuum_vacuum_cost_limit=''1000''
        )', new_table_name);

    -- Attach the renamed table as the partition covering everything from
    -- 1970-01-01 up to (but excluding) tomorrow. %I / %L let format() do the
    -- identifier and literal quoting instead of hand-written quotes.
    EXECUTE
        format(
            'ALTER TABLE v1_dag_data_partitioned ATTACH PARTITION %I FOR VALUES FROM (''19700101'') TO (%L)',
            new_table_name,
            TO_CHAR((NOW() + INTERVAL '1 day')::DATE, 'YYYYMMDD')
        );
END $$;

-- The partitioned parent takes over the original table name; its primary-key
-- index is named v1_dag_data_pkey.
ALTER TABLE v1_dag_data_partitioned RENAME TO v1_dag_data;
ALTER INDEX v1_dag_data_partitioned_pkey RENAME TO v1_dag_data_pkey;
-- NOTE(review): create_v1_range_partition is defined outside this file;
-- presumably it creates the partition for the given date (tomorrow) - confirm.
SELECT create_v1_range_partition('v1_dag_data', (NOW() + INTERVAL '1 day')::DATE);
COMMIT;

-- Collect statistics for the new parent in its own transaction, after the
-- schema change has been committed.
BEGIN;
ANALYZE v1_dag_data;
COMMIT;
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
-- Reverts the partitioning of v1_dag_data: builds a plain table with the same
-- columns, copies every row out of the partitioned table, drops the
-- partitioned table and hands its name back. The primary-key index gets its
-- pre-migration name, v1_dag_input_pkey.
CREATE TABLE v1_dag_data_original (
    dag_id BIGINT NOT NULL,
    dag_inserted_at TIMESTAMPTZ NOT NULL,
    input JSONB NOT NULL,
    additional_metadata JSONB,
    PRIMARY KEY (dag_id, dag_inserted_at)
);

-- Explicit column lists so the copy does not depend on the positional column
-- order of either table.
INSERT INTO v1_dag_data_original (
    dag_id,
    dag_inserted_at,
    input,
    additional_metadata
)
SELECT
    dag_id,
    dag_inserted_at,
    input,
    additional_metadata
FROM v1_dag_data;

DROP TABLE v1_dag_data;
ALTER TABLE v1_dag_data_original RENAME TO v1_dag_data;
ALTER INDEX v1_dag_data_original_pkey RENAME TO v1_dag_input_pkey;
-- +goose StatementEnd
18 changes: 18 additions & 0 deletions pkg/repository/v1/olap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2604,6 +2604,24 @@ func (r *OLAPRepositoryImpl) AnalyzeOLAPTables(ctx context.Context) error {
return fmt.Errorf("error analyzing v1_dags_olap: %v", err)
}

err = r.queries.AnalyzeV1EventsOLAP(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_events_olap: %v", err)
}

err = r.queries.AnalyzeV1EventLookupTableOLAP(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_event_lookup_table_olap: %v", err)
}

err = r.queries.AnalyzeV1EventToRunOLAP(ctx, tx)

if err != nil {
return fmt.Errorf("error analyzing v1_event_to_run_olap: %v", err)
}

err = r.queries.AnalyzeV1DAGToTaskOLAP(ctx, tx)

if err != nil {
Expand Down
9 changes: 9 additions & 0 deletions pkg/repository/v1/sqlcv1/olap.sql
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ ANALYZE v1_tasks_olap;
-- name: AnalyzeV1DAGsOLAP :exec
ANALYZE v1_dags_olap;

-- name: AnalyzeV1EventsOLAP :exec
ANALYZE v1_events_olap;

-- name: AnalyzeV1EventLookupTableOLAP :exec
ANALYZE v1_event_lookup_table_olap;

-- name: AnalyzeV1EventToRunOLAP :exec
ANALYZE v1_event_to_run_olap;

-- name: AnalyzeV1DAGToTaskOLAP :exec
ANALYZE v1_dag_to_task_olap;

Expand Down
27 changes: 27 additions & 0 deletions pkg/repository/v1/sqlcv1/olap.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 61 additions & 15 deletions pkg/repository/v1/sqlcv1/tasks.sql
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
-- name: CreatePartitions :exec
SELECT
create_v1_range_partition('v1_task', @date::date),
create_v1_range_partition('v1_dag', @date::date),
create_v1_range_partition('v1_task_event', @date::date),
create_v1_range_partition('v1_log_line', @date::date),
create_v1_range_partition('v1_payload', @date::date);
-- intentionally formatted this way to limit merge conflicts + diff sizes
create_v1_range_partition('v1_task', @date::date)
, create_v1_range_partition('v1_dag', @date::date)
, create_v1_range_partition('v1_task_event', @date::date)
, create_v1_range_partition('v1_log_line', @date::date)
, create_v1_range_partition('v1_payload', @date::date)
, create_v1_range_partition('v1_dag_to_task', @date::date)
, create_v1_range_partition('v1_dag_data', @date::date)
, create_v1_weekly_range_partition('v1_lookup_table', @date::date)
;

-- name: EnsureTablePartitionsExist :one
WITH tomorrow_date AS (
Expand All @@ -18,32 +23,48 @@ WITH tomorrow_date AS (
SELECT 'v1_task_event_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_log_line_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
-- Partition names are <table>_<YYYYMMDD>, so every prefix must end in an underscore.
SELECT 'v1_payload_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
SELECT 'v1_dag_data_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
UNION ALL
-- v1_dag_to_task gets a daily partition from CreatePartitions too, so check it as well.
SELECT 'v1_dag_to_task_' || to_char((SELECT date FROM tomorrow_date), 'YYYYMMDD')
), partition_check AS (
SELECT
COUNT(*) AS total_tables,
COUNT(pt.tablename) AS existing_partitions
FROM expected_partitions ep
LEFT JOIN pg_catalog.pg_tables pt ON pt.tablename = ep.expected_partition_name
)
SELECT
CASE
WHEN existing_partitions = total_tables THEN TRUE
ELSE FALSE
END AS all_partitions_exist

SELECT existing_partitions = total_tables AS all_partitions_exist
FROM partition_check;

-- name: ListPartitionsBeforeDate :many
WITH task_partitions AS (
WITH
-- intentionally formatted this way to limit merge conflicts + diff sizes
task_partitions AS (
SELECT 'v1_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task', @date::date) AS p
), dag_partitions AS (
)
, dag_partitions AS (
SELECT 'v1_dag' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag', @date::date) AS p
), task_event_partitions AS (
)
, task_event_partitions AS (
SELECT 'v1_task_event' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_task_event', @date::date) AS p
), log_line_partitions AS (
)
, log_line_partitions AS (
SELECT 'v1_log_line' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_log_line', @date::date) AS p
), payload_partitions AS (
)
, payload_partitions AS (
SELECT 'v1_payload' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_payload', @date::date) AS p
)
, dag_data_partitions AS (
SELECT 'v1_dag_data' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_data', @date::date) AS p
)
, dag_to_task_partitions AS (
SELECT 'v1_dag_to_task' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_dag_to_task', @date::date) AS p
)
, lookup_table_partitions AS (
-- v1_lookup_table partitions are listed against the separate @oneWeekAgo cutoff.
SELECT 'v1_lookup_table' AS parent_table, p::text as partition_name FROM get_v1_partitions_before_date('v1_lookup_table', @oneWeekAgo::date) AS p
)

SELECT
*
Expand Down Expand Up @@ -77,6 +98,25 @@ SELECT
*
FROM
payload_partitions

UNION ALL

SELECT
*
FROM
dag_to_task_partitions

UNION ALL

SELECT *
FROM
dag_data_partitions

UNION ALL

SELECT *
FROM
lookup_table_partitions
;

-- name: DefaultTaskActivityGauge :one
Expand Down Expand Up @@ -919,9 +959,15 @@ ANALYZE v1_task;
-- name: AnalyzeV1TaskEvent :exec
ANALYZE v1_task_event;

-- name: AnalyzeV1DAGToTask :exec
ANALYZE v1_dag_to_task;

-- name: AnalyzeV1Dag :exec
ANALYZE v1_dag;

-- name: AnalyzeV1DagData :exec
ANALYZE v1_dag_data;

-- name: CleanupV1TaskRuntime :execresult
WITH locked_trs AS (
SELECT vtr.task_id, vtr.task_inserted_at, vtr.retry_count
Expand Down
Loading
Loading